Files
mail-clasifier/main.py
2025-11-25 08:42:37 +00:00

199 lines
5.5 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import imaplib
import email
import json
import requests
# Runtime configuration; every value can be overridden through an
# environment variable of the same name.
IMAP_HOST = os.getenv("IMAP_HOST", "imap.mailu.svc")
IMAP_PORT = int(os.getenv("IMAP_PORT", "993"))
IMAP_USER = os.getenv("IMAP_USER")
IMAP_PASS = os.getenv("IMAP_PASS")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://ollama.open-webui.svc:11434")
MODEL_NAME = os.getenv("MODEL_NAME", "mail-router")
MAX_BODY_CHARS = int(os.getenv("MAX_BODY_CHARS", "8000"))
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "30"))  # in seconds

# Allowed target folders (a safeguard against the model returning nonsense).
ALLOWED_FOLDERS = {
    "INBOX",
    "INBOX.Work",
    "INBOX.Family",
    "INBOX.Finance",
    "INBOX.Notifications",
    "INBOX.Newsletters",
    "INBOX.Social",
    "INBOX.Todo",
    "INBOX.TrashCandidates",
}
def connect_imap():
    """Open an IMAP-over-SSL connection and log in.

    Connects to ``IMAP_HOST:IMAP_PORT`` and authenticates with
    ``IMAP_USER`` / ``IMAP_PASS``.

    Returns:
        The logged-in ``imaplib.IMAP4_SSL`` connection.

    Raises:
        Whatever ``imaplib`` raises on connection or login failure. When the
        login fails, the socket is closed before the error is re-raised.
    """
    print(f"Connecting to IMAP {IMAP_HOST}:{IMAP_PORT} as {IMAP_USER}")
    m = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
    try:
        m.login(IMAP_USER, IMAP_PASS)
    except Exception:
        # The caller never receives the connection object in this case, so
        # it cannot log out; close the socket here to avoid leaking it.
        m.shutdown()
        raise
    return m
def get_unseen_messages(imap_conn):
    """Select INBOX and return the ids of its unseen messages.

    Args:
        imap_conn: Logged-in IMAP connection.

    Returns:
        A list of message ids as ``bytes`` (the result of a plain
        ``SEARCH UNSEEN``), or an empty list when INBOX cannot be selected
        or the search fails.
    """
    # INBOX is always used as the source mailbox.
    select_status, _ = imap_conn.select("INBOX")
    if select_status != "OK":
        print("Cannot select INBOX")
        return []

    search_status, search_data = imap_conn.search(None, 'UNSEEN')
    if search_status == "OK":
        return search_data[0].split()

    print("UNSEEN search failed")
    return []
def _decode_header_value(raw_value):
    """Return a header value with RFC 2047 encoded-words decoded to text."""
    from email.errors import HeaderParseError
    from email.header import decode_header, make_header

    try:
        return str(make_header(decode_header(raw_value)))
    except (LookupError, ValueError, HeaderParseError):
        # Unknown charset or malformed encoded-word: keep the raw text
        # rather than losing the header altogether.
        return str(raw_value)


