488 lines
16 KiB
Python
Executable File
488 lines
16 KiB
Python
Executable File
import os
|
||
import time
|
||
import imaplib
|
||
import email
|
||
import requests
|
||
import logging
|
||
import sys
|
||
import traceback
|
||
import re
|
||
|
||
# ---------- Logging setup ----------
|
||
|
||
# Level name comes from the LOG_LEVEL environment variable (upper-cased);
# a name the logging module does not define falls back to INFO.
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()

# Everything is written to stdout through a single stream handler.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=getattr(logging, LOG_LEVEL, logging.INFO),
)

logger = logging.getLogger("mail-classifier")
|
||
|
||
# ---------- Env variables ----------
|
||
|
||
# IMAP account whose INBOX is scanned for unseen mail.
IMAP_HOST = os.getenv("IMAP_HOST", "mailu-front.mailu.svc")
IMAP_PORT = int(os.getenv("IMAP_PORT", "993"))
IMAP_USER = os.getenv("IMAP_USER")
IMAP_PASS = os.getenv("IMAP_PASS")

# Ollama endpoint and the model asked to classify each mail.
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://ollama-service.open-webui.svc:11434")
MODEL_NAME = os.getenv("MODEL_NAME", "mail-router")

# Mail body text is cut to this many characters before it goes into the prompt.
MAX_BODY_CHARS = int(os.getenv("MAX_BODY_CHARS", "2000"))
# Pause between two processing iterations, in seconds.
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "300"))

# Value handed to requests as ``timeout=``, in seconds.
OLLAMA_TIMEOUT = int(os.getenv("OLLAMA_TIMEOUT", "120"))
OLLAMA_MAX_RETRIES = int(os.getenv("OLLAMA_MAX_RETRIES", "3"))

# Confidence thresholds: below them the model's folder suggestion is ignored.
MIN_CONFIDENCE_DEFAULT = float(os.getenv("MIN_CONFIDENCE_DEFAULT", "0.4"))
MIN_CONFIDENCE_RELAXED = float(os.getenv("MIN_CONFIDENCE_RELAXED", "0.3"))
|
||
|
||
# Whitelist of target folders - MUST match what is declared in the Modelfile.
# Any folder the model suggests outside this set is replaced by plain INBOX.
ALLOWED_FOLDERS = {
    "INBOX",
    "INBOX.Finance",
    "INBOX.Nepodstatne",
    "INBOX.Notifikace",
    "INBOX.Osobni",
    "INBOX.Pracovni",
    "INBOX.SocialniSite",
    "INBOX.Ukoly",
    "INBOX.ZTJ",
    "INBOX.Zpravodaje",
}
|
||
|
||
# Hard subject rules - invoices and billing statements go to the tasks folder.
# Each entry is (compiled pattern, target folder); all share the same target.
HARDCODED_SUBJECT_RULES = [
    (re.compile(pattern, re.IGNORECASE), "INBOX.Ukoly")
    for pattern in (
        r"\bfaktura\b",
        r"\bvyúčtován[íi]\b|\bvyuctovan[íi]\b",
        r"\bdaňov[ýy]\s+doklad\b",
        r"\binvoice\b",
    )
]

# ZTJ - keywords in the subject.
HARDCODED_ZTJ_SUBJECT = re.compile(
    r"\b(Život trochu jinak|zivot trochu jinak|ZTJ)\b",
    flags=re.IGNORECASE,
)

# Marketing / newsletter - deliberately coarse pattern.
HARDCODED_MARKETING_SUBJECT = re.compile(
    r"\b(black friday|sleva|slevy|akce|speciální nabídka|newsletter|zpravodaj|unsubscribe)\b",
    flags=re.IGNORECASE,
)
|
||
|
||
|
||
def log_config():
    """Log the effective configuration at INFO level; the password is masked."""
    report = (
        "=== mail-classifier configuration ===",
        f"IMAP_HOST = {IMAP_HOST}",
        f"IMAP_PORT = {IMAP_PORT}",
        f"IMAP_USER = {IMAP_USER}",
        # The real password is never written to the log.
        "IMAP_PASS = **** (hidden)",
        f"OLLAMA_URL = {OLLAMA_URL}",
        f"MODEL_NAME = {MODEL_NAME}",
        f"MAX_BODY_CHARS = {MAX_BODY_CHARS}",
        f"CHECK_INTERVAL = {CHECK_INTERVAL} s",
        f"OLLAMA_TIMEOUT = {OLLAMA_TIMEOUT} s",
        f"OLLAMA_MAX_RETRIES = {OLLAMA_MAX_RETRIES}",
        f"MIN_CONF_DEFAULT = {MIN_CONFIDENCE_DEFAULT}",
        f"MIN_CONF_RELAXED = {MIN_CONFIDENCE_RELAXED}",
        f"LOG_LEVEL = {LOG_LEVEL}",
        f"ALLOWED_FOLDERS = {sorted(ALLOWED_FOLDERS)}",
        "====================================",
    )
    for entry in report:
        logger.info(entry)
