""" Core logic for Scenar Creator — Excel parsing, timetable generation, validation. Separates business logic from CGI/HTTP concerns. """ import pandas as pd from openpyxl import Workbook from openpyxl.styles import Alignment, Border, Font, PatternFill, Side from openpyxl.utils import get_column_letter from datetime import datetime from io import BytesIO import logging logger = logging.getLogger(__name__) DEFAULT_COLOR = "#ffffff" MAX_FILE_SIZE_MB = 10 REQUIRED_COLUMNS = ["Datum", "Zacatek", "Konec", "Program", "Typ", "Garant", "Poznamka"] class ScenarsError(Exception): """Base exception for Scenar Creator.""" pass class ValidationError(ScenarsError): """Raised when input validation fails.""" pass class TemplateError(ScenarsError): """Raised when Excel template is invalid.""" pass def validate_inputs(title: str, detail: str, file_size: int) -> None: """Validate user inputs for security and sanity.""" if not title or not isinstance(title, str): raise ValidationError("Title is required and must be a string") if len(title.strip()) == 0: raise ValidationError("Title cannot be empty") if len(title) > 200: raise ValidationError("Title is too long (max 200 characters)") if not detail or not isinstance(detail, str): raise ValidationError("Detail is required and must be a string") if len(detail.strip()) == 0: raise ValidationError("Detail cannot be empty") if len(detail) > 500: raise ValidationError("Detail is too long (max 500 characters)") if file_size > MAX_FILE_SIZE_MB * 1024 * 1024: raise ValidationError(f"File size exceeds {MAX_FILE_SIZE_MB} MB limit") def normalize_time(time_str: str): """Parse time string in formats %H:%M or %H:%M:%S.""" for fmt in ('%H:%M', '%H:%M:%S'): try: return datetime.strptime(time_str, fmt).time() except ValueError: continue return None def validate_excel_template(df: pd.DataFrame) -> None: """Validate that Excel has required columns.""" missing_cols = set(REQUIRED_COLUMNS) - set(df.columns) if missing_cols: raise TemplateError( f"Excel template missing required columns: {', '.join(missing_cols)}. " f"Expected: {', '.join(REQUIRED_COLUMNS)}" ) def read_excel(file_content: bytes, show_debug: bool = False) -> tuple: """ Parse Excel file and return (valid_data, error_rows). Handles different column naming conventions: - Old format: Datum, Zacatek, Konec, Program, Typ, Garant, Poznamka - New template: Datum, Zacatek bloku, Konec bloku, Nazev bloku, Typ bloku, Garant, Poznamka Returns: tuple: (pandas.DataFrame with valid rows, list of dicts with error details) """ try: excel_data = pd.read_excel(BytesIO(file_content), skiprows=0) except Exception as e: raise TemplateError(f"Failed to read Excel file: {str(e)}") # Map column names from various possible names to our standard names column_mapping = { 'Zacatek bloku': 'Zacatek', 'Konec bloku': 'Konec', 'Nazev bloku': 'Program', 'Typ bloku': 'Typ', } excel_data = excel_data.rename(columns=column_mapping) # Validate template validate_excel_template(excel_data) if show_debug: logger.debug(f"Raw data:\n{excel_data.head()}") error_rows = [] valid_data = [] for index, row in excel_data.iterrows(): try: datum = pd.to_datetime(row["Datum"], errors='coerce').date() zacatek = normalize_time(str(row["Zacatek"])) konec = normalize_time(str(row["Konec"])) if pd.isna(datum) or zacatek is None or konec is None: raise ValueError("Invalid date or time format") valid_data.append({ "index": index, "Datum": datum, "Zacatek": zacatek, "Konec": konec, "Program": row["Program"], "Typ": row["Typ"], "Garant": row["Garant"], "Poznamka": row["Poznamka"], "row_data": row }) except Exception as e: error_rows.append({"index": index, "row": row, "error": str(e)}) valid_data = pd.DataFrame(valid_data) # Early return if no valid rows if valid_data.empty: logger.warning("No valid rows after parsing") return valid_data.drop(columns='index', errors='ignore'), error_rows if show_debug: logger.debug(f"Cleaned data:\n{valid_data.head()}") logger.debug(f"Error rows: {error_rows}") # Detect overlaps overlap_errors = [] for date, group in valid_data.groupby('Datum'): sorted_group = group.sort_values(by='Zacatek') previous_end_time = None for _, r in sorted_group.iterrows(): if previous_end_time and r['Zacatek'] < previous_end_time: overlap_errors.append({ "index": r["index"], "Datum": r["Datum"], "Zacatek": r["Zacatek"], "Konec": r["Konec"], "Program": r["Program"], "Typ": r["Typ"], "Garant": r["Garant"], "Poznamka": r["Poznamka"], "Error": f"Overlapping time block with previous block ending at {previous_end_time}", "row_data": r["row_data"] }) previous_end_time = r['Konec'] if overlap_errors: if show_debug: logger.debug(f"Overlap errors: {overlap_errors}") valid_data = valid_data[~valid_data.index.isin([e['index'] for e in overlap_errors])] error_rows.extend(overlap_errors) return valid_data.drop(columns='index'), error_rows def get_program_types(form_data: dict) -> tuple: """ Extract program types from form data. Form fields: type_code_{i}, desc_{i}, color_{i} Returns: tuple: (program_descriptions dict, program_colors dict) """ program_descriptions = {} program_colors = {} def get_value(data, key, default=''): # Support both dict-like and cgi.FieldStorage objects if hasattr(data, 'getvalue'): return data.getvalue(key, default) return data.get(key, default) for key in list(form_data.keys()): if key.startswith('type_code_'): index = key.split('_')[-1] type_code = (get_value(form_data, f'type_code_{index}', '') or '').strip() description = (get_value(form_data, f'desc_{index}', '') or '').strip() raw_color = (get_value(form_data, f'color_{index}', DEFAULT_COLOR) or DEFAULT_COLOR) if not type_code: continue color_hex = 'FF' + str(raw_color).lstrip('#') program_descriptions[type_code] = description program_colors[type_code] = color_hex return program_descriptions, program_colors def calculate_row_height(cell_value, column_width): """Calculate row height based on content.""" if not cell_value: return 15 max_line_length = column_width * 1.2 lines = str(cell_value).split('\n') line_count = 0 for line in lines: line_count += len(line) // max_line_length + 1 return line_count * 15 def calculate_column_width(text): """Calculate column width based on text length.""" max_length = max(len(line) for line in str(text).split('\n')) return max_length * 1.2 def create_timetable(data: pd.DataFrame, title: str, detail: str, program_descriptions: dict, program_colors: dict) -> Workbook: """ Create an OpenPyXL timetable workbook. Args: data: DataFrame with validated schedule data title: Event title detail: Event detail/description program_descriptions: {type: description} program_colors: {type: color_hex} Returns: openpyxl.Workbook Raises: ScenarsError: if data is invalid or types are missing """ if data.empty: raise ScenarsError("Data is empty after validation") missing_types = [typ for typ in data["Typ"].unique() if typ not in program_colors] if missing_types: raise ScenarsError( f"Missing type definitions: {', '.join(missing_types)}. " "Please define all program types." ) wb = Workbook() ws = wb.active thick_border = Border(left=Side(style='thick', color='000000'), right=Side(style='thick', color='000000'), top=Side(style='thick', color='000000'), bottom=Side(style='thick', color='000000')) # Title and detail ws['A1'] = title ws['A1'].alignment = Alignment(horizontal="center", vertical="center") ws['A1'].font = Font(size=24, bold=True) ws['A1'].border = thick_border ws['A2'] = detail ws['A2'].alignment = Alignment(horizontal="center", vertical="center") ws['A2'].font = Font(size=16, italic=True) ws['A2'].border = thick_border if ws.column_dimensions[get_column_letter(1)].width is None: ws.column_dimensions[get_column_letter(1)].width = 40 title_row_height = calculate_row_height(title, ws.column_dimensions[get_column_letter(1)].width) detail_row_height = calculate_row_height(detail, ws.column_dimensions[get_column_letter(1)].width) ws.row_dimensions[1].height = title_row_height ws.row_dimensions[2].height = detail_row_height data = data.sort_values(by=["Datum", "Zacatek"]) start_times = data["Zacatek"] end_times = data["Konec"] if start_times.isnull().any() or end_times.isnull().any(): raise ScenarsError("Data contains invalid time values") try: min_time = min(start_times) max_time = max(end_times) except ValueError as e: raise ScenarsError(f"Error determining time range: {e}") time_slots = pd.date_range( datetime.combine(datetime.today(), min_time), datetime.combine(datetime.today(), max_time), freq='15min' ).time total_columns = len(time_slots) + 1 ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=total_columns) ws.merge_cells(start_row=2, start_column=1, end_row=2, end_column=total_columns) row_offset = 3 col_offset = 1 cell = ws.cell(row=row_offset, column=col_offset, value="Datum") cell.fill = PatternFill(start_color="D3D3D3", end_color="D3D3D3", fill_type="solid") cell.alignment = Alignment(horizontal="center", vertical="center") cell.font = Font(bold=True) cell.border = thick_border for i, time_slot in enumerate(time_slots, start=col_offset + 1): cell = ws.cell(row=row_offset, column=i, value=time_slot.strftime("%H:%M")) cell.fill = PatternFill(start_color="D3D3D3", end_color="D3D3D3", fill_type="solid") cell.alignment = Alignment(horizontal="center", vertical="center") cell.font = Font(bold=True) cell.border = thick_border current_row = row_offset + 1 grouped_data = data.groupby(data['Datum']) for date, group in grouped_data: day_name = date.strftime("%A") date_str = date.strftime(f"%d.%m {day_name}") cell = ws.cell(row=current_row, column=col_offset, value=date_str) cell.alignment = Alignment(horizontal="center", vertical="center") cell.fill = PatternFill(start_color="D3D3D3", end_color="D3D3D3", fill_type="solid") cell.font = Font(bold=True, size=14) cell.border = thick_border # Track which cells are already filled (for overlap detection) date_row = current_row occupied_cells = set() # (row, col) pairs already filled for _, row in group.iterrows(): start_time = row["Zacatek"] end_time = row["Konec"] try: start_index = list(time_slots).index(start_time) + col_offset + 1 end_index = list(time_slots).index(end_time) + col_offset + 1 except ValueError as e: logger.error(f"Time slot not found: {start_time} to {end_time}") continue cell_value = f"{row['Program']}" if pd.notna(row['Garant']): cell_value += f"\n{row['Garant']}" if pd.notna(row['Poznamka']): cell_value += f"\n\n{row['Poznamka']}" # Check for overlaps working_row = date_row + 1 conflict = False for col in range(start_index, end_index): if (working_row, col) in occupied_cells: conflict = True break # If conflict, find next available row if conflict: while any((working_row, col) in occupied_cells for col in range(start_index, end_index)): working_row += 1 # Mark cells as occupied for col in range(start_index, end_index): occupied_cells.add((working_row, col)) try: ws.merge_cells(start_row=working_row, start_column=start_index, end_row=working_row, end_column=end_index - 1) # Get the first cell of the merge (not the merged cell) cell = ws.cell(row=working_row, column=start_index) cell.value = cell_value except Exception as e: raise ScenarsError(f"Error creating timetable cell: {str(e)}") cell.alignment = Alignment(wrap_text=True, horizontal="center", vertical="center") lines = str(cell_value).split("\n") for idx, _ in enumerate(lines): if idx == 0: cell.font = Font(bold=True) elif idx == 1: cell.font = Font(bold=False) elif idx > 1 and pd.notna(row['Poznamka']): cell.font = Font(italic=True) cell.fill = PatternFill(start_color=program_colors[row["Typ"]], end_color=program_colors[row["Typ"]], fill_type="solid") cell.border = thick_border # Update current_row to be after all rows for this date if occupied_cells: max_row_for_date = max(r for r, c in occupied_cells) current_row = max_row_for_date + 1 else: current_row += 1 # Legend legend_row = current_row + 2 legend_max_length = 0 ws.cell(row=legend_row, column=1, value="Legenda:").font = Font(bold=True) legend_row += 1 for typ, desc in program_descriptions.items(): legend_text = f"{desc} ({typ})" legend_cell = ws.cell(row=legend_row, column=1, value=legend_text) legend_cell.fill = PatternFill(start_color=program_colors[typ], fill_type="solid") legend_max_length = max(legend_max_length, calculate_column_width(legend_text)) legend_row += 1 ws.column_dimensions[get_column_letter(1)].width = legend_max_length for col in range(2, total_columns + 1): ws.column_dimensions[get_column_letter(col)].width = 15 for row in ws.iter_rows(min_row=1, max_row=current_row - 1, min_col=1, max_col=total_columns): for cell in row: cell.alignment = Alignment(wrap_text=True, horizontal="center", vertical="center") cell.border = thick_border for row in ws.iter_rows(min_row=1, max_row=current_row - 1): max_height = 0 for cell in row: if cell.value: height = calculate_row_height(cell.value, ws.column_dimensions[get_column_letter(cell.column)].width) if height > max_height: max_height = height ws.row_dimensions[row[0].row].height = max_height return wb def parse_inline_schedule(form_data) -> pd.DataFrame: """ Parse inline schedule form data into DataFrame. Form fields: datum_{i}, zacatek_{i}, konec_{i}, program_{i}, typ_{i}, garant_{i}, poznamka_{i} Args: form_data: dict or cgi.FieldStorage with form data Returns: DataFrame with parsed schedule data Raises: ValidationError: if required fields missing or invalid """ rows = [] row_indices = set() # Helper to get value from both dict and FieldStorage def get_value(data, key, default=''): if hasattr(data, 'getvalue'): # cgi.FieldStorage return data.getvalue(key, default).strip() else: # dict return data.get(key, default).strip() # Find all row indices for key in form_data.keys(): if key.startswith('datum_'): idx = key.split('_')[-1] row_indices.add(idx) for idx in sorted(row_indices, key=int): datum_str = get_value(form_data, f'datum_{idx}', '') zacatek_str = get_value(form_data, f'zacatek_{idx}', '') konec_str = get_value(form_data, f'konec_{idx}', '') program = get_value(form_data, f'program_{idx}', '') typ = get_value(form_data, f'typ_{idx}', '') garant = get_value(form_data, f'garant_{idx}', '') poznamka = get_value(form_data, f'poznamka_{idx}', '') # Skip empty rows if not any([datum_str, zacatek_str, konec_str, program, typ]): continue # Validate required fields if not all([datum_str, zacatek_str, konec_str, program, typ]): raise ValidationError( f"Řádek {int(idx)+1}: Všechna povinná pole (Datum, Začátek, Konec, Program, Typ) musí být vyplněna" ) try: datum = pd.to_datetime(datum_str).date() except Exception: raise ValidationError(f"Řádek {int(idx)+1}: Neplatné datum") zacatek = normalize_time(zacatek_str) konec = normalize_time(konec_str) if zacatek is None or konec is None: raise ValidationError(f"Řádek {int(idx)+1}: Neplatný čas (použijte HH:MM nebo HH:MM:SS)") rows.append({ 'Datum': datum, 'Zacatek': zacatek, 'Konec': konec, 'Program': program, 'Typ': typ, 'Garant': garant if garant else None, 'Poznamka': poznamka if poznamka else None, }) if not rows: raise ValidationError("Žádné platné řádky ve formuláři") return pd.DataFrame(rows) def parse_inline_types(form_data) -> tuple: """ Parse inline type definitions from form data. Form fields: type_name_{i}, type_desc_{i}, type_color_{i} Args: form_data: dict or cgi.FieldStorage with form data Returns: tuple: (program_descriptions dict, program_colors dict) """ descriptions = {} colors = {} type_indices = set() # Helper to get value from both dict and FieldStorage def get_value(data, key, default=''): if hasattr(data, 'getvalue'): # cgi.FieldStorage return data.getvalue(key, default).strip() else: # dict return data.get(key, default).strip() # Find all type indices for key in form_data.keys(): if key.startswith('type_name_'): idx = key.split('_')[-1] type_indices.add(idx) for idx in sorted(type_indices, key=int): type_name = get_value(form_data, f'type_name_{idx}', '') type_desc = get_value(form_data, f'type_desc_{idx}', '') type_color = get_value(form_data, f'type_color_{idx}', DEFAULT_COLOR) # Skip empty types if not type_name: continue descriptions[type_name] = type_desc colors[type_name] = 'FF' + type_color.lstrip('#') return descriptions, colors