Some checks failed
Build & Push Docker / build (push) Has been cancelled
- Nový modul scenar/core.py (491 řádků čisté logiky)
- Refactored cgi-bin/scenar.py (450 řádků CGI wrapper)
- Inline editor s JavaScript row managementem
- Custom exceptions (ScenarsError, ValidationError, TemplateError)
- Kompletní test coverage (10 testů, všechny ✅)
- Fixed Dockerfile (COPY scenar/, requirements.txt)
- Fixed requirements.txt (openpyxl==3.1.5)
- Fixed pytest.ini (pythonpath = .)
- Nové testy: test_http_inline.py, test_inline_builder.py
- HTTP testy označeny jako @pytest.mark.integration
- Build script: scripts/build_image.sh
- Dokumentace: COMPLETION.md
557 lines
20 KiB
Python
557 lines
20 KiB
Python
"""
Core logic for Scenar Creator — Excel parsing, timetable generation, validation.

Separates business logic from CGI/HTTP concerns.
"""
|
|
|
|
import pandas as pd
|
|
from openpyxl import Workbook
|
|
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
|
|
from openpyxl.utils import get_column_letter
|
|
from datetime import datetime
|
|
from io import BytesIO
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Fallback fill colour (plain white) used when a form omits a colour value.
DEFAULT_COLOR = "#ffffff"

# Upper bound for uploaded Excel files, enforced by validate_inputs().
MAX_FILE_SIZE_MB = 10

# Canonical column names every parsed Excel sheet must provide
# (alternative template headers are mapped onto these in read_excel()).
REQUIRED_COLUMNS = ["Datum", "Zacatek", "Konec", "Program", "Typ", "Garant", "Poznamka"]
|
|
|
|
|
|
class ScenarsError(Exception):
    """Base exception for Scenar Creator; all module errors derive from it."""
|
|
|
|
|
|
class ValidationError(ScenarsError):
    """Raised when input validation fails."""
|
|
|
|
|
|
class TemplateError(ScenarsError):
    """Raised when Excel template is invalid."""
|
|
|
|
|
|
def validate_inputs(title: str, detail: str, file_size: int) -> None:
    """Validate user inputs for security and sanity.

    Args:
        title: Event title (required, non-blank, at most 200 characters).
        detail: Event detail (required, non-blank, at most 500 characters).
        file_size: Size of the uploaded file in bytes.

    Raises:
        ValidationError: if any field is missing, blank, too long, or the
            file exceeds MAX_FILE_SIZE_MB.
    """

    def check_text(value, label, max_length):
        # Title and detail share the same shape of checks; only the label
        # and length limit differ.
        if not value or not isinstance(value, str):
            raise ValidationError(f"{label} is required and must be a string")
        if len(value.strip()) == 0:
            raise ValidationError(f"{label} cannot be empty")
        if len(value) > max_length:
            raise ValidationError(f"{label} is too long (max {max_length} characters)")

    check_text(title, "Title", 200)
    check_text(detail, "Detail", 500)

    if file_size > MAX_FILE_SIZE_MB * 1024 * 1024:
        raise ValidationError(f"File size exceeds {MAX_FILE_SIZE_MB} MB limit")
|
|
|
|
|
|
def normalize_time(time_str: str):
    """Parse a time string in %H:%M or %H:%M:%S format.

    Returns:
        datetime.time on success, or None when neither format matches.
    """
    for candidate_format in ('%H:%M', '%H:%M:%S'):
        try:
            parsed = datetime.strptime(time_str, candidate_format)
        except ValueError:
            continue
        return parsed.time()
    return None
|
|
|
|
|
|
def validate_excel_template(df: pd.DataFrame) -> None:
    """Ensure the uploaded sheet carries every required column.

    Raises:
        TemplateError: naming the missing columns and the expected set.
    """
    absent = set(REQUIRED_COLUMNS) - set(df.columns)
    if not absent:
        return
    raise TemplateError(
        f"Excel template missing required columns: {', '.join(absent)}. "
        f"Expected: {', '.join(REQUIRED_COLUMNS)}"
    )
