import datetime from collections import defaultdict from io import BytesIO from itertools import chain from typing import Generator, Any, List, Tuple import pandas as pd from reportlab.lib import colors from reportlab.lib.enums import TA_CENTER from reportlab.lib.styles import getSampleStyleSheet from reportlab.lib.units import mm, inch from reportlab.platypus import SimpleDocTemplate, Paragraph, PageBreak, Table from werkzeug.datastructures import FileStorage from .exceptions import InvalidNameOrTypeHeaderException from ..students.models import Student, Group def check_columns(df: pd.DataFrame) -> bool: headers = set(df.keys().values) columns = ['NAZWISKO', 'IMIE', 'INDEKS', 'PESEL', 'EMAIL'] if len(headers - set(columns)) != 0: return False flag = True col_types = ['object', 'object', 'int', 'float64', 'object'] for name, col_type in zip(columns, col_types): if not str(df.dtypes[name]).startswith(col_type): flag = False break return flag def parse_csv(file: FileStorage) -> Generator[Student, Any, None]: df = pd.read_csv(file) if not check_columns(df): raise InvalidNameOrTypeHeaderException students = (Student(last_name=dict(item.items())['NAZWISKO'], first_name=dict(item.items())['IMIE'], index=dict(item.items())['INDEKS'], pesel=str(int(dict(item.items())['PESEL'])) if not pd.isna( dict(item.items())['PESEL']) else None, email=dict(item.items())['EMAIL']) for _, item in df.iterrows()) return students def generate_csv(students_and_groups: List[Tuple[Student, Group]]) -> str: headers = ['PESEL', 'INDEKS', 'IMIE', 'NAZWISKO', 'EMAIL', 'CDYD_KOD', 'PRZ_KOD', 'TZAJ_KOD', 'GR_NR', 'PRG_KOD'] data = [(student.pesel, student.index, student.first_name, student.last_name, student.email, group.cdyd_kod, group.prz_kod, group.tzaj_kod, group.project_supervisor_id, None) for student, group in students_and_groups] dataframe = defaultdict(list) for row in data: for idx, item in enumerate(row): dataframe[headers[idx]].append(item) df = pd.DataFrame(dataframe) return df.to_csv(index=False) def generate_examination_schedule_pdf_file(title: str, nested_enrollments: List[List[object]]) -> bytes: pagesize = (297 * mm, 210 * mm) headers = ["lp.", "Godzina", "Nazwa projektu", "Opiekun", "Zespol", "Komisja"] pdf_buffer = BytesIO() my_doc = SimpleDocTemplate( pdf_buffer, pagesize=pagesize, topMargin=1 * inch, leftMargin=1 * inch, rightMargin=1 * inch, bottomMargin=1 * inch, title=title ) style = getSampleStyleSheet() bodyText = style['BodyText'] bodyText.fontName = 'Helvetica' normal = style["Heading1"] normal.alignment = TA_CENTER flowables = [] # print(nested_enrollments) for enrollments in nested_enrollments: if len(enrollments) == 0: continue date = datetime.datetime.strftime(enrollments[0].start_date, '%d/%m/%Y') paragraph_1 = Paragraph(f"{title} ~ {date}", normal) flowables.append(paragraph_1) data = [headers] for idx, e in enumerate(enrollments, start=1): new_date = e.start_date + datetime.timedelta(hours=2) group_name = e.group.name if e.group is not None else "" if group_name != '': ps = e.group.project_supervisor project_supervisor_fullname = f"{ps.first_name[0]}. {ps.last_name}" students = e.group.students # print(students) team = ", ".join([f"{s.first_name} {s.last_name}" for s in students]) else: project_supervisor_fullname = "" team = "" members = e.committee.members # print(members) if len(members) == 0: committee = '' else: members_iter = (f"{m.first_name[0]} {m.last_name}" for m in members) if project_supervisor_fullname != '': members_iter = chain(members_iter, [project_supervisor_fullname]) committee = ", ".join(members_iter) data.append([str(idx), new_date.strftime("%H:%M"), Paragraph(group_name, bodyText), Paragraph(project_supervisor_fullname, bodyText), Paragraph(team, bodyText), Paragraph(committee, bodyText), ]) # print(data) table = Table(data=data, style=[ ('GRID', (0, 0), (-1, -1), 0.5, colors.black), ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor("#A6F1A6")), ('BACKGROUND', (0, 0), (1, -1), colors.HexColor("#A6F1A6")) ], colWidths=[0.25 * inch, 0.7 * inch, 1.6 * inch, 1.5 * inch, 4 * inch, 3 * inch] ) flowables.append(table) flowables.append(PageBreak()) my_doc.build(flowables) pdf_value = pdf_buffer.getvalue() pdf_buffer.close() return pdf_value