import copy
import json
from collections import defaultdict
from datetime import datetime, timedelta
from io import BytesIO
from pathlib import Path
from typing import Any, Generator, List, TextIO, Tuple, Union

import pandas as pd
from flask import current_app
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.units import inch, mm
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.platypus import PageBreak, Paragraph, SimpleDocTemplate, Table
from werkzeug.datastructures import FileStorage

from ..base.mode import ModeGroups
from ..examination_schedule.models import TermOfDefence
from ..students.models import Group, ProjectGradeSheet, Student
from .exceptions import InvalidNameOrTypeHeaderException


def check_columns(df: pd.DataFrame) -> bool:
    """Return True if the dataframe has the expected student columns and dtypes."""
    headers = set(df.keys().values)
    column_names = ["NAZWISKO", "IMIE", "INDEKS", "EMAIL"]
    column_types = ["object", "object", "int", "object"]
    return all(column_name in headers for column_name in column_names) and all(
        str(df.dtypes[column_name]).startswith(column_type)
        for column_name, column_type in zip(column_names, column_types)
    )


def parse_csv(
    file: Union[FileStorage, TextIO], year_group_id: int
) -> Generator[Student, Any, None]:
    """Read a CSV of students and yield Student models bound to the given year group."""
    df = pd.read_csv(file)

    if not check_columns(df):
        raise InvalidNameOrTypeHeaderException

    students = (
        Student(
            last_name=item["NAZWISKO"],
            first_name=item["IMIE"],
            index=item["INDEKS"],
            email=item["EMAIL"],
            year_group_id=year_group_id,
        )
        for _, item in df.iterrows()
    )

    return students


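# Illustrative sketch only (not executed at import time): the column layout below
# follows check_columns; the file name and the student values are made up.
#
#     NAZWISKO,IMIE,INDEKS,EMAIL
#     Kowalska,Anna,123456,anna.kowalska@example.com
#
#     with open("students.csv") as fh:
#         students = list(parse_csv(fh, year_group_id=1))
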
def generate_csv(students_and_groups: List[Tuple[Student, Group]]) -> str:
    """Render (student, group) pairs as a CSV string with the columns listed in headers."""
    headers = [
        "INDEKS",
        "IMIE",
        "NAZWISKO",
        "EMAIL",
        "CDYD_KOD",
        "PRZ_KOD",
        "TZAJ_KOD",
        "GR_NR",
        "PRG_KOD",
    ]
    data = [
        (
            student.index,
            student.first_name,
            student.last_name,
            student.email,
            group.cdyd_kod,
            group.prz_kod,
            group.tzaj_kod,
            group.project_supervisor_id,  # exported in the GR_NR column
            None,  # PRG_KOD is left empty
        )
        for student, group in students_and_groups
    ]
    dataframe = defaultdict(list)
    for row in data:
        for idx, item in enumerate(row):
            dataframe[headers[idx]].append(item)

    df = pd.DataFrame(dataframe)
    return df.to_csv(index=False)


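# Sketch of the resulting CSV (values are fabricated; GR_NR carries the group's
# project_supervisor_id and PRG_KOD stays empty, exactly as in the tuple above):
#
#     INDEKS,IMIE,NAZWISKO,EMAIL,CDYD_KOD,PRZ_KOD,TZAJ_KOD,GR_NR,PRG_KOD
#     123456,Anna,Kowalska,anna.kowalska@example.com,2023Z,PROJ,PRJ,7,
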
def generate_range_dates(
    start_date: datetime, end_date: datetime, step_in_minutes: int
) -> Generator[datetime, Any, None]:
    """Yield start times every step_in_minutes; a slot is yielded only if it ends by end_date."""
    current_date = copy.copy(start_date)
    while True:
        next_date = current_date + timedelta(minutes=step_in_minutes)

        if next_date > end_date:
            break
        yield current_date
        current_date = copy.copy(next_date)


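# Sketch with arbitrary dates: a 09:00-11:00 window and 30-minute slots yields
# 09:00, 09:30, 10:00 and 10:30; 11:00 itself is excluded because a slot starting
# there would end after end_date.
#
#     slots = list(
#         generate_range_dates(
#             datetime(2023, 6, 1, 9, 0), datetime(2023, 6, 1, 11, 0), 30
#         )
#     )
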
def generate_examination_schedule_pdf_file(
    title: str, nested_term_of_defences: List[List[TermOfDefence]], base_dir: Path
) -> bytes:
    """Build a landscape A4 PDF with one schedule table per day and return its bytes."""
    pagesize = (297 * mm, 210 * mm)  # landscape A4
    headers = [
        "lp.",
        "Godzina",
        "Nazwa projektu",
        "Opiekun",
        "Zespol",
        "Komisja",
        "Uwagi",
    ]
    pdf_buffer = BytesIO()
    my_doc = SimpleDocTemplate(
        pdf_buffer,
        pagesize=pagesize,
        topMargin=1 * inch,
        leftMargin=1 * inch,
        rightMargin=1 * inch,
        bottomMargin=1 * inch,
        title=title,
    )

    pdfmetrics.registerFont(TTFont("Lato", base_dir / "fonts" / "Lato.ttf"))
    style = getSampleStyleSheet()
    bodyText = style["BodyText"]
    bodyText.fontName = "Lato"
    normal = style["Heading1"]
    normal.alignment = TA_CENTER
    flowables = []

    for term_of_defences in nested_term_of_defences:
        if len(term_of_defences) == 0:
            continue
        date = datetime.strftime(term_of_defences[0].start_date.date(), "%d.%m.%Y")
        paragraph_1 = Paragraph(f"{title} ~ {date}", normal)
        flowables.append(paragraph_1)
        data = [headers]

        for idx, td in enumerate(term_of_defences, start=1):
            # shift the stored start time by two hours for display (presumably a timezone offset)
            new_date = td.start_date + timedelta(hours=2)
            group_name = td.group.name if td.group is not None else ""
            if group_name != "":
                ps = td.group.project_supervisor
                project_supervisor_fullname = f"{ps.first_name[0]}. {ps.last_name}"
                students = td.group.students
                team = ", ".join([f"{s.first_name} {s.last_name}" for s in students])
            else:
                project_supervisor_fullname = ""
                team = ""

            members = td.members_of_committee
            if len(members) == 0:
                committee = ""
            else:
                members_iter = (f"{m.first_name[0]} {m.last_name}" for m in members)
                committee = ", ".join(members_iter)

            data.append(
                [
                    str(idx),
                    new_date.strftime("%H:%M"),
                    Paragraph(group_name, bodyText),
                    Paragraph(project_supervisor_fullname, bodyText),
                    Paragraph(team, bodyText),
                    Paragraph(committee, bodyText),
                    "",  # keep the row as wide as the seven-column header; "Uwagi" stays empty
                ]
            )

        table = Table(
            data=data,
            style=[
                ("GRID", (0, 0), (-1, -1), 0.5, colors.black),
                ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#A6F1A6")),
                ("BACKGROUND", (0, 0), (1, -1), colors.HexColor("#A6F1A6")),
            ],
            colWidths=[
                0.25 * inch,
                0.7 * inch,
                1.6 * inch,
                1.5 * inch,
                2.5 * inch,
                2.2 * inch,
                2 * inch,
            ],
        )
        flowables.append(table)
        flowables.append(PageBreak())

    my_doc.build(flowables)
    pdf_value = pdf_buffer.getvalue()
    pdf_buffer.close()
    return pdf_value


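# Sketch of serving the returned bytes from a Flask view; the title and the
# nested_terms variable are placeholders, not part of this module:
#
#     from flask import Response
#
#     pdf = generate_examination_schedule_pdf_file(
#         "Harmonogram egzaminow", nested_terms, current_app.config["BASE_DIR"]
#     )
#     return Response(
#         pdf,
#         mimetype="application/pdf",
#         headers={"Content-Disposition": "attachment; filename=schedule.pdf"},
#     )
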
def get_duration_time(mode: str) -> Union[int, None]:
    """Return the time-slot length for a study mode, or None for unrecognised modes."""
    duration_time = None
    if mode == ModeGroups.NON_STATIONARY.value:
        duration_time = 20
    elif mode in [
        ModeGroups.STATIONARY.value,
        ModeGroups.ENGLISH_SPEAKING_STATIONARY.value,
    ]:
        duration_time = 30
    return duration_time


def load_weight_for_project_grade_sheet() -> Union[dict, None]:
    """Load grade-sheet weights from config/weights_project_grade_sheet.json under BASE_DIR."""
    base_dir = current_app.config.get("BASE_DIR")
    config_dir = base_dir / "config"

    with open(config_dir / "weights_project_grade_sheet.json") as f:
        data = json.load(f)

    return data


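# Sketch of the weights file; the key names are illustrative only, but, as
# calculate_points_for_one_term assumes, each key must match a ProjectGradeSheet
# attribute and end with "1" (first term) or "2" (second term):
#
#     {
#         "presentation_1": 2,
#         "documentation_1": 3,
#         "presentation_2": 2,
#         "documentation_2": 3
#     }
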
def calculate_points_for_one_term(
    weights: dict, project_grade_sheets: List[ProjectGradeSheet]
) -> list:
    """Compute a (first term, second term) percentage pair for each grade sheet."""
    terms = []
    for pgs in project_grade_sheets:
        if pgs is None:
            terms.append((0, 0))
            continue

        first_term_points = {
            "numerator": 0,
            "denominator": 0,
        }
        second_term_points = {
            "numerator": 0,
            "denominator": 0,
        }
        for weight_key, weight_value in weights.items():
            # keys ending in "1" belong to the first term, everything else to the second
            points = (
                first_term_points if weight_key.endswith("1") else second_term_points
            )
            try:
                attribute_value = getattr(pgs, weight_key)
            except AttributeError:
                attribute_value = 0
            # grades are scaled down by 4 before being weighted
            points["numerator"] += attribute_value * weight_value / 4
            points["denominator"] += weight_value

        try:
            fp = first_term_points["numerator"] / first_term_points["denominator"]
        except ZeroDivisionError:
            fp = 0
        try:
            sp = second_term_points["numerator"] / second_term_points["denominator"]
        except ZeroDivisionError:
            sp = 0

        # round the weighted average first, then express it as a percentage
        terms.append((round(fp, 2) * 100, round(sp, 2) * 100))

    return terms


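# Worked example with made-up numbers: weights {"presentation_1": 2,
# "documentation_1": 3} and a sheet graded presentation_1=4, documentation_1=2
# give a first-term numerator of 4*2/4 + 2*3/4 = 3.5 and a denominator of 5,
# so the first entry of the returned pair is round(3.5 / 5, 2) * 100 = 70.0.
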
def attach_points_for_first_and_second_term_to_group_models(items: List[Group]) -> None:
    """Annotate each Group in place with points_for_first_term and points_for_second_term."""
    weights = load_weight_for_project_grade_sheet()
    pgs = []
    for g in items:
        if len(g.project_grade_sheet) == 0:
            pgs.append(None)
        else:
            pgs.append(g.project_grade_sheet[0])
    calculated_points = calculate_points_for_one_term(weights, pgs)

    for group, points in zip(items, calculated_points):
        group.points_for_first_term = points[0]
        group.points_for_second_term = points[1]
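
# Sketch (the ORM query shown is an assumption, not part of this module):
#
#     groups = Group.query.all()
#     attach_points_for_first_and_second_term_to_group_models(groups)
#     print(groups[0].points_for_first_term, groups[0].points_for_second_term)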