|
|
|
|
|
|
def read_excel(file_content: bytes, show_debug: bool = False) -> tuple:
    """
    Parse Excel file content and return (valid_data, error_rows).

    Handles different column naming conventions:
    - Old format: Datum, Zacatek, Konec, Program, Typ, Garant, Poznamka
    - New template: Datum, Zacatek bloku, Konec bloku, Nazev bloku, Typ bloku, Garant, Poznamka

    Rows whose date/time cannot be parsed land in ``error_rows``; rows whose
    time range overlaps an earlier block on the same day are moved there too.

    Args:
        file_content: Raw bytes of the uploaded .xlsx file.
        show_debug: When True, log intermediate frames at DEBUG level.

    Returns:
        tuple: (pandas.DataFrame with valid rows, list of dicts with error details)

    Raises:
        TemplateError: if the file cannot be read or required columns are missing.
    """
    try:
        excel_data = pd.read_excel(BytesIO(file_content), skiprows=0)
    except Exception as e:
        raise TemplateError(f"Failed to read Excel file: {str(e)}") from e

    # Map alternative template column names onto the canonical names.
    column_mapping = {
        'Zacatek bloku': 'Zacatek',
        'Konec bloku': 'Konec',
        'Nazev bloku': 'Program',
        'Typ bloku': 'Typ',
    }
    excel_data = excel_data.rename(columns=column_mapping)

    # Fail fast if required columns are still missing after the rename.
    validate_excel_template(excel_data)

    if show_debug:
        logger.debug(f"Raw data:\n{excel_data.head()}")

    error_rows = []
    valid_data = []

    for index, row in excel_data.iterrows():
        try:
            # errors='coerce' turns unparseable dates into NaT, caught below.
            datum = pd.to_datetime(row["Datum"], errors='coerce').date()
            zacatek = normalize_time(str(row["Zacatek"]))
            konec = normalize_time(str(row["Konec"]))

            if pd.isna(datum) or zacatek is None or konec is None:
                raise ValueError("Invalid date or time format")

            valid_data.append({
                "index": index,  # original Excel row index, kept for error reporting
                "Datum": datum,
                "Zacatek": zacatek,
                "Konec": konec,
                "Program": row["Program"],
                "Typ": row["Typ"],
                "Garant": row["Garant"],
                "Poznamka": row["Poznamka"],
                "row_data": row
            })
        except Exception as e:
            error_rows.append({"index": index, "row": row, "error": str(e)})

    valid_data = pd.DataFrame(valid_data)

    # Early return if no valid rows.
    if valid_data.empty:
        logger.warning("No valid rows after parsing")
        return valid_data.drop(columns='index', errors='ignore'), error_rows

    if show_debug:
        logger.debug(f"Cleaned data:\n{valid_data.head()}")
        logger.debug(f"Error rows: {error_rows}")

    # Detect overlapping blocks per day, scanning blocks in start-time order.
    overlap_errors = []
    for date, group in valid_data.groupby('Datum'):
        sorted_group = group.sort_values(by='Zacatek')
        previous_end_time = None
        for _, r in sorted_group.iterrows():
            if previous_end_time is not None and r['Zacatek'] < previous_end_time:
                overlap_errors.append({
                    "index": r["index"],
                    "Datum": r["Datum"],
                    "Zacatek": r["Zacatek"],
                    "Konec": r["Konec"],
                    "Program": r["Program"],
                    "Typ": r["Typ"],
                    "Garant": r["Garant"],
                    "Poznamka": r["Poznamka"],
                    "Error": f"Overlapping time block with previous block ending at {previous_end_time}",
                    "row_data": r["row_data"]
                })
            previous_end_time = r['Konec']

    if overlap_errors:
        if show_debug:
            logger.debug(f"Overlap errors: {overlap_errors}")
        # BUGFIX: e['index'] holds the ORIGINAL Excel row index (the 'index'
        # column), not the DataFrame's positional index. The two diverge as
        # soon as any earlier row failed parsing, so filtering on
        # valid_data.index dropped the wrong rows. Filter on the column.
        bad_indices = {e['index'] for e in overlap_errors}
        valid_data = valid_data[~valid_data['index'].isin(bad_indices)]
        error_rows.extend(overlap_errors)

    return valid_data.drop(columns='index'), error_rows