|
||
|
||
|
||
# ---------- IMAP helpers ----------
|
||
|
||
def connect_imap():
    """Open an SSL IMAP connection and log in with the configured credentials.

    Returns:
        The logged-in ``imaplib.IMAP4_SSL`` connection.

    Raises:
        RuntimeError: if IMAP_USER / IMAP_PASS is not set, or LOGIN returns a
            status other than "OK".
        Exception: whatever imaplib raises while connecting or logging in is
            propagated unchanged.
    """
    logger.info(f"Connecting to IMAP {IMAP_HOST}:{IMAP_PORT} as {IMAP_USER}")
    if not IMAP_USER or not IMAP_PASS:
        logger.error("IMAP_USER or IMAP_PASS is not set! Exiting.")
        raise RuntimeError("Missing IMAP credentials")

    m = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
    try:
        typ, data = m.login(IMAP_USER, IMAP_PASS)
        if typ != "OK":
            logger.error(f"IMAP login failed: {typ} {data}")
            raise RuntimeError("IMAP login failed")
    except Exception:
        # The caller never receives the connection on failure, so close the
        # socket here; otherwise every failed iteration leaks one.
        try:
            m.shutdown()
        except OSError as close_err:
            logger.debug(f"Error closing IMAP connection after failed login: {close_err}")
        raise

    logger.info("IMAP login successful")
    return m
|
||
|
||
|
||
def get_unseen_messages(imap_conn):
    """Select INBOX and return the ids of all messages matching UNSEEN.

    Args:
        imap_conn: logged-in imaplib connection.

    Returns:
        List of message ids as ``bytes`` (as returned by SEARCH, i.e. message
        sequence numbers); an empty list when INBOX cannot be selected, the
        search fails, or nothing matches.
    """
    typ, _ = imap_conn.select("INBOX")
    if typ != "OK":
        logger.error(f"Cannot select INBOX, got: {typ}")
        return []

    status, data = imap_conn.search(None, "UNSEEN")
    if status != "OK":
        logger.error(f"UNSEEN search failed: {status}")
        return []

    # imaplib hands back [None] when the server sent no untagged SEARCH line,
    # so guard before calling .split() on the payload.
    raw_ids = data[0] if data else None
    ids = raw_ids.split() if raw_ids else []

    logger.info(f"Found {len(ids)} unseen messages in INBOX")
    if ids:
        logger.debug(f"Unseen message IDs: {[i.decode(errors='ignore') for i in ids]}")
    return ids
|
||
|
||
|
||
# ---------- Email to prompt ----------
|
||
|
||
def _header_text(raw):
    """Return a header value as readable text, decoding RFC 2047 encoded-words."""
    from email.errors import HeaderParseError
    from email.header import decode_header, make_header

    if not raw:
        return ""
    try:
        return str(make_header(decode_header(raw)))
    except (HeaderParseError, LookupError, UnicodeError, ValueError):
        # Malformed encoded-word or unknown charset: keep the raw value.
        return str(raw)


