"""Mail classifier: files unseen INBOX messages into IMAP folders.

For every unseen message in INBOX the script first applies hard-coded
subject rules; when none matches it asks an Ollama chat model for a target
folder and a confidence, validates the answer against a whitelist and then
moves the message (COPY + \\Deleted + EXPUNGE). The loop repeats every
CHECK_INTERVAL seconds.
"""

import email
import imaplib
import json
import logging
import os
import re
import sys
import time
import traceback
from email.errors import HeaderParseError
from email.header import decode_header, make_header

import requests

# ---------- Logging setup ----------

LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()

logging.basicConfig(
    level=getattr(logging, LOG_LEVEL, logging.INFO),
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("mail-classifier")

# ---------- Env variables ----------

IMAP_HOST = os.environ.get("IMAP_HOST", "mailu-front.mailu.svc")
IMAP_PORT = int(os.environ.get("IMAP_PORT", "993"))
IMAP_USER = os.environ.get("IMAP_USER")
IMAP_PASS = os.environ.get("IMAP_PASS")

OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://ollama-service.open-webui.svc:11434")
MODEL_NAME = os.environ.get("MODEL_NAME", "mail-router")

MAX_BODY_CHARS = int(os.environ.get("MAX_BODY_CHARS", "8000"))
CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", "300"))  # in seconds

OLLAMA_TIMEOUT = int(os.environ.get("OLLAMA_TIMEOUT", "120"))  # read timeout in seconds
OLLAMA_MAX_RETRIES = int(os.environ.get("OLLAMA_MAX_RETRIES", "3"))

# Minimum model confidence required to accept the suggested folder.
MIN_CONFIDENCE = 0.5

# Allowed folders (whitelist) - MUST match what is listed in the Modelfile.
ALLOWED_FOLDERS = {
    "INBOX",
    "INBOX.Pracovni",
    "INBOX.Osobni",
    "INBOX.Finance",
    "INBOX.Notifikace",
    "INBOX.Zpravodaje",
    "INBOX.SocialniSite",
    "INBOX.Ukoly",
    "INBOX.Nepodstatne",
    "INBOX.ZTJ",
}

# Hard rules based on the subject.
HARDCODED_SUBJECT_RULES = [
    # invoices / billing statements always go to INBOX.Ukoly (tasks)
    (re.compile(r"\bfaktura\b", re.IGNORECASE), "INBOX.Ukoly"),
    (re.compile(r"\bvyúčtován[íi]\b|\bvyuctovan[íi]\b", re.IGNORECASE), "INBOX.Ukoly"),
    (re.compile(r"\bdaňov[ýy]\s+doklad\b", re.IGNORECASE), "INBOX.Ukoly"),
    (re.compile(r"\binvoice\b", re.IGNORECASE), "INBOX.Ukoly"),
]

# ZTJ - keywords in the subject.
HARDCODED_ZTJ_SUBJECT = re.compile(
    r"\b(Život trochu jinak|zivot trochu jinak|ZTJ)\b",
    re.IGNORECASE,
)


def log_config():
    """Log the current configuration (the password is never printed)."""
    logger.info("=== mail-classifier configuration ===")
    logger.info("IMAP_HOST = %s", IMAP_HOST)
    logger.info("IMAP_PORT = %s", IMAP_PORT)
    logger.info("IMAP_USER = %s", IMAP_USER)
    logger.info("IMAP_PASS = **** (hidden)")
    logger.info("OLLAMA_URL = %s", OLLAMA_URL)
    logger.info("MODEL_NAME = %s", MODEL_NAME)
    logger.info("MAX_BODY_CHARS = %s", MAX_BODY_CHARS)
    logger.info("CHECK_INTERVAL = %s s", CHECK_INTERVAL)
    logger.info("OLLAMA_TIMEOUT = %s s", OLLAMA_TIMEOUT)
    logger.info("OLLAMA_MAX_RETRIES = %s", OLLAMA_MAX_RETRIES)
    logger.info("LOG_LEVEL = %s", LOG_LEVEL)
    logger.info("ALLOWED_FOLDERS = %s", sorted(ALLOWED_FOLDERS))
    logger.info("====================================")


# ---------- Header helpers ----------

def _decode_header_value(value):
    """Return a header value as readable text.

    RFC 2047 encoded words (``=?UTF-8?B?...?=``) are decoded so that the
    subject rules and the model see the real text. Falls back to ``str(value)``
    when the header cannot be decoded; ``None`` becomes an empty string.
    """
    if value is None:
        return ""
    try:
        return str(make_header(decode_header(value)))
    except (HeaderParseError, LookupError, UnicodeError, ValueError) as e:
        logger.debug("Could not decode header value %r: %s", value, e)
        return str(value)


def _match_hardcoded_rule(subject):
    """Return the folder forced by a hard-coded subject rule, or ``None``."""
    # ZTJ - a keyword in the subject always sends the mail to INBOX.ZTJ
    if HARDCODED_ZTJ_SUBJECT.search(subject):
        logger.info("Hardcoded ZTJ rule matched subject, forcing folder=INBOX.ZTJ")
        return "INBOX.ZTJ"

    # invoices / billing statements -> INBOX.Ukoly
    for pattern, folder in HARDCODED_SUBJECT_RULES:
        if pattern.search(subject):
            logger.info(
                "Hardcoded subject rule matched pattern %s, forcing folder=%s",
                pattern.pattern,
                folder,
            )
            return folder
    return None


# ---------- IMAP helpers ----------

def connect_imap():
    """Open an IMAP4-over-SSL connection and log in.

    Returns:
        The logged-in ``imaplib.IMAP4_SSL`` connection.

    Raises:
        RuntimeError: credentials are missing or the login was not "OK".
        Any exception raised by ``imaplib`` during connect/login.
    """
    logger.info("Connecting to IMAP %s:%s as %s", IMAP_HOST, IMAP_PORT, IMAP_USER)
    if not IMAP_USER or not IMAP_PASS:
        logger.error("IMAP_USER or IMAP_PASS is not set! Exiting.")
        raise RuntimeError("Missing IMAP credentials")

    conn = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
    try:
        typ, data = conn.login(IMAP_USER, IMAP_PASS)
        if typ != "OK":
            logger.error("IMAP login failed: %s %s", typ, data)
            raise RuntimeError("IMAP login failed")
    except Exception:
        # Do not leak the socket when the login fails.
        conn.shutdown()
        raise
    logger.info("IMAP login successful")
    return conn


def get_unseen_messages(imap_conn):
    """Select INBOX and return the UIDs (bytes) of all unseen messages.

    UIDs are used instead of sequence numbers because sequence numbers are
    renumbered by every EXPUNGE, which happens while the result is iterated.
    Returns an empty list when INBOX cannot be selected or the search fails.
    """
    typ, _ = imap_conn.select("INBOX")
    if typ != "OK":
        logger.error("Cannot select INBOX, got: %s", typ)
        return []

    status, data = imap_conn.uid("SEARCH", "UNSEEN")
    if status != "OK":
        logger.error("UNSEEN search failed: %s", status)
        return []

    ids = data[0].split() if data and data[0] else []
    logger.info("Found %s unseen messages in INBOX", len(ids))
    if ids:
        logger.debug("Unseen message IDs: %s", [i.decode(errors="ignore") for i in ids])
    return ids


def _extract_raw_message(fetch_data):
    """Return the raw message bytes from a FETCH response, or ``None``.

    A FETCH for a message that no longer exists yields no ``(envelope, body)``
    tuple, so the response has to be checked before it is indexed.
    """
    for item in fetch_data or ():
        if (
            isinstance(item, tuple)
            and len(item) >= 2
            and isinstance(item[1], (bytes, bytearray))
        ):
            return item[1]
    return None


# ---------- Email to prompt ----------

def _decode_text_part(part, label):
    """Decode the payload of one message part to text (best effort).

    Uses the declared charset, falls back to UTF-8 when the charset is unknown
    and returns an empty string when the part has no decodable payload.
    ``label`` only names the part kind in the debug log.
    """
    try:
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="ignore")
        except LookupError:
            # Unknown charset name declared by the sender.
            return payload.decode("utf-8", errors="ignore")
    except Exception as e:  # deliberate best effort: a bad part must not stop processing
        logger.debug("Error decoding %s: %s", label, e)
        return ""