|
|
|
|
|
|
def get_program_types(form_data: dict) -> tuple:
    """
    Extract program types from form data.

    Form fields: type_code_{i}, desc_{i}, color_{i}

    Returns:
        tuple: (program_descriptions dict, program_colors dict) where colours
        are ARGB hex strings ('FF' + RGB with the leading '#' stripped).
    """
    descriptions = {}
    colors = {}

    def read_field(source, key, default=''):
        # cgi.FieldStorage exposes getvalue(); plain dicts use get().
        if hasattr(source, 'getvalue'):
            return source.getvalue(key, default)
        return source.get(key, default)

    for form_key in list(form_data.keys()):
        if not form_key.startswith('type_code_'):
            continue
        suffix = form_key.split('_')[-1]

        code = (read_field(form_data, f'type_code_{suffix}', '') or '').strip()
        description = (read_field(form_data, f'desc_{suffix}', '') or '').strip()
        raw_color = read_field(form_data, f'color_{suffix}', DEFAULT_COLOR) or DEFAULT_COLOR

        # Rows without a type code are treated as blanks and skipped.
        if not code:
            continue

        descriptions[code] = description
        colors[code] = 'FF' + str(raw_color).lstrip('#')

    return descriptions, colors
|
|
|
|
|
|
def calculate_row_height(cell_value, column_width):
    """Estimate a row height (in points) for *cell_value* wrapped to *column_width*.

    Assumes roughly ``column_width * 1.2`` characters fit per wrapped line and
    allots 15 points per line. Empty/None values get a single default line.

    Args:
        cell_value: Cell content (converted with str()).
        column_width: Excel column width the content wraps into.

    Returns:
        Estimated height in points (numeric).
    """
    if not cell_value:
        return 15
    lines = str(cell_value).split('\n')
    max_line_length = column_width * 1.2
    if max_line_length <= 0:
        # BUGFIX: a zero (or negative) column width previously raised
        # ZeroDivisionError; fall back to one visual line per explicit newline.
        return 15 * len(lines)
    line_count = 0
    for line in lines:
        # Each explicit line contributes 1 plus one extra per full wrap.
        line_count += len(line) // max_line_length + 1
    return line_count * 15
|
|
|
|
|
|
def calculate_column_width(text):
    """Return an Excel column width wide enough for the longest line of *text*."""
    longest = max(len(segment) for segment in str(text).split('\n'))
    return longest * 1.2