def build_prompt_from_email(msg):
    """Build the classification prompt (selected headers + plain-text body).

    Args:
        msg: parsed ``email.message.Message``.

    Returns:
        A string ``"HEADERS:\\n...\\n\\nBODY:\\n..."``. Header values are decoded
        from their RFC 2047 form so the model sees readable text instead of
        ``=?UTF-8?B?...?=``. The body is the concatenation of all
        non-attachment ``text/plain`` parts (or the single payload for a
        non-multipart message), cut to MAX_BODY_CHARS characters.
    """
    headers_text = "\n".join(
        f"{name}: {_header_text(msg.get(name, ''))}"
        for name in ("From", "To", "Cc", "Subject", "Date")
    )

    body_text = ""
    if msg.is_multipart():
        chunks = []
        for part in msg.walk():
            if part.get_content_type() != "text/plain":
                continue
            disp = str(part.get("Content-Disposition") or "")
            if "attachment" in disp.lower():
                continue
            # Best effort: a part that cannot be decoded is skipped.
            try:
                part_bytes = part.get_payload(decode=True)
                if part_bytes is None:
                    continue
                chunks.append(
                    part_bytes.decode(
                        part.get_content_charset() or "utf-8",
                        errors="ignore",
                    )
                )
            except Exception as e:
                logger.debug(f"Error decoding multipart part: {e}")
        body_text = "".join(chunks)
    else:
        try:
            part_bytes = msg.get_payload(decode=True)
            if part_bytes is not None:
                body_text = part_bytes.decode(
                    msg.get_content_charset() or "utf-8",
                    errors="ignore",
                )
        except Exception as e:
            logger.debug(f"Error decoding singlepart message: {e}")
            body_text = ""

    if len(body_text) > MAX_BODY_CHARS:
        logger.debug(f"Body truncated from {len(body_text)} to {MAX_BODY_CHARS} chars")
        body_text = body_text[:MAX_BODY_CHARS]

    prompt = f"HEADERS:\n{headers_text}\n\nBODY:\n{body_text}"
    logger.debug(f"Built prompt (first 500 chars): {prompt[:500].replace(chr(10), ' ')}")
    return prompt
|
||
|
||
|
||
# ---------- LLM warm-up ----------
|
||
|
||
def warmup_model():
    """Send one throw-away chat request so Ollama loads the model up front.

    Any failure is only logged as a warning; nothing is raised or returned.
    """
    logger.info("Warming up Ollama model...")

    warmup_text = (
        "HEADERS:\nFrom: warmup@example.com\nSubject: warmup\n\n"
        "BODY:\nThis is a warmup request. "
        "Odpověz přesně ve formátu:\n"
        "FOLDER: INBOX\nCONFIDENCE: 0.0\nREASON: warmup\nRULES:\n- warmup"
    )
    request_body = {
        "model": MODEL_NAME,
        "stream": False,
        "messages": [{"role": "user", "content": warmup_text}],
    }

    try:
        response = requests.post(
            f"{OLLAMA_URL}/api/chat",
            json=request_body,
            timeout=OLLAMA_TIMEOUT,
        )
        response.raise_for_status()
        answer = response.json().get("message", {}).get("content", "")
        preview = answer[:200].replace("\n", " ")
        logger.info(f"Warm-up response (first 200 chars): {preview}")
    except Exception as exc:
        # Warm-up is optional; the real requests have their own retry logic.
        logger.warning(f"Warm-up failed (will continue anyway): {exc}")
|
||
|
||
|
||
# ---------- Parsing model output ----------
|
||
|
||
FOLDER_RE = re.compile(r"^FOLDER:\s*(.+)$", re.MULTILINE)
# "," is captured too, so a decimal comma ("0,8") is not cut off after "0".
CONF_RE = re.compile(r"^CONFIDENCE:\s*([0-9.,]+)", re.MULTILINE)
REASON_RE = re.compile(r"^REASON:\s*(.+)$", re.MULTILINE)
RULES_RE = re.compile(r"^RULES:\s*(.*)$", re.MULTILINE)
# First plain decimal number inside the captured CONFIDENCE text.
_CONF_NUMBER_RE = re.compile(r"[0-9]*\.?[0-9]+")