def build_prompt_from_email(msg):
    """Build the model prompt from selected headers and the plain-text body.

    Headers From/To/Cc/Subject/Date are included (MIME-decoded). For multipart
    messages all non-attachment ``text/plain`` parts are concatenated. The body
    is truncated to MAX_BODY_CHARS characters.
    """
    headers_text = "\n".join(
        f"{name}: {_decode_header_value(msg.get(name, ''))}"
        for name in ("From", "To", "Cc", "Subject", "Date")
    )

    if msg.is_multipart():
        chunks = []
        for part in msg.walk():
            disposition = str(part.get("Content-Disposition") or "")
            if part.get_content_type() != "text/plain":
                continue
            if "attachment" in disposition.lower():
                continue
            chunks.append(_decode_text_part(part, "multipart part"))
        body_text = "".join(chunks)
    else:
        body_text = _decode_text_part(msg, "singlepart message")

    if len(body_text) > MAX_BODY_CHARS:
        logger.debug("Body truncated from %s to %s chars", len(body_text), MAX_BODY_CHARS)
        body_text = body_text[:MAX_BODY_CHARS]

    prompt = f"HEADERS:\n{headers_text}\n\nBODY:\n{body_text}"
    logger.debug("Built prompt (first 500 chars): %s", prompt[:500].replace("\n", " "))
    return prompt