|
|
|
|
|
|
def create_timetable(data: pd.DataFrame, title: str, detail: str,
                     program_descriptions: dict, program_colors: dict) -> Workbook:
    """
    Create an OpenPyXL timetable workbook.

    Lays out a header row of 15-minute time slots, one (or more, when blocks
    overlap) rows per day, and a colour legend underneath.

    Args:
        data: DataFrame with validated schedule data
        title: Event title
        detail: Event detail/description
        program_descriptions: {type: description}
        program_colors: {type: color_hex}

    Returns:
        openpyxl.Workbook

    Raises:
        ScenarsError: if data is invalid or types are missing
    """
    if data.empty:
        raise ScenarsError("Data is empty after validation")

    # Every Typ appearing in the data must have a colour defined.
    missing_types = [typ for typ in data["Typ"].unique() if typ not in program_colors]
    if missing_types:
        raise ScenarsError(
            f"Missing type definitions: {', '.join(missing_types)}. "
            "Please define all program types."
        )

    wb = Workbook()
    ws = wb.active

    thick_border = Border(left=Side(style='thick', color='000000'),
                          right=Side(style='thick', color='000000'),
                          top=Side(style='thick', color='000000'),
                          bottom=Side(style='thick', color='000000'))

    # Title (row 1) and detail (row 2) headers.
    ws['A1'] = title
    ws['A1'].alignment = Alignment(horizontal="center", vertical="center")
    ws['A1'].font = Font(size=24, bold=True)
    ws['A1'].border = thick_border

    ws['A2'] = detail
    ws['A2'].alignment = Alignment(horizontal="center", vertical="center")
    ws['A2'].font = Font(size=16, italic=True)
    ws['A2'].border = thick_border

    # Column A starts at width 40; the legend section resizes it later.
    if ws.column_dimensions[get_column_letter(1)].width is None:
        ws.column_dimensions[get_column_letter(1)].width = 40

    title_row_height = calculate_row_height(title, ws.column_dimensions[get_column_letter(1)].width)
    detail_row_height = calculate_row_height(detail, ws.column_dimensions[get_column_letter(1)].width)
    ws.row_dimensions[1].height = title_row_height
    ws.row_dimensions[2].height = detail_row_height

    data = data.sort_values(by=["Datum", "Zacatek"])

    start_times = data["Zacatek"]
    end_times = data["Konec"]

    if start_times.isnull().any() or end_times.isnull().any():
        raise ScenarsError("Data contains invalid time values")

    try:
        min_time = min(start_times)
        max_time = max(end_times)
    except ValueError as e:
        raise ScenarsError(f"Error determining time range: {e}") from e

    # One column per 15-minute slot between earliest start and latest end.
    time_slots = pd.date_range(
        datetime.combine(datetime.today(), min_time),
        datetime.combine(datetime.today(), max_time),
        freq='15min'
    ).time

    # PERF: O(1) slot lookup instead of rebuilding list(time_slots) and
    # calling .index() twice per data row. Times not aligned to this
    # 15-minute grid are absent from the map and the row is skipped —
    # same behaviour as the previous .index() raising ValueError.
    slot_position = {slot: i for i, slot in enumerate(time_slots)}

    total_columns = len(time_slots) + 1
    ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=total_columns)
    ws.merge_cells(start_row=2, start_column=1, end_row=2, end_column=total_columns)

    row_offset = 3
    col_offset = 1

    # Header row: "Datum" label plus one cell per time slot.
    cell = ws.cell(row=row_offset, column=col_offset, value="Datum")
    cell.fill = PatternFill(start_color="D3D3D3", end_color="D3D3D3", fill_type="solid")
    cell.alignment = Alignment(horizontal="center", vertical="center")
    cell.font = Font(bold=True)
    cell.border = thick_border

    for i, time_slot in enumerate(time_slots, start=col_offset + 1):
        cell = ws.cell(row=row_offset, column=i, value=time_slot.strftime("%H:%M"))
        cell.fill = PatternFill(start_color="D3D3D3", end_color="D3D3D3", fill_type="solid")
        cell.alignment = Alignment(horizontal="center", vertical="center")
        cell.font = Font(bold=True)
        cell.border = thick_border

    current_row = row_offset + 1
    grouped_data = data.groupby(data['Datum'])

    for date, group in grouped_data:
        day_name = date.strftime("%A")
        date_str = date.strftime(f"%d.%m {day_name}")

        cell = ws.cell(row=current_row, column=col_offset, value=date_str)
        cell.alignment = Alignment(horizontal="center", vertical="center")
        cell.fill = PatternFill(start_color="D3D3D3", end_color="D3D3D3", fill_type="solid")
        cell.font = Font(bold=True, size=14)
        cell.border = thick_border

        # Track which cells are already filled so overlapping blocks spill
        # onto extra rows below instead of clobbering each other.
        date_row = current_row
        occupied_cells = set()  # (row, col) pairs already filled

        for _, row in group.iterrows():
            start_time = row["Zacatek"]
            end_time = row["Konec"]
            try:
                start_index = slot_position[start_time] + col_offset + 1
                end_index = slot_position[end_time] + col_offset + 1
            except KeyError:
                logger.error(f"Time slot not found: {start_time} to {end_time}")
                continue

            cell_value = f"{row['Program']}"
            if pd.notna(row['Garant']):
                cell_value += f"\n{row['Garant']}"
            if pd.notna(row['Poznamka']):
                cell_value += f"\n\n{row['Poznamka']}"

            # Check whether any target cell on the base row is taken.
            working_row = date_row + 1
            conflict = False
            for col in range(start_index, end_index):
                if (working_row, col) in occupied_cells:
                    conflict = True
                    break

            # If conflict, walk down to the first fully free row.
            if conflict:
                while any((working_row, col) in occupied_cells for col in range(start_index, end_index)):
                    working_row += 1

            # Mark cells as occupied.
            for col in range(start_index, end_index):
                occupied_cells.add((working_row, col))

            try:
                ws.merge_cells(start_row=working_row, start_column=start_index,
                               end_row=working_row, end_column=end_index - 1)
                # Write through the anchor (top-left) cell of the merge.
                cell = ws.cell(row=working_row, column=start_index)
                cell.value = cell_value
            except Exception as e:
                raise ScenarsError(f"Error creating timetable cell: {str(e)}") from e

            cell.alignment = Alignment(wrap_text=True, horizontal="center", vertical="center")
            # NOTE(review): openpyxl fonts apply to the whole cell, so only
            # the LAST assignment in this loop takes effect (italic when a
            # Poznamka is present, plain when only a Garant line exists,
            # bold for a bare program name). Kept as-is for behaviour parity;
            # per-line styling would require rich-text support.
            lines = str(cell_value).split("\n")
            for idx, _ in enumerate(lines):
                if idx == 0:
                    cell.font = Font(bold=True)
                elif idx == 1:
                    cell.font = Font(bold=False)
                elif idx > 1 and pd.notna(row['Poznamka']):
                    cell.font = Font(italic=True)

            cell.fill = PatternFill(start_color=program_colors[row["Typ"]],
                                    end_color=program_colors[row["Typ"]],
                                    fill_type="solid")
            cell.border = thick_border

        # Advance past every row this date spilled onto.
        if occupied_cells:
            max_row_for_date = max(r for r, c in occupied_cells)
            current_row = max_row_for_date + 1
        else:
            current_row += 1

    # Legend: one coloured row per program type.
    legend_row = current_row + 2
    legend_max_length = 0
    ws.cell(row=legend_row, column=1, value="Legenda:").font = Font(bold=True)
    legend_row += 1
    for typ, desc in program_descriptions.items():
        legend_text = f"{desc} ({typ})"
        legend_cell = ws.cell(row=legend_row, column=1, value=legend_text)
        legend_cell.fill = PatternFill(start_color=program_colors[typ], fill_type="solid")
        legend_max_length = max(legend_max_length, calculate_column_width(legend_text))
        legend_row += 1

    ws.column_dimensions[get_column_letter(1)].width = legend_max_length
    for col in range(2, total_columns + 1):
        ws.column_dimensions[get_column_letter(col)].width = 15

    # Uniform wrap/centre alignment and borders across the whole grid.
    for row in ws.iter_rows(min_row=1, max_row=current_row - 1, min_col=1, max_col=total_columns):
        for cell in row:
            cell.alignment = Alignment(wrap_text=True, horizontal="center", vertical="center")
            cell.border = thick_border

    # Size each row to its tallest wrapped cell.
    for row in ws.iter_rows(min_row=1, max_row=current_row - 1):
        max_height = 0
        for cell in row:
            if cell.value:
                height = calculate_row_height(cell.value, ws.column_dimensions[get_column_letter(cell.column)].width)
                if height > max_height:
                    max_height = height
        ws.row_dimensions[row[0].row].height = max_height

    return wb
