diff --git a/Dockerfile b/Dockerfile
index 5a1bea6..6c5312d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -2,7 +2,8 @@ FROM python:3.12-slim
 
 WORKDIR /app
 
-# systémové balíčky – jen minimum
+ENV PYTHONUNBUFFERED=1
+
 RUN apt-get update && apt-get install -y --no-install-recommends \
     ca-certificates \
     && rm -rf /var/lib/apt/lists/*
@@ -12,5 +13,5 @@ RUN pip install --no-cache-dir -r requirements.txt
 
 COPY main.py .
 
-CMD ["python", "main.py"]
+CMD ["python", "-u", "main.py"]
 
diff --git a/kubernetes/credentials.sh b/kubernetes/credentials.sh
new file mode 100755
index 0000000..b5070eb
--- /dev/null
+++ b/kubernetes/credentials.sh
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+# Create the Kubernetes secret that holds the IMAP login of mail-classifier.
+#
+# Credentials come from the environment or an interactive prompt and are never
+# stored in the repository. The password that an earlier revision of this
+# script hard-coded must be treated as leaked and rotated.
+set -euo pipefail
+
+: "${IMAP_USER:?set IMAP_USER to the mailbox login}"
+if [[ -z "${IMAP_PASS:-}" ]]; then
+  read -r -s -p "IMAP password for ${IMAP_USER}: " IMAP_PASS
+  echo
+fi
+
+kubectl -n mailu create secret generic mail-classifier-secret \
+  --from-literal=imap_user="${IMAP_USER}" \
+  --from-literal=imap_pass="${IMAP_PASS}"
diff --git a/kubernetes/deployment.yaml b/kubernetes/deployment.yaml
new file mode 100644
index 0000000..0c358ff
--- /dev/null
+++ b/kubernetes/deployment.yaml
@@ -0,0 +1,56 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: mail-classifier
+  namespace: mailu
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: mail-classifier
+  template:
+    metadata:
+      labels:
+        app: mail-classifier
+    spec:
+      containers:
+        - name: mail-classifier
+          image: git.apps.sukany.cz/martin/mail-clasifier:latest
+          imagePullPolicy: Always  # a mutable ":latest" tag is never refreshed with IfNotPresent
+          env:
+            - name: IMAP_HOST
+              value: "mailu-front.mailu.svc"
+            - name: IMAP_PORT
+              value: "993"
+            - name: IMAP_USER
+              valueFrom:
+                secretKeyRef:
+                  name: mail-classifier-secret
+                  key: imap_user
+            - name: IMAP_PASS
+              valueFrom:
+                secretKeyRef:
+                  name: mail-classifier-secret
+                  key: imap_pass
+            - name: OLLAMA_URL
+              value: "http://ollama-service.open-webui.svc:11434"
+            - name: MODEL_NAME
+              value: "mail-router"
+            - name: CHECK_INTERVAL
+              value: "300"  # 5 minutes; feel free to shorten
+            - name: MAX_BODY_CHARS
+              value: "8000"
+            - name: LOG_LEVEL
+              value: "INFO"  # use DEBUG when troubleshooting
+            - name: OLLAMA_TIMEOUT
+              value: "120"  # the first request can take longer because of model warm-up
+            - name: OLLAMA_MAX_RETRIES
+              value: "3"
+          resources:
+            requests:
+              cpu: "100m"
+              memory: "128Mi"
+            limits:
+              cpu: "500m"
+              memory: "256Mi"
+
#!/usr/bin/env python3
"""Sort unseen INBOX mail into IMAP folders with the help of an Ollama model.

The service polls the IMAP INBOX, builds a text prompt from every unseen
message, asks the model for a target folder and moves the message there.
Hard-coded subject rules take precedence over the model's answer.
"""
# NOTE(review): this module was reformatted from a whitespace-collapsed patch.
# The shebang, `import os` / `import time`, the `"stream": False` payload entry
# used by classify_email, the message parsing at the start of per-message
# processing and the `__main__` guard lie outside the visible hunks and were
# reconstructed -- confirm against the real file.
import os
import time
import imaplib
import email
import email.errors
import email.header
import json
import logging
import sys
import traceback
import re

# `requests` (third-party) is imported inside the two functions that talk to
# Ollama, so the IMAP and rule helpers stay importable without it.

# ---------- Logging setup ----------

LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()

# getattr() may hit a non-level attribute (e.g. LOG_LEVEL=BASIC_FORMAT is a
# str), which basicConfig would reject; fall back to INFO in that case.
_configured_level = getattr(logging, LOG_LEVEL, logging.INFO)
logging.basicConfig(
    level=_configured_level if isinstance(_configured_level, int) else logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("mail-classifier")

# ---------- Env variables ----------

IMAP_HOST = os.environ.get("IMAP_HOST", "mailu-front.mailu.svc")
IMAP_PORT = int(os.environ.get("IMAP_PORT", "993"))
IMAP_USER = os.environ.get("IMAP_USER")
IMAP_PASS = os.environ.get("IMAP_PASS")

OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://ollama-service.open-webui.svc:11434")
MODEL_NAME = os.environ.get("MODEL_NAME", "mail-router")

MAX_BODY_CHARS = int(os.environ.get("MAX_BODY_CHARS", "8000"))
CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", "300"))  # seconds

OLLAMA_TIMEOUT = int(os.environ.get("OLLAMA_TIMEOUT", "120"))  # request timeout, seconds
OLLAMA_MAX_RETRIES = int(os.environ.get("OLLAMA_MAX_RETRIES", "3"))

# ---------- Fixed settings ----------

SOURCE_MAILBOX = "INBOX"
MIN_CONFIDENCE = 0.5  # below this the model's suggestion is ignored
PROMPT_HEADERS = ("From", "To", "Cc", "Subject", "Date")

# IMAP keyword set on messages that were classified but stay in INBOX, so the
# next polling cycle does not send them to the model again.
PROCESSED_KEYWORD = "$MailClassified"

# Allowed target folders (whitelist) -- MUST match what the Modelfile offers.
ALLOWED_FOLDERS = {
    "INBOX",
    "INBOX.Pracovni",
    "INBOX.Osobni",
    "INBOX.Finance",
    "INBOX.Notifikace",
    "INBOX.Zpravodaje",
    "INBOX.SocialniSite",
    "INBOX.Ukoly",
    "INBOX.Nepodstatne",
    "INBOX.ZTJ",
}

# Hard rules evaluated on the subject: invoices / billing always go to Ukoly.
HARDCODED_SUBJECT_RULES = [
    (re.compile(r"\bfaktura\b", re.IGNORECASE), "INBOX.Ukoly"),
    (re.compile(r"\bvyúčtován[íi]\b|\bvyuctovan[íi]\b", re.IGNORECASE), "INBOX.Ukoly"),
    (re.compile(r"\bdaňov[ýy]\s+doklad\b", re.IGNORECASE), "INBOX.Ukoly"),
    (re.compile(r"\binvoice\b", re.IGNORECASE), "INBOX.Ukoly"),
]

# ZTJ -- keywords in the subject.
HARDCODED_ZTJ_SUBJECT = re.compile(
    r"\b(Život trochu jinak|zivot trochu jinak|ZTJ)\b",
    re.IGNORECASE,
)

WARMUP_PROMPT = (
    "HEADERS:\nFrom: warmup@example.com\nSubject: warmup\n\n"
    "BODY:\nThis is a warmup request, respond with a valid JSON "
    "using folder INBOX and confidence 0."
)


def log_config():
    """Log the effective configuration; the password is never printed."""
    settings = [
        ("IMAP_HOST", IMAP_HOST),
        ("IMAP_PORT", IMAP_PORT),
        ("IMAP_USER", IMAP_USER),
        ("IMAP_PASS", "**** (hidden)"),
        ("OLLAMA_URL", OLLAMA_URL),
        ("MODEL_NAME", MODEL_NAME),
        ("MAX_BODY_CHARS", MAX_BODY_CHARS),
        ("CHECK_INTERVAL", f"{CHECK_INTERVAL} s"),
        ("OLLAMA_TIMEOUT", f"{OLLAMA_TIMEOUT} s"),
        ("OLLAMA_MAX_RETRIES", OLLAMA_MAX_RETRIES),
        ("LOG_LEVEL", LOG_LEVEL),
        ("ALLOWED_FOLDERS", sorted(ALLOWED_FOLDERS)),
    ]
    logger.info("=== mail-classifier configuration ===")
    for name, value in settings:
        logger.info("%-18s = %s", name, value)
    logger.info("====================================")


# ---------- IMAP helpers ----------

def connect_imap():
    """Open an IMAPS connection and log in.

    Returns:
        The authenticated ``imaplib.IMAP4_SSL`` connection.

    Raises:
        RuntimeError: credentials are missing or the server refused the login.
        imaplib.IMAP4.error / OSError: propagated from imaplib.
    """
    logger.info("Connecting to IMAP %s:%s as %s", IMAP_HOST, IMAP_PORT, IMAP_USER)
    if not IMAP_USER or not IMAP_PASS:
        logger.error("IMAP_USER or IMAP_PASS is not set! Exiting.")
        raise RuntimeError("Missing IMAP credentials")

    m = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
    try:
        typ, data = m.login(IMAP_USER, IMAP_PASS)
        if typ != "OK":
            logger.error("IMAP login failed: %s %s", typ, data)
            raise RuntimeError("IMAP login failed")
    except Exception:
        # Do not leak the socket when the login is rejected or the link drops.
        m.shutdown()
        raise
    logger.info("IMAP login successful")
    return m


def get_unseen_messages(imap_conn):
    """Return the UIDs (bytes) of unseen, not yet classified INBOX messages.

    UIDs are used instead of sequence numbers because sequence numbers shift
    whenever a message is expunged, which happens while the list is processed.
    Returns an empty list when INBOX cannot be selected or the search fails.
    """
    typ, _ = imap_conn.select(SOURCE_MAILBOX)
    if typ != "OK":
        logger.error("Cannot select INBOX, got: %s", typ)
        return []

    status, data = imap_conn.uid(
        "SEARCH", None, "UNSEEN", "UNKEYWORD", PROCESSED_KEYWORD
    )
    if status != "OK":
        logger.error("UNSEEN search failed: %s", status)
        return []

    ids = data[0].split() if data and data[0] else []
    logger.info("Found %d unseen messages in INBOX", len(ids))
    if ids:
        logger.debug("Unseen message IDs: %s", [i.decode(errors="ignore") for i in ids])
    return ids


# ---------- Email to prompt ----------

def _decoded_header(msg, name):
    """Return header *name* of *msg* as readable text ('' when absent).

    RFC 2047 encoded words (``=?utf-8?q?...?=``) are decoded; a header that
    cannot be decoded is returned in its raw form.
    """
    raw = msg.get(name, "")
    if raw is None:
        return ""
    try:
        return str(email.header.make_header(email.header.decode_header(raw)))
    except (email.errors.HeaderParseError, LookupError, UnicodeError, ValueError):
        return str(raw)


def _decode_part(part):
    """Return the decoded text payload of a single MIME part ('' if none)."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # Unknown charset label: keep the text instead of dropping the part.
        logger.debug("Unknown charset %r, decoding as utf-8", charset)
        return payload.decode("utf-8", errors="ignore")


def build_prompt_from_email(msg):
    """Build the model prompt (selected headers + plain-text body) for *msg*.

    For multipart messages only ``text/plain`` parts that are not attachments
    are used. The body is cut to ``MAX_BODY_CHARS`` characters.
    """
    headers_text = "\n".join(
        f"{h}: {_decoded_header(msg, h)}" for h in PROMPT_HEADERS
    )

    if msg.is_multipart():
        chunks = []
        for part in msg.walk():
            disp = str(part.get("Content-Disposition") or "").lower()
            if part.get_content_type() == "text/plain" and "attachment" not in disp:
                chunks.append(_decode_part(part))
        body_text = "".join(chunks)
    else:
        body_text = _decode_part(msg)

    if len(body_text) > MAX_BODY_CHARS:
        logger.debug("Body truncated from %d to %d chars", len(body_text), MAX_BODY_CHARS)
        body_text = body_text[:MAX_BODY_CHARS]

    prompt = f"HEADERS:\n{headers_text}\n\nBODY:\n{body_text}"
    logger.debug("Built prompt (first 500 chars): %s", prompt[:500].replace("\n", " "))
    return prompt


# ---------- LLM calls ----------

def _chat_payload(content):
    """Return the /api/chat request body for a single user message."""
    return {
        "model": MODEL_NAME,
        "stream": False,
        "format": "json",
        "messages": [{"role": "user", "content": content}],
    }


def warmup_model():
    """Send one throw-away request so Ollama loads the model before real mail.

    Failures are logged and ignored; only a missing ``requests`` package is
    allowed to propagate, so a broken image fails at start-up.
    """
    import requests  # deliberately outside the try block below

    logger.info("Warming up Ollama model...")
    try:
        r = requests.post(
            f"{OLLAMA_URL}/api/chat",
            json=_chat_payload(WARMUP_PROMPT),
            timeout=OLLAMA_TIMEOUT,
        )
        r.raise_for_status()
        content = r.json().get("message", {}).get("content", "") or ""
        logger.info(
            "Warm-up response (first 200 chars): %s", content[:200].replace("\n", " ")
        )
    except Exception as e:  # best effort: the service must start regardless
        logger.warning("Warm-up failed (will continue anyway): %s", e)


def classify_email(prompt):
    """Ask the model to classify *prompt* and return the parsed JSON answer.

    Timeouts and connection errors are retried up to ``OLLAMA_MAX_RETRIES``
    times with a linear back-off; every other failure is raised immediately.

    Raises:
        requests.exceptions.RequestException: the HTTP call failed.
        ValueError: the model content is not valid JSON.
        RuntimeError: no attempt was made (``OLLAMA_MAX_RETRIES`` < 1).
    """
    import requests

    url = f"{OLLAMA_URL}/api/chat"
    payload = _chat_payload(prompt)
    last_exc = None

    for attempt in range(1, OLLAMA_MAX_RETRIES + 1):
        logger.info(
            "Calling model %s at %s (attempt %d/%d)",
            MODEL_NAME, url, attempt, OLLAMA_MAX_RETRIES,
        )
        try:
            r = requests.post(url, json=payload, timeout=OLLAMA_TIMEOUT)
            r.raise_for_status()
            content = r.json().get("message", {}).get("content", "") or ""
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            last_exc = e
            logger.warning("Ollama request failed with %s: %s", type(e).__name__, e)
            if attempt < OLLAMA_MAX_RETRIES:
                backoff = 5 * attempt
                logger.info("Retrying in %d seconds...", backoff)
                time.sleep(backoff)
            continue
        except Exception as e:
            # Other errors (HTTP 4xx/5xx, malformed reply) are not worth retrying.
            logger.error("Ollama request failed (non-retryable): %s", e)
            logger.debug(traceback.format_exc())
            raise

        logger.info(
            "Model returned content (first 300 chars): %s",
            content[:300].replace("\n", " "),
        )
        try:
            result = json.loads(content)
        except ValueError as e:
            logger.error("Error parsing JSON from model content: %s", e)
            logger.debug("Raw content was: %s", content)
            raise
        logger.info("Parsed model result: %s", result)
        return result

    logger.error(
        "Ollama request failed after %d attempts: %s", OLLAMA_MAX_RETRIES, last_exc
    )
    raise last_exc or RuntimeError("Ollama request failed")


def normalize_folder(result, msg):
    """Return the folder a message should end up in.

    Order of evaluation:
      1. hard-coded subject rules (ZTJ, invoices -> Ukoly),
      2. the model's suggestion, accepted only if it is a whitelisted folder
         with confidence >= ``MIN_CONFIDENCE``.
    Anything else (malformed result, NaN confidence, unknown folder) yields
    ``"INBOX"``.

    Args:
        result: parsed model answer; expected to be a dict with ``folder`` and
            ``confidence`` keys, but any JSON value is tolerated.
        msg: the ``email.message.Message`` being classified.
    """
    # Decode RFC 2047 first: subjects with diacritics arrive as encoded words
    # and would otherwise never match the rules.
    subject = _decoded_header(msg, "Subject")

    if HARDCODED_ZTJ_SUBJECT.search(subject):
        logger.info("Hardcoded ZTJ rule matched subject, forcing folder=INBOX.ZTJ")
        return "INBOX.ZTJ"

    for pattern, folder in HARDCODED_SUBJECT_RULES:
        if pattern.search(subject):
            logger.info(
                "Hardcoded subject rule matched pattern %s, forcing folder=%s",
                pattern.pattern, folder,
            )
            return folder

    # Otherwise the model decides.
    if not isinstance(result, dict):
        logger.warning(
            "Model result is not a JSON object (%s), using INBOX",
            type(result).__name__,
        )
        return "INBOX"

    folder = result.get("folder", "INBOX")
    try:
        confidence = float(result.get("confidence", 0.0))
    except (TypeError, ValueError, OverflowError):
        confidence = 0.0

    logger.info("Model suggested folder=%s, confidence=%s", folder, confidence)

    # Written as `not >=` so that NaN is rejected as well.
    if not confidence >= MIN_CONFIDENCE:
        logger.info("Low confidence (%s), using INBOX as fallback", confidence)
        return "INBOX"

    if not isinstance(folder, str) or folder not in ALLOWED_FOLDERS:
        logger.warning("Folder %s not in ALLOWED_FOLDERS, using INBOX", folder)
        return "INBOX"

    return folder


# ---------- IMAP folder operations ----------

def ensure_folder(imap_conn, folder):
    """Create and subscribe *folder* if LIST does not report it.

    LIST is used for the check, so the selected mailbox does not change.
    Creation and subscription failures are logged, not raised.
    """
    logger.debug("Ensuring folder exists: %s", folder)
    typ, mailboxes = imap_conn.list('""', f'"{folder}"')
    if typ == "OK" and mailboxes and mailboxes[0] is not None:
        logger.debug("Folder %s already exists", folder)
        return

    logger.info("Folder %s does not exist, creating...", folder)
    typ, data = imap_conn.create(folder)
    if typ != "OK":
        logger.error("Failed to create folder %s: %s", folder, data)
        return
    logger.info("Folder %s created successfully", folder)

    # Subscribe as well so that mail clients (Roundcube) show the folder.
    try:
        st, dat = imap_conn.subscribe(folder)
    except imaplib.IMAP4.abort:
        raise
    except imaplib.IMAP4.error as e:
        logger.warning("IMAP server does not support SUBSCRIBE or it failed: %s", e)
        return
    if st == "OK":
        logger.info("Folder %s subscribed successfully", folder)
    else:
        logger.warning("Failed to subscribe folder %s: %s", folder, dat)


def move_message(imap_conn, msg_id, target_folder):
    """Move the message with UID *msg_id* from INBOX to *target_folder*.

    The move is COPY + ``\\Deleted`` + EXPUNGE; the ``\\Seen`` flag is not
    touched. A target equal to INBOX is a no-op: copying a message onto its
    own mailbox would only give it a new UID and make it look new again.
    Note that EXPUNGE also removes any other INBOX message flagged
    ``\\Deleted``.

    Returns:
        True when the message is in *target_folder* afterwards, else False.
    """
    msg_id_str = msg_id.decode(errors="ignore")
    if target_folder == SOURCE_MAILBOX:
        logger.info("Message %s stays in INBOX, nothing to move", msg_id_str)
        return True

    logger.info("Moving message %s -> %s", msg_id_str, target_folder)
    ensure_folder(imap_conn, target_folder)

    typ, data = imap_conn.uid("COPY", msg_id, target_folder)
    if typ != "OK":
        logger.error(
            "Failed to copy message %s to %s: %s", msg_id_str, target_folder, data
        )
        return False

    typ, data = imap_conn.uid("STORE", msg_id, "+FLAGS", "(\\Deleted)")
    if typ != "OK":
        logger.error("Failed to mark message %s as deleted: %s", msg_id_str, data)
        return False

    typ, data = imap_conn.expunge()
    if typ != "OK":
        logger.error(
            "Failed to expunge after moving message %s: %s", msg_id_str, data
        )
        return False

    logger.info(
        "Message %s moved to %s and expunged from INBOX", msg_id_str, target_folder
    )
    return True


def _mark_processed(imap_conn, msg_id):
    """Tag a message that stays in INBOX so it is not classified again."""
    try:
        typ, data = imap_conn.uid(
            "STORE", msg_id, "+FLAGS", f"({PROCESSED_KEYWORD})"
        )
    except imaplib.IMAP4.abort:
        raise
    except imaplib.IMAP4.error as e:
        typ, data = "BAD", e
    if typ != "OK":
        # Without the keyword the message is simply re-evaluated next cycle.
        logger.warning(
            "Could not set %s on message %s: %s",
            PROCESSED_KEYWORD, msg_id.decode(errors="ignore"), data,
        )


# ---------- Main processing ----------

def _process_message(imap_conn, msg_id):
    """Fetch, classify and file the single message with UID *msg_id*."""
    msg_id_str = msg_id.decode(errors="ignore")
    logger.info("Processing message ID %s", msg_id_str)

    # BODY.PEEK[] does not set the \Seen flag.
    typ, data = imap_conn.uid("FETCH", msg_id, "(BODY.PEEK[])")
    if typ != "OK" or not data or not isinstance(data[0], tuple):
        logger.error("Fetch failed for %s: %s", msg_id_str, data)
        return

    msg = email.message_from_bytes(data[0][1])
    prompt = build_prompt_from_email(msg)

    try:
        result = classify_email(prompt)
    except Exception as e:
        logger.error("Error calling model for message %s: %s", msg_id_str, e)
        logger.debug(traceback.format_exc())
        return

    target_folder = normalize_folder(result, msg)
    if target_folder == SOURCE_MAILBOX:
        _mark_processed(imap_conn, msg_id)
        return
    move_message(imap_conn, msg_id, target_folder)


def process_once():
    """Run one polling cycle: connect, file every unseen message, log out.

    A failure while handling one message is logged and the next message is
    processed; a broken connection aborts the cycle.
    """
    logger.info("Starting one processing iteration")
    imap_conn = connect_imap()
    try:
        for msg_id in get_unseen_messages(imap_conn):
            try:
                _process_message(imap_conn, msg_id)
            except (imaplib.IMAP4.abort, OSError):
                # The connection is gone; the remaining messages would fail too.
                raise
            except Exception as e:
                logger.error(
                    "Error processing message %s: %s",
                    msg_id.decode(errors="ignore"), e,
                )
                logger.debug(traceback.format_exc())
    finally:
        logger.info("Logging out from IMAP")
        try:
            imap_conn.logout()
        except Exception as e:
            logger.warning("Error during IMAP logout: %s", e)


def main():
    """Log the configuration, warm the model up and poll forever."""
    logger.info("mail-classifier starting up...")
    log_config()
    # Warm the model up so the first real request does not time out.
    warmup_model()
    while True:
        try:
            process_once()
        except Exception as e:
            logger.error("Error in main loop: %s", e)
            logger.debug(traceback.format_exc())
        logger.info("Sleeping for %d seconds", CHECK_INTERVAL)
        time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()