def _part_text(part):
    """Return the decoded text payload of a message part ("" if none)."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # The declared charset is unknown to Python; best-effort UTF-8 is
        # better than silently dropping the whole part.
        return payload.decode("utf-8", errors="ignore")


def build_prompt_from_email(msg):
    """Build the classification prompt for one e-mail.

    The prompt consists of the From/To/Cc/Subject/Date headers (with
    RFC 2047 encoded-words decoded) followed by the plain-text body,
    truncated to ``MAX_BODY_CHARS`` characters.

    Args:
        msg: Parsed ``email.message.Message``.

    Returns:
        A string of the form ``"HEADERS:\\n...\\n\\nBODY:\\n..."``.
    """
    headers_text = "\n".join(
        f"{name}: {_decode_header_value(msg.get(name, ''))}"
        for name in ("From", "To", "Cc", "Subject", "Date")
    )

    if msg.is_multipart():
        chunks = []
        for part in msg.walk():
            if part.get_content_type() != "text/plain":
                continue
            # A text/plain *attachment* (log file, .txt, ...) is not part of
            # the message body and would only pollute the prompt.
            if part.get_content_disposition() == "attachment":
                continue
            try:
                chunks.append(_part_text(part))
            except Exception:
                # Best effort: an undecodable part must not block the others.
                continue
        body_text = "".join(chunks)
    else:
        try:
            body_text = _part_text(msg)
        except Exception:
            body_text = ""

    body_text = body_text[:MAX_BODY_CHARS]
    return f"HEADERS:\n{headers_text}\n\nBODY:\n{body_text}"
def classify_email(prompt):
    """Ask the model to classify one e-mail prompt.

    Posts *prompt* as a single user message to ``{OLLAMA_URL}/api/chat``
    (non-streaming, JSON output requested) using ``MODEL_NAME`` and parses
    the returned message content as JSON.

    Args:
        prompt: Prompt text, as produced by ``build_prompt_from_email``.

    Returns:
        The decoded JSON object (a ``dict``) produced by the model.

    Raises:
        requests.RequestException: On transport or HTTP errors.
        KeyError: If the response lacks ``message.content``.
        ValueError: If the content is not valid JSON, or is valid JSON but
            not an object.
    """
    payload = {
        "model": MODEL_NAME,
        "stream": False,
        "format": "json",
        "messages": [
            {"role": "user", "content": prompt}
        ]
    }
    r = requests.post(f"{OLLAMA_URL}/api/chat", json=payload, timeout=60)
    r.raise_for_status()
    data = r.json()
    content = data["message"]["content"]
    # debug
    print("Model raw content:", content[:200].replace("\n", " "), "...")
    result = json.loads(content)
    # A bare list/string/number is valid JSON but useless to the caller,
    # which reads keys from the result; reject it here, where the caller
    # already handles exceptions per message.
    if not isinstance(result, dict):
        raise ValueError(
            f"Model returned JSON {type(result).__name__}, expected an object"
        )
    return result
def normalize_folder(result):
    """Turn the model's answer into a safe target folder name.

    Falls back to ``"INBOX"`` whenever the answer cannot be trusted: it is
    not a JSON object, the confidence is missing, non-numeric, NaN or below
    0.5, or the folder is not one of ``ALLOWED_FOLDERS``.

    Args:
        result: Decoded model output; expected to be a dict with the keys
            ``"folder"`` and ``"confidence"``.

    Returns:
        A folder name from ``ALLOWED_FOLDERS``, or ``"INBOX"``.
    """
    if not isinstance(result, dict):
        print(f"Model result is not an object ({type(result).__name__}), forcing INBOX")
        return "INBOX"

    folder = result.get("folder", "INBOX")
    raw_confidence = result.get("confidence", 0.0)
    try:
        confidence = float(raw_confidence)
    except (TypeError, ValueError, OverflowError):
        # e.g. "high", None or a nested structure instead of a number
        print(f"Invalid confidence ({raw_confidence!r}), forcing INBOX")
        return "INBOX"

    # Anything below the 0.5 threshold stays in INBOX; written as
    # `not >=` so that NaN is rejected as well.
    if not confidence >= 0.5:
        print(f"Low confidence ({confidence}), forcing INBOX")
        return "INBOX"

    # If the model returns something outside the list, fall back to INBOX.
    # The isinstance check keeps unhashable values (lists, dicts) from
    # raising TypeError in the set lookup.
    if not isinstance(folder, str) or folder not in ALLOWED_FOLDERS:
        print(f"Folder {folder} not in allowed list, forcing INBOX")
        return "INBOX"

    return folder
def ensure_folder(imap_conn, folder):
    """
    Check via LIST whether the folder exists and create it if it does not.

    Does not change the currently selected mailbox (unlike SELECT/EXAMINE).
    A failed CREATE is only reported with a warning; nothing is raised here.
    """
    # LIST "" "INBOX.Foo"
    status, listing = imap_conn.list('""', f'"{folder}"')

    # The listing may be None or empty when the folder does not exist.
    already_there = status == "OK" and bool(listing) and listing[0] is not None
    if already_there:
        return

    print(f"Folder {folder} does not exist, creating...")
    status, response = imap_conn.create(folder)
    if status != "OK":
        print(f"WARNING: failed to create folder {folder}: {response}")
def move_message(imap_conn, msg_id, target_folder):
    """Move one message from the selected mailbox (INBOX) to a folder.

    The move is done as COPY + ``\\Deleted`` flag + EXPUNGE. Note that
    EXPUNGE removes every message flagged ``\\Deleted`` in the mailbox and
    renumbers the messages that follow them.

    Args:
        imap_conn: IMAP connection with INBOX selected.
        msg_id: Message id (``bytes``) as returned by SEARCH.
        target_folder: Destination mailbox name.

    Returns:
        None. Failures are reported on stdout and leave the message in
        place.
    """
    # The source mailbox is INBOX, so "moving" to INBOX would only copy the
    # message onto itself and delete the original. Leave it where it is.
    if target_folder == "INBOX":
        print(f"Leaving message {msg_id.decode()} in INBOX")
        return

    # Make sure the destination exists before moving.
    ensure_folder(imap_conn, target_folder)

    # COPY from the current mailbox (INBOX) to the target.
    typ, data = imap_conn.copy(msg_id, target_folder)
    if typ != "OK":
        print(f"Failed to copy message {msg_id} to {target_folder}: {data}")
        return

    # Flag the INBOX copy as deleted; only expunge if that succeeded, so a
    # failed STORE is not reported as a successful move.
    typ, data = imap_conn.store(msg_id, "+FLAGS", "\\Deleted")
    if typ != "OK":
        print(f"Failed to flag message {msg_id} as deleted: {data}")
        return
    imap_conn.expunge()
    print(f"Moved message {msg_id.decode()} -> {target_folder}")
def process_once():
    """Run one classification pass over the unseen messages in INBOX.

    Connects, fetches every unseen message, asks the model for a target
    folder and moves the message there. The connection is always logged
    out at the end, even when an error occurs.

    Raises:
        Errors from connecting, or from IMAP operations on a message, are
        propagated to the caller. Model errors are logged and the affected
        message is skipped.
    """
    imap_conn = connect_imap()
    try:
        ids = get_unseen_messages(imap_conn)
        print(f"Found {len(ids)} unseen messages in INBOX")
        # The ids are message sequence numbers, and moving a message
        # expunges it, which renumbers every message after it. Working from
        # the highest number down keeps the remaining ids valid.
        # NOTE(review): switching the whole flow to UIDs would be more
        # robust against other clients changing the mailbox concurrently.
        for msg_id in sorted(ids, key=int, reverse=True):
            typ, data = imap_conn.fetch(msg_id, "(RFC822)")
            if typ != "OK":
                print(f"Fetch failed for {msg_id}")
                continue
            # A successful FETCH can still carry no payload (e.g. the
            # message vanished); skip it instead of crashing the batch.
            if not data or not isinstance(data[0], tuple) or len(data[0]) < 2:
                print(f"Fetch returned no message data for {msg_id}")
                continue
            raw_email = data[0][1]
            msg = email.message_from_bytes(raw_email)
            prompt = build_prompt_from_email(msg)
            try:
                result = classify_email(prompt)
            except Exception as e:
                print(f"Error calling model for {msg_id}: {e}")
                continue
            target_folder = normalize_folder(result)
            move_message(imap_conn, msg_id, target_folder)
    finally:
        imap_conn.logout()
def main():
    """Run the classifier forever, one pass every CHECK_INTERVAL seconds.

    An exception raised by a pass is printed and does not stop the loop.
    """
    while True:
        try:
            process_once()
        except Exception as exc:
            print(f"Error in main loop: {exc}")
        # Sleep after both successful and failed passes.
        time.sleep(CHECK_INTERVAL)
# Script entry point: start the polling loop only when executed directly,
# not when the module is imported.
if __name__ == "__main__":
    main()