def parse_model_output(content: str) -> dict:
    """Parse the model's answer into its four fields.

    Expected format:

        FOLDER: INBOX.Pracovni
        CONFIDENCE: 0.8
        REASON: ...
        RULES:
        - ...
        - ...

    Args:
        content: raw text returned by the model.

    Returns:
        dict with keys ``folder`` (str), ``confidence`` (float), ``reason``
        (str) and ``rules`` (list of str, the "- " bullets after ``RULES:``).
        The confidence is 0.0 when the captured text contains no number.

    Raises:
        ValueError: if any of FOLDER / CONFIDENCE / REASON / RULES is missing.
    """
    folder_match = FOLDER_RE.search(content)
    conf_match = CONF_RE.search(content)
    reason_match = REASON_RE.search(content)
    rules_match = RULES_RE.search(content)

    if not folder_match or not conf_match or not reason_match or not rules_match:
        raise ValueError("Missing one of FOLDER/CONFIDENCE/REASON/RULES in model output")

    # Accept a decimal comma and ignore trailing punctuation ("0.8.").
    number = _CONF_NUMBER_RE.search(conf_match.group(1).replace(",", "."))
    confidence = float(number.group(0)) if number else 0.0

    # The "\s*" in RULES_RE also swallows the newline after "RULES:", so
    # group 1 is usually the first bullet itself. Start the rules block at
    # group 1 (not at the end of the match) so that bullet is not lost.
    rules_block = content[rules_match.start(1):]
    rules = [
        line.strip()[2:].strip()
        for line in rules_block.splitlines()
        if line.strip().startswith("- ")
    ]

    return {
        "folder": folder_match.group(1).strip(),
        "confidence": confidence,
        "reason": reason_match.group(1).strip(),
        "rules": rules,
    }
|
||
|
||
|
||
# ---------- LLM call ----------
|
||
|
||
def classify_email(prompt):
    """Ask the Ollama chat endpoint to classify one mail.

    Timeouts and connection errors are retried up to OLLAMA_MAX_RETRIES times
    with a linearly growing pause (5 s * attempt number). Every other error,
    including an answer that cannot be parsed, is re-raised immediately.

    Args:
        prompt: text produced for the mail (headers + body).

    Returns:
        The dict produced by ``parse_model_output``.

    Raises:
        Exception: the last timeout/connection error once the retries are
            used up, any non-retryable error as-is, or ``RuntimeError`` when
            no attempt was made at all.
    """
    url = f"{OLLAMA_URL}/api/chat"
    request_body = {
        "model": MODEL_NAME,
        "stream": False,
        "messages": [{"role": "user", "content": prompt}],
    }

    last_exc = None
    attempt = 0
    while attempt < OLLAMA_MAX_RETRIES:
        attempt += 1
        logger.info(
            f"Calling model {MODEL_NAME} at {url} "
            f"(attempt {attempt}/{OLLAMA_MAX_RETRIES})"
        )
        try:
            response = requests.post(url, json=request_body, timeout=OLLAMA_TIMEOUT)
            response.raise_for_status()
            answer = response.json().get("message", {}).get("content", "")
            preview = answer[:300].replace("\n", " ")
            logger.info(f"Model returned content (first 300 chars): {preview}")

            parsed = parse_model_output(answer)
            logger.info(f"Parsed model result: {parsed}")
            return parsed

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as exc:
            last_exc = exc
            logger.warning(f"Ollama request failed with {type(exc).__name__}: {exc}")
            if attempt < OLLAMA_MAX_RETRIES:
                backoff = 5 * attempt
                logger.info(f"Retrying in {backoff} seconds...")
                time.sleep(backoff)
        except Exception as exc:
            # Other errors are not worth retrying (broken output and the like).
            last_exc = exc
            logger.error(f"Ollama request failed (non-retryable): {exc}")
            logger.debug(traceback.format_exc())
            raise

    logger.error(f"Ollama request failed after {OLLAMA_MAX_RETRIES} attempts: {last_exc}")
    if last_exc:
        raise last_exc
    raise RuntimeError("Ollama request failed")
|
||
|
||
|
||
def _decode_subject(raw):
    """Return the Subject as readable text, decoding RFC 2047 encoded-words."""
    from email.errors import HeaderParseError
    from email.header import decode_header, make_header

    if not raw:
        return ""
    try:
        return str(make_header(decode_header(raw)))
    except (HeaderParseError, LookupError, UnicodeError, ValueError):
        # Malformed encoded-word or unknown charset: match on the raw value.
        return str(raw)


