""" Excel reading and form parsing logic for Scenar Creator. Extracted from scenar/core.py — read_excel, get_program_types, parse_inline_schedule, parse_inline_types. """ import pandas as pd from io import BytesIO import logging from .validator import ( validate_excel_template, normalize_time, ValidationError, TemplateError, DEFAULT_COLOR, ) logger = logging.getLogger(__name__) def read_excel(file_content: bytes, show_debug: bool = False) -> tuple: """ Parse Excel file and return (valid_data, error_rows). Handles different column naming conventions: - Old format: Datum, Zacatek, Konec, Program, Typ, Garant, Poznamka - New template: Datum, Zacatek bloku, Konec bloku, Nazev bloku, Typ bloku, Garant, Poznamka Returns: tuple: (pandas.DataFrame with valid rows, list of dicts with error details) """ try: excel_data = pd.read_excel(BytesIO(file_content), skiprows=0) except Exception as e: raise TemplateError(f"Failed to read Excel file: {str(e)}") # Map column names from various possible names to our standard names column_mapping = { 'Zacatek bloku': 'Zacatek', 'Konec bloku': 'Konec', 'Nazev bloku': 'Program', 'Typ bloku': 'Typ', } excel_data = excel_data.rename(columns=column_mapping) # Validate template validate_excel_template(excel_data) if show_debug: logger.debug(f"Raw data:\n{excel_data.head()}") error_rows = [] valid_data = [] for index, row in excel_data.iterrows(): try: datum = pd.to_datetime(row["Datum"], errors='coerce').date() zacatek = normalize_time(str(row["Zacatek"])) konec = normalize_time(str(row["Konec"])) if pd.isna(datum) or zacatek is None or konec is None: raise ValueError("Invalid date or time format") valid_data.append({ "index": index, "Datum": datum, "Zacatek": zacatek, "Konec": konec, "Program": row["Program"], "Typ": row["Typ"], "Garant": row["Garant"], "Poznamka": row["Poznamka"], "row_data": row }) except Exception as e: error_rows.append({"index": index, "row": row, "error": str(e)}) valid_data = pd.DataFrame(valid_data) # Early return if no valid rows if valid_data.empty: logger.warning("No valid rows after parsing") return valid_data.drop(columns='index', errors='ignore'), error_rows if show_debug: logger.debug(f"Cleaned data:\n{valid_data.head()}") logger.debug(f"Error rows: {error_rows}") # Detect overlaps overlap_errors = [] for date, group in valid_data.groupby('Datum'): sorted_group = group.sort_values(by='Zacatek') previous_end_time = None for _, r in sorted_group.iterrows(): if previous_end_time and r['Zacatek'] < previous_end_time: overlap_errors.append({ "index": r["index"], "Datum": r["Datum"], "Zacatek": r["Zacatek"], "Konec": r["Konec"], "Program": r["Program"], "Typ": r["Typ"], "Garant": r["Garant"], "Poznamka": r["Poznamka"], "Error": f"Overlapping time block with previous block ending at {previous_end_time}", "row_data": r["row_data"] }) previous_end_time = r['Konec'] if overlap_errors: if show_debug: logger.debug(f"Overlap errors: {overlap_errors}") valid_data = valid_data[~valid_data.index.isin([e['index'] for e in overlap_errors])] error_rows.extend(overlap_errors) return valid_data.drop(columns='index'), error_rows def get_program_types(form_data: dict) -> tuple: """ Extract program types from form data. Form fields: type_code_{i}, desc_{i}, color_{i} Returns: tuple: (program_descriptions dict, program_colors dict) """ program_descriptions = {} program_colors = {} def get_value(data, key, default=''): # Support both dict-like and cgi.FieldStorage objects if hasattr(data, 'getvalue'): return data.getvalue(key, default) return data.get(key, default) for key in list(form_data.keys()): if key.startswith('type_code_'): index = key.split('_')[-1] type_code = (get_value(form_data, f'type_code_{index}', '') or '').strip() description = (get_value(form_data, f'desc_{index}', '') or '').strip() raw_color = (get_value(form_data, f'color_{index}', DEFAULT_COLOR) or DEFAULT_COLOR) if not type_code: continue color_hex = 'FF' + str(raw_color).lstrip('#') program_descriptions[type_code] = description program_colors[type_code] = color_hex return program_descriptions, program_colors def parse_inline_schedule(form_data) -> pd.DataFrame: """ Parse inline schedule form data into DataFrame. Form fields: datum_{i}, zacatek_{i}, konec_{i}, program_{i}, typ_{i}, garant_{i}, poznamka_{i} Args: form_data: dict or cgi.FieldStorage with form data Returns: DataFrame with parsed schedule data Raises: ValidationError: if required fields missing or invalid """ rows = [] row_indices = set() # Helper to get value from both dict and FieldStorage def get_value(data, key, default=''): if hasattr(data, 'getvalue'): # cgi.FieldStorage return data.getvalue(key, default).strip() else: # dict return data.get(key, default).strip() # Find all row indices for key in form_data.keys(): if key.startswith('datum_'): idx = key.split('_')[-1] row_indices.add(idx) for idx in sorted(row_indices, key=int): datum_str = get_value(form_data, f'datum_{idx}', '') zacatek_str = get_value(form_data, f'zacatek_{idx}', '') konec_str = get_value(form_data, f'konec_{idx}', '') program = get_value(form_data, f'program_{idx}', '') typ = get_value(form_data, f'typ_{idx}', '') garant = get_value(form_data, f'garant_{idx}', '') poznamka = get_value(form_data, f'poznamka_{idx}', '') # Skip empty rows if not any([datum_str, zacatek_str, konec_str, program, typ]): continue # Validate required fields if not all([datum_str, zacatek_str, konec_str, program, typ]): raise ValidationError( f"Řádek {int(idx)+1}: Všechna povinná pole (Datum, Začátek, Konec, Program, Typ) musí být vyplněna" ) try: datum = pd.to_datetime(datum_str).date() except Exception: raise ValidationError(f"Řádek {int(idx)+1}: Neplatné datum") zacatek = normalize_time(zacatek_str) konec = normalize_time(konec_str) if zacatek is None or konec is None: raise ValidationError(f"Řádek {int(idx)+1}: Neplatný čas (použijte HH:MM nebo HH:MM:SS)") rows.append({ 'Datum': datum, 'Zacatek': zacatek, 'Konec': konec, 'Program': program, 'Typ': typ, 'Garant': garant if garant else None, 'Poznamka': poznamka if poznamka else None, }) if not rows: raise ValidationError("Žádné platné řádky ve formuláři") return pd.DataFrame(rows) def parse_inline_types(form_data) -> tuple: """ Parse inline type definitions from form data. Form fields: type_name_{i}, type_desc_{i}, type_color_{i} Args: form_data: dict or cgi.FieldStorage with form data Returns: tuple: (program_descriptions dict, program_colors dict) """ descriptions = {} colors = {} type_indices = set() # Helper to get value from both dict and FieldStorage def get_value(data, key, default=''): if hasattr(data, 'getvalue'): # cgi.FieldStorage return data.getvalue(key, default).strip() else: # dict return data.get(key, default).strip() # Find all type indices for key in form_data.keys(): if key.startswith('type_name_'): idx = key.split('_')[-1] type_indices.add(idx) for idx in sorted(type_indices, key=int): type_name = get_value(form_data, f'type_name_{idx}', '') type_desc = get_value(form_data, f'type_desc_{idx}', '') type_color = get_value(form_data, f'type_color_{idx}', DEFAULT_COLOR) # Skip empty types if not type_name: continue descriptions[type_name] = type_desc colors[type_name] = 'FF' + type_color.lstrip('#') return descriptions, colors