system-pri/backend/app/coordinator/utils.py

324 lines
10 KiB
Python

import copy
import json
from collections import defaultdict
from datetime import datetime, timedelta
from io import BytesIO
from pathlib import Path
from typing import Any, Generator, List, TextIO, Tuple, Union
import pandas as pd
from flask import current_app
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.units import inch, mm
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.platypus import PageBreak, Paragraph, SimpleDocTemplate, Table
from werkzeug.datastructures import FileStorage
from ..base.mode import ModeGroups
from ..examination_schedule.models import TermOfDefence
from ..students.models import Group, ProjectGradeSheet, Student
from .exceptions import InvalidNameOrTypeHeaderException
def check_columns(df: pd.DataFrame) -> bool:
headers = set(df.keys().values)
column_names = ["NAZWISKO", "IMIE", "INDEKS", "EMAIL"]
column_types = ["object", "object", "int", "object"]
return all((column_name in headers for column_name in column_names)) and all(
(
str(df.dtypes[column_name]).startswith(column_type)
for column_name, column_type in zip(column_names, column_types)
)
)
def parse_csv(
file: Union[FileStorage, TextIO], year_group_id: int
) -> Generator[Student, Any, None]:
df = pd.read_csv(file)
if not check_columns(df):
raise InvalidNameOrTypeHeaderException
students = (
Student(
last_name=dict(item.items())["NAZWISKO"],
first_name=dict(item.items())["IMIE"],
index=dict(item.items())["INDEKS"],
email=dict(item.items())["EMAIL"],
year_group_id=year_group_id,
)
for _, item in df.iterrows()
)
return students
def map_project_supervisors(groups: List[Group]) -> dict:
i = 1
mapped_project_supervisors = {}
for group in groups:
if group.project_supervisor_id not in mapped_project_supervisors.keys():
mapped_project_supervisors[group.project_supervisor_id] = i
i += 1
return mapped_project_supervisors
def generate_csv(students_and_groups: List[Tuple[Student, Group]]) -> str:
headers = [
"INDEKS",
"IMIE",
"NAZWISKO",
"EMAIL",
"CDYD_KOD",
"PRZ_KOD",
"TZAJ_KOD",
"GR_NR",
"PRG_KOD",
]
mapped_project_supervisors_id = map_project_supervisors(
[group for _, group in students_and_groups]
)
data = [
(
student.index,
student.first_name,
student.last_name,
student.email,
group.cdyd_kod,
group.prz_kod,
group.tzaj_kod,
mapped_project_supervisors_id[group.project_supervisor_id],
None,
)
for student, group in students_and_groups
]
dataframe = defaultdict(list)
for row in data:
for idx, item in enumerate(row):
dataframe[headers[idx]].append(item)
df = pd.DataFrame(dataframe)
return df.to_csv(index=False)
def generate_range_dates(
start_date: datetime, end_date: datetime, step_in_minutes: int
) -> Generator[Union[datetime, timedelta], Any, None]:
current_date = copy.copy(start_date)
while True:
next_date = current_date + timedelta(minutes=step_in_minutes)
if next_date > end_date:
break
yield current_date
current_date = copy.copy(next_date)
def generate_examination_schedule_pdf_file(
title: str, nested_term_of_defences: List[List[TermOfDefence]], base_dir: Path
) -> bytes:
pagesize = (297 * mm, 210 * mm)
headers = [
"lp.",
"Godzina",
"Nazwa projektu",
"Opiekun",
"Zespol",
"Komisja",
"Uwagi",
]
pdf_buffer = BytesIO()
my_doc = SimpleDocTemplate(
pdf_buffer,
pagesize=pagesize,
topMargin=1 * inch,
leftMargin=1 * inch,
rightMargin=1 * inch,
bottomMargin=1 * inch,
title=title,
)
pdfmetrics.registerFont(TTFont("Lato", base_dir / "fonts" / "Lato.ttf"))
style = getSampleStyleSheet()
bodyText = style["BodyText"]
bodyText.fontName = "Lato"
normal = style["Heading1"]
normal.alignment = TA_CENTER
flowables = []
# print(nested_enrollments)
for term_of_defences in nested_term_of_defences:
if len(term_of_defences) == 0:
continue
date = datetime.strftime(term_of_defences[0].start_date.date(), "%d.%m.%Y")
paragraph_1 = Paragraph(f"{title} ~ {date}", normal)
flowables.append(paragraph_1)
data = [headers]
for idx, td in enumerate(term_of_defences, start=1):
new_date = td.start_date + timedelta(hours=2)
group_name = td.group.name if td.group is not None else ""
if group_name != "":
ps = td.group.project_supervisor
project_supervisor_fullname = f"{ps.first_name[0]}. {ps.last_name}"
students = td.group.students
team = ", ".join([f"{s.first_name} {s.last_name}" for s in students])
else:
project_supervisor_fullname = ""
team = ""
members = td.members_of_committee
if len(members) == 0:
committee = ""
else:
members_iter = (f"{m.first_name[0]}. {m.last_name}" for m in members)
committee = ", ".join(members_iter)
data.append(
[
str(idx),
new_date.strftime("%H:%M"),
Paragraph(group_name, bodyText),
Paragraph(project_supervisor_fullname, bodyText),
Paragraph(team, bodyText),
Paragraph(committee, bodyText),
]
)
# print(data)
table = Table(
data=data,
style=[
("GRID", (0, 0), (-1, -1), 0.5, colors.black),
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#A6F1A6")),
("BACKGROUND", (0, 0), (1, -1), colors.HexColor("#A6F1A6")),
],
colWidths=[
0.25 * inch,
0.7 * inch,
1.6 * inch,
1.5 * inch,
2.5 * inch,
2.2 * inch,
2 * inch,
],
)
flowables.append(table)
flowables.append(PageBreak())
my_doc.build(flowables)
pdf_value = pdf_buffer.getvalue()
pdf_buffer.close()
return pdf_value
def get_duration_time(mode: str) -> int:
duration_time = None
if mode == ModeGroups.NON_STATIONARY.value:
duration_time = 20
elif mode in [
ModeGroups.STATIONARY.value,
ModeGroups.ENGLISH_SPEAKING_STATIONARY.value,
]:
duration_time = 30
return duration_time
def load_weight_for_project_grade_sheet() -> Union[dict, None]:
base_dir = current_app.config.get("BASE_DIR")
config_dir = base_dir / "config"
with open(config_dir / "weights_project_grade_sheet.json") as f:
data = json.load(f)
return data
def get_criterion_by_weight_key(weight_key: str) -> str:
if weight_key.startswith("presentation"):
return "presentation"
if weight_key.startswith("documentation"):
return "documentation"
if weight_key.startswith("group_work"):
return "group_work"
return "product_project"
def grade_in_percentage(term_key: str, term_points: dict) -> str:
try:
criterions = {
"presentation": current_app.config.get(f"PRESENTATION_WEIGHT_{term_key}"),
"group_work": current_app.config.get(f"GROUP_WORK_WEIGHT_{term_key}"),
"documentation": current_app.config.get(f"DOCUMENTATION_WEIGHT_{term_key}"),
"product_project": current_app.config.get(
f"PRODUCT_PROJECT_WEIGHT_{term_key}"
),
}
result = 0
for criterion_key, criterion_weight in criterions.items():
result += (
term_points[criterion_key]["gained_points"]
/ term_points[criterion_key]["all_points"]
* criterion_weight
)
result /= sum(criterions.values())
except ZeroDivisionError:
result = 0
return result
def calculate_points_for_both_terms(
weights: dict, project_grade_sheets: List[ProjectGradeSheet]
) -> list:
terms = []
for pgs in project_grade_sheets:
if pgs is None:
terms.append((0, 0))
continue
first_term_points = {
"presentation": {"gained_points": 0, "all_points": 0},
"documentation": {"gained_points": 0, "all_points": 0},
"group_work": {"gained_points": 0, "all_points": 0},
"product_project": {"gained_points": 0, "all_points": 0},
}
second_term_points = copy.deepcopy(first_term_points)
for weight_key, weight_value in weights.items():
points = (
first_term_points if weight_key.endswith("1") else second_term_points
)
criterion = get_criterion_by_weight_key(weight_key)
try:
attribute_value = getattr(pgs, weight_key)
except AttributeError:
attribute_value = 0
points[criterion]["gained_points"] += attribute_value / 4 * weight_value
points[criterion]["all_points"] += weight_value
points_1 = round(grade_in_percentage("FIRST_TERM", first_term_points) * 100, 1)
points_2 = round(
grade_in_percentage("SECOND_TERM", second_term_points) * 100, 1
)
terms.append((points_1, points_2))
return terms
def attach_points_for_first_and_second_term_to_group_models(items: List[Group]) -> None:
weights = load_weight_for_project_grade_sheet()
pgs = []
for g in items:
if len(g.project_grade_sheet) == 0:
pgs.append(None)
else:
pgs.append(g.project_grade_sheet[0])
calculated_points = calculate_points_for_both_terms(weights, pgs)
for group, points in zip(items, calculated_points):
group.points_for_first_term = points[0]
group.points_for_second_term = points[1]