def normalize_folder(result, msg):
    """Decide the final target folder for a message.

    Order of evaluation:
      1) hard subject rules, first match wins: ZTJ, then marketing/newsletter
         (-> INBOX.Zpravodaje), then invoices/billing (-> INBOX.Ukoly);
      2) otherwise the model's suggestion, subject to the ALLOWED_FOLDERS
         whitelist and the confidence threshold.

    The subject is decoded from its RFC 2047 form first; matching against the
    raw header would never hit subjects sent as ``=?UTF-8?B?...?=``.

    Args:
        result: dict with the model's ``folder`` and ``confidence``.
        msg: parsed message; only its ``Subject`` header is read.

    Returns:
        The folder name to move the message to; "INBOX" is the fallback.
    """
    subject = _decode_subject(msg.get("Subject", ""))

    # ZTJ - a keyword in the subject always forces INBOX.ZTJ.
    if HARDCODED_ZTJ_SUBJECT.search(subject):
        logger.info("Hardcoded ZTJ rule matched subject, forcing folder=INBOX.ZTJ")
        return "INBOX.ZTJ"

    # Marketing / newsletter -> Zpravodaje.
    if HARDCODED_MARKETING_SUBJECT.search(subject):
        logger.info("Hardcoded marketing rule matched subject, forcing folder=INBOX.Zpravodaje")
        return "INBOX.Zpravodaje"

    # Invoices / billing statements -> Ukoly.
    for pattern, folder in HARDCODED_SUBJECT_RULES:
        if pattern.search(subject):
            logger.info(
                f"Hardcoded subject rule matched pattern {pattern.pattern}, "
                f"forcing folder={folder}"
            )
            return folder

    # Otherwise let the model decide.
    folder = result.get("folder", "INBOX")
    confidence = float(result.get("confidence", 0.0) or 0.0)

    logger.info(f"Model suggested folder={folder}, confidence={confidence}")

    # Unknown folder -> INBOX.
    if folder not in ALLOWED_FOLDERS:
        logger.warning(f"Folder {folder} not in ALLOWED_FOLDERS, using INBOX")
        return "INBOX"

    # Be more lenient for junk / newsletters.
    if folder in ("INBOX.Nepodstatne", "INBOX.Zpravodaje"):
        threshold = MIN_CONFIDENCE_RELAXED
    else:
        threshold = MIN_CONFIDENCE_DEFAULT

    if confidence < threshold:
        logger.info(
            f"Low confidence ({confidence}) for folder {folder} "
            f"(threshold {threshold}), using INBOX as fallback"
        )
        return "INBOX"

    return folder
|
||
|
||
|
||
# ---------- IMAP folder operations ----------
|
||
|
||
def ensure_folder(imap_conn, folder):
    """Make sure ``folder`` exists on the server.

    Looks the folder up with LIST; when it is not reported, issues CREATE
    and, if that succeeds, SUBSCRIBE. Failures are logged, never raised.
    Only LIST / CREATE / SUBSCRIBE are issued here, no SELECT.
    """
    logger.debug(f"Ensuring folder exists: {folder}")

    status, listing = imap_conn.list('""', f'"{folder}"')
    if status == "OK" and listing and listing[0] is not None:
        logger.debug(f"Folder {folder} already exists")
        return

    logger.info(f"Folder {folder} does not exist, creating...")
    status, detail = imap_conn.create(folder)
    if status != "OK":
        logger.error(f"Failed to create folder {folder}: {detail}")
        return
    logger.info(f"Folder {folder} created successfully")

    # Also try to SUBSCRIBE so that the mail client (Roundcube) shows it.
    try:
        sub_status, sub_detail = imap_conn.subscribe(folder)
    except Exception as exc:
        logger.warning(f"IMAP server does not support SUBSCRIBE or it failed: {exc}")
    else:
        if sub_status == "OK":
            logger.info(f"Folder {folder} subscribed successfully")
        else:
            logger.warning(f"Failed to subscribe folder {folder}: {sub_detail}")