# ---------- LLM warm-up ----------

def warmup_model():
    """Send a trivial request so Ollama loads the model before the first real mail.

    Failures are logged as warnings and otherwise ignored.
    """
    logger.info("Warming up Ollama model...")
    payload = {
        "model": MODEL_NAME,
        "stream": False,
        "format": "json",
        "messages": [
            {
                "role": "user",
                "content": (
                    "HEADERS:\nFrom: warmup@example.com\nSubject: warmup\n\n"
                    "BODY:\nThis is a warmup request, respond with a valid JSON "
                    "using folder INBOX and confidence 0."
                ),
            }
        ],
    }
    try:
        r = requests.post(f"{OLLAMA_URL}/api/chat", json=payload, timeout=OLLAMA_TIMEOUT)
        r.raise_for_status()
        content = r.json().get("message", {}).get("content", "")
        logger.info(
            "Warm-up response (first 200 chars): %s",
            content[:200].replace("\n", " "),
        )
    except Exception as e:  # warm-up is optional; never abort start-up because of it
        logger.warning("Warm-up failed (will continue anyway): %s", e)


# ---------- LLM call ----------

def classify_email(prompt):
    """Ask the model to classify one e-mail and return the parsed JSON answer.

    Timeouts and connection errors are retried up to OLLAMA_MAX_RETRIES times
    with a linear back-off (5 s * attempt). Any other error (HTTP error,
    invalid JSON, ...) is logged and re-raised immediately.

    Returns:
        Whatever ``json.loads`` produced from the model's message content
        (expected to be a dict with ``folder`` and ``confidence``).

    Raises:
        The last timeout/connection error after all retries, or the first
        non-retryable exception.
    """
    payload = {
        "model": MODEL_NAME,
        "stream": False,
        "format": "json",
        "messages": [
            {"role": "user", "content": prompt}
        ],
    }

    last_exc = None
    for attempt in range(1, OLLAMA_MAX_RETRIES + 1):
        logger.info(
            "Calling model %s at %s/api/chat (attempt %s/%s)",
            MODEL_NAME,
            OLLAMA_URL,
            attempt,
            OLLAMA_MAX_RETRIES,
        )
        try:
            r = requests.post(
                f"{OLLAMA_URL}/api/chat",
                json=payload,
                timeout=OLLAMA_TIMEOUT,
            )
            r.raise_for_status()
            data = r.json()
            content = data.get("message", {}).get("content", "")
            logger.info(
                "Model returned content (first 300 chars): %s",
                content[:300].replace("\n", " "),
            )

            try:
                result = json.loads(content)
            except Exception as e:
                logger.error("Error parsing JSON from model content: %s", e)
                logger.debug("Raw content was: %s", content)
                raise

            logger.info("Parsed model result: %s", result)
            return result

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            last_exc = e
            logger.warning("Ollama request failed with %s: %s", type(e).__name__, e)
            if attempt < OLLAMA_MAX_RETRIES:
                backoff = 5 * attempt
                logger.info("Retrying in %s seconds...", backoff)
                time.sleep(backoff)
        except Exception as e:
            # retrying other kinds of errors makes no sense
            logger.error("Ollama request failed (non-retryable): %s", e)
            logger.debug(traceback.format_exc())
            raise

    logger.error("Ollama request failed after %s attempts: %s", OLLAMA_MAX_RETRIES, last_exc)
    raise last_exc or RuntimeError("Ollama request failed")