|
|
|
|
|
|
def parse_inline_schedule(form_data) -> pd.DataFrame:
    """
    Parse inline schedule form data into DataFrame.

    Form fields:
        datum_{i}, zacatek_{i}, konec_{i}, program_{i}, typ_{i}, garant_{i}, poznamka_{i}

    Rows where every field is blank are silently skipped; a partially filled
    row is an error.

    Args:
        form_data: dict or cgi.FieldStorage with form data

    Returns:
        DataFrame with parsed schedule data

    Raises:
        ValidationError: if required fields missing or invalid
    """
    rows = []
    row_indices = set()

    def get_value(data, key, default=''):
        # Support both dict-like and cgi.FieldStorage objects.
        if hasattr(data, 'getvalue'):  # cgi.FieldStorage
            value = data.getvalue(key, default)
        else:  # dict
            value = data.get(key, default)
        # BUGFIX: form values may be None, or a list when FieldStorage sees a
        # repeated key; return the default instead of crashing on .strip().
        if not isinstance(value, str):
            return default
        return value.strip()

    # Find all row indices present in the form.
    for key in form_data.keys():
        if key.startswith('datum_'):
            idx = key.split('_')[-1]
            row_indices.add(idx)

    for idx in sorted(row_indices, key=int):
        datum_str = get_value(form_data, f'datum_{idx}', '')
        zacatek_str = get_value(form_data, f'zacatek_{idx}', '')
        konec_str = get_value(form_data, f'konec_{idx}', '')
        program = get_value(form_data, f'program_{idx}', '')
        typ = get_value(form_data, f'typ_{idx}', '')
        garant = get_value(form_data, f'garant_{idx}', '')
        poznamka = get_value(form_data, f'poznamka_{idx}', '')

        # Skip entirely empty rows.
        if not any([datum_str, zacatek_str, konec_str, program, typ]):
            continue

        # Validate required fields.
        if not all([datum_str, zacatek_str, konec_str, program, typ]):
            raise ValidationError(
                f"Řádek {int(idx)+1}: Všechna povinná pole (Datum, Začátek, Konec, Program, Typ) musí být vyplněna"
            )

        try:
            datum = pd.to_datetime(datum_str).date()
        except Exception:
            raise ValidationError(f"Řádek {int(idx)+1}: Neplatné datum")

        zacatek = normalize_time(zacatek_str)
        konec = normalize_time(konec_str)

        if zacatek is None or konec is None:
            raise ValidationError(f"Řádek {int(idx)+1}: Neplatný čas (použijte HH:MM nebo HH:MM:SS)")

        rows.append({
            'Datum': datum,
            'Zacatek': zacatek,
            'Konec': konec,
            'Program': program,
            'Typ': typ,
            'Garant': garant if garant else None,
            'Poznamka': poznamka if poznamka else None,
        })

    if not rows:
        raise ValidationError("Žádné platné řádky ve formuláři")

    return pd.DataFrame(rows)