|
||
|
||
|
||
def move_message(imap_conn, msg_id, target_folder):
    """Move one message from the selected INBOX into ``target_folder``.

    Implemented as COPY + flag ``\\Deleted`` + EXPUNGE. Each failing step is
    logged and aborts the move; nothing is raised for a non-OK status.

    Args:
        imap_conn: imaplib connection with INBOX selected.
        msg_id: message id as ``bytes``, as returned by SEARCH.
        target_folder: destination mailbox name.
    """
    msg_id_str = msg_id.decode(errors="ignore")

    # Source and destination are the same mailbox: copying the message onto
    # INBOX and expunging the original would only replace it with an
    # identical copy (and shift message numbers), so leave it where it is.
    if target_folder.upper() == "INBOX":
        logger.info(f"Message {msg_id_str} stays in INBOX, nothing to move")
        return

    logger.info(f"Moving message {msg_id_str} -> {target_folder}")
    ensure_folder(imap_conn, target_folder)

    typ, data = imap_conn.copy(msg_id, target_folder)
    if typ != "OK":
        logger.error(f"Failed to copy message {msg_id_str} to {target_folder}: {data}")
        return

    # The message is not marked \Seen, it is only removed from the source folder.
    typ, data = imap_conn.store(msg_id, "+FLAGS", "\\Deleted")
    if typ != "OK":
        logger.error(f"Failed to mark message {msg_id_str} as deleted: {data}")
        return

    # NOTE: EXPUNGE removes every message flagged \Deleted in the mailbox,
    # not only this one.
    typ, data = imap_conn.expunge()
    if typ != "OK":
        logger.error(f"Failed to expunge after moving message {msg_id_str}: {data}")
        return

    logger.info(f"Message {msg_id_str} moved to {target_folder} and expunged from INBOX")
|
||
|
||
|
||
# ---------- Main processing ----------
|
||
|
||
def process_once():
    """Run one iteration: classify and move every unseen message in INBOX.

    A message whose fetch, classification or move fails is logged and left
    in place for a later iteration. The IMAP connection is always logged out
    at the end.

    Raises:
        Exception: errors from connecting, searching or building the prompt
            propagate to the caller.
    """
    logger.info("Starting one processing iteration")
    imap_conn = connect_imap()
    try:
        ids = get_unseen_messages(imap_conn)
        # SEARCH returns message sequence numbers, and each EXPUNGE done by
        # move_message renumbers every message above the removed one.
        # Walking from the highest number down keeps the remaining (lower)
        # numbers valid, so the right messages are fetched and moved.
        for msg_id in reversed(ids):
            msg_id_str = msg_id.decode(errors="ignore")
            logger.info(f"Processing message ID {msg_id_str}")

            # BODY.PEEK[] does not mark the message as \Seen.
            typ, data = imap_conn.fetch(msg_id, "(BODY.PEEK[])")
            if typ != "OK":
                logger.error(f"Fetch failed for {msg_id_str}: {data}")
                continue

            # A message that vanished meanwhile yields no (envelope, bytes)
            # tuple; skip it instead of aborting the whole iteration.
            if not data or not isinstance(data[0], tuple) or len(data[0]) < 2:
                logger.error(f"Fetch returned no message data for {msg_id_str}: {data}")
                continue

            raw_email = data[0][1]
            msg = email.message_from_bytes(raw_email)

            prompt = build_prompt_from_email(msg)
            try:
                result = classify_email(prompt)
            except Exception as e:
                logger.error(f"Error calling model for message {msg_id_str}: {e}")
                logger.debug(traceback.format_exc())
                # Leave the message in INBOX; it is processed again later.
                continue

            target_folder = normalize_folder(result, msg)
            try:
                move_message(imap_conn, msg_id, target_folder)
            except Exception as e:
                logger.error(f"Error moving message {msg_id_str} to {target_folder}: {e}")
                logger.debug(traceback.format_exc())
                continue

    finally:
        logger.info("Logging out from IMAP")
        try:
            imap_conn.logout()
        except Exception as e:
            logger.warning(f"Error during IMAP logout: {e}")
|
||
|
||
|
||
def main():
    """Entry point: log the configuration, warm the model up, then poll forever.

    Each cycle runs ``process_once`` and sleeps CHECK_INTERVAL seconds. An
    exception from a cycle is logged and does not stop the loop.
    """
    logger.info("mail-classifier starting up...")
    log_config()
    warmup_model()

    while True:
        try:
            process_once()
        except Exception as exc:
            logger.error(f"Error in main loop: {exc}")
            logger.debug(traceback.format_exc())

        # The pause applies after failed cycles as well as successful ones.
        logger.info(f"Sleeping for {CHECK_INTERVAL} seconds")
        time.sleep(CHECK_INTERVAL)
|
||
|
||
|
||
# Start the polling loop only when the file is run as a script, not on import.
if __name__ == "__main__":
    main()
|
||
|