def normalize_folder(result, msg):
    """Return the target folder for a message.

    1) Hard rules come first (ZTJ, invoices -> INBOX.Ukoly), matched against
       the MIME-decoded subject.
    2) Otherwise the model result is used, subject to the confidence
       threshold and the folder whitelist.

    Any malformed model result (not a dict, non-string folder, non-numeric or
    NaN confidence) falls back to "INBOX" instead of raising.
    """
    subject = _decode_header_value(msg.get("Subject", ""))

    forced = _match_hardcoded_rule(subject)
    if forced is not None:
        return forced

    # otherwise let the model decide
    if not isinstance(result, dict):
        logger.warning(
            "Model result is not a JSON object (%s), using INBOX",
            type(result).__name__,
        )
        return "INBOX"

    folder = result.get("folder", "INBOX")
    try:
        confidence = float(result.get("confidence", 0.0))
    except (TypeError, ValueError, OverflowError):
        confidence = 0.0

    logger.info("Model suggested folder=%s, confidence=%s", folder, confidence)

    # "not >=" instead of "<" so that a NaN confidence is rejected as well
    if not confidence >= MIN_CONFIDENCE:
        logger.info("Low confidence (%s), using INBOX as fallback", confidence)
        return "INBOX"

    # isinstance check first: an unhashable folder value (e.g. a list) would
    # make the set lookup raise TypeError
    if not isinstance(folder, str) or folder not in ALLOWED_FOLDERS:
        logger.warning("Folder %s not in ALLOWED_FOLDERS, using INBOX", folder)
        return "INBOX"

    return folder


# ---------- IMAP folder operations ----------

def ensure_folder(imap_conn, folder):
    """Make sure ``folder`` exists, creating and subscribing it if necessary.

    Existence is checked with LIST, so the currently selected mailbox is not
    changed. A failed CREATE is only logged; SUBSCRIBE is attempted anyway.
    """
    logger.debug("Ensuring folder exists: %s", folder)

    typ, mailboxes = imap_conn.list('""', f'"{folder}"')
    if typ == "OK" and mailboxes and mailboxes[0] is not None:
        logger.debug("Folder %s already exists", folder)
        return

    logger.info("Folder %s does not exist, creating...", folder)
    typ, data = imap_conn.create(folder)
    if typ != "OK":
        logger.error("Failed to create folder %s: %s", folder, data)
    else:
        logger.info("Folder %s created successfully", folder)

    # also try to SUBSCRIBE the folder so that the client (Roundcube) shows it
    try:
        st, dat = imap_conn.subscribe(folder)
        if st == "OK":
            logger.info("Folder %s subscribed successfully", folder)
        else:
            logger.warning("Failed to subscribe folder %s: %s", folder, dat)
    except Exception as e:
        logger.warning("IMAP server does not support SUBSCRIBE or it failed: %s", e)