|
|
|
|
|
|
def parse_inline_types(form_data) -> tuple:
    """
    Parse inline type definitions from form data.

    Form fields: type_name_{i}, type_desc_{i}, type_color_{i}

    Entries without a type name are skipped. Colours are normalized to ARGB
    hex ('FF' + RGB with the leading '#' stripped).

    Args:
        form_data: dict or cgi.FieldStorage with form data

    Returns:
        tuple: (program_descriptions dict, program_colors dict)
    """
    descriptions = {}
    colors = {}
    type_indices = set()

    def get_value(data, key, default=''):
        # Support both dict-like and cgi.FieldStorage objects.
        if hasattr(data, 'getvalue'):  # cgi.FieldStorage
            value = data.getvalue(key, default)
        else:  # dict
            value = data.get(key, default)
        # BUGFIX: form values may be None, or a list when FieldStorage sees a
        # repeated key; return the default instead of crashing on .strip().
        if not isinstance(value, str):
            return default
        return value.strip()

    # Find all type indices present in the form.
    for key in form_data.keys():
        if key.startswith('type_name_'):
            idx = key.split('_')[-1]
            type_indices.add(idx)

    for idx in sorted(type_indices, key=int):
        type_name = get_value(form_data, f'type_name_{idx}', '')
        type_desc = get_value(form_data, f'type_desc_{idx}', '')
        type_color = get_value(form_data, f'type_color_{idx}', DEFAULT_COLOR)

        # Skip empty type rows.
        if not type_name:
            continue

        descriptions[type_name] = type_desc
        colors[type_name] = 'FF' + type_color.lstrip('#')

    return descriptions, colors
|