def move_message(imap_conn, msg_id, target_folder):
    """Move one message from the selected mailbox to ``target_folder``.

    Args:
        imap_conn: logged-in connection with the source mailbox selected.
        msg_id: message UID as bytes (as returned by ``get_unseen_messages``).
        target_folder: destination mailbox name; created when missing.

    The move is COPY + ``\\Deleted`` + EXPUNGE, all addressed by UID so that
    an EXPUNGE never redirects a later command to a different message. Each
    failing step is logged and stops the move; nothing is raised for a
    non-"OK" response.
    """
    msg_id_str = msg_id.decode(errors="ignore")
    logger.info("Moving message %s -> %s", msg_id_str, target_folder)

    ensure_folder(imap_conn, target_folder)

    typ, data = imap_conn.uid("COPY", msg_id, target_folder)
    if typ != "OK":
        logger.error("Failed to copy message %s to %s: %s", msg_id_str, target_folder, data)
        return

    # the message is not marked \Seen, it is only removed from the source folder
    typ, data = imap_conn.uid("STORE", msg_id, "+FLAGS", "\\Deleted")
    if typ != "OK":
        logger.error("Failed to mark message %s as deleted: %s", msg_id_str, data)
        return

    typ, data = imap_conn.expunge()
    if typ != "OK":
        logger.error("Failed to expunge after moving message %s: %s", msg_id_str, data)
        return

    logger.info("Message %s moved to %s and expunged from INBOX", msg_id_str, target_folder)


# ---------- Main processing ----------

def process_once():
    """Run one iteration: classify and move every unseen INBOX message.

    Hard-coded subject rules are evaluated before the model is called, so
    matching mails are filed without an LLM round trip and even when Ollama is
    unavailable. Errors for a single message are logged and the loop continues
    with the next one. The IMAP connection is always logged out at the end.
    """
    logger.info("Starting one processing iteration")
    imap_conn = connect_imap()
    try:
        for msg_id in get_unseen_messages(imap_conn):
            msg_id_str = msg_id.decode(errors="ignore")
            logger.info("Processing message ID %s", msg_id_str)

            # BODY.PEEK[] - does not mark the message as \Seen
            typ, data = imap_conn.uid("FETCH", msg_id, "(BODY.PEEK[])")
            if typ != "OK":
                logger.error("Fetch failed for %s: %s", msg_id_str, data)
                continue

            raw_email = _extract_raw_message(data)
            if raw_email is None:
                logger.error("Fetch failed for %s: %s", msg_id_str, data)
                continue

            msg = email.message_from_bytes(raw_email)

            subject = _decode_header_value(msg.get("Subject", ""))
            target_folder = _match_hardcoded_rule(subject)

            if target_folder is None:
                prompt = build_prompt_from_email(msg)
                try:
                    result = classify_email(prompt)
                except Exception as e:
                    logger.error("Error calling model for message %s: %s", msg_id_str, e)
                    logger.debug(traceback.format_exc())
                    continue
                target_folder = normalize_folder(result, msg)

            try:
                move_message(imap_conn, msg_id, target_folder)
            except Exception as e:
                logger.error(
                    "Error moving message %s to %s: %s",
                    msg_id_str,
                    target_folder,
                    e,
                )
                logger.debug(traceback.format_exc())
                continue
    finally:
        logger.info("Logging out from IMAP")
        try:
            imap_conn.logout()
        except Exception as e:
            logger.warning("Error during IMAP logout: %s", e)


def main():
    """Entry point: log the configuration, warm up the model, then loop forever."""
    logger.info("mail-classifier starting up...")
    log_config()

    # warm up the model so that the first real request does not time out
    warmup_model()

    while True:
        try:
            process_once()
        except Exception as e:
            logger.error("Error in main loop: %s", e)
            logger.debug(traceback.format_exc())
        logger.info("Sleeping for %s seconds", CHECK_INTERVAL)
        time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()