# system-pri/backend/app/coordinator/utils.py
#
# 335 lines
# 10 KiB
# Python

import copy
import json
from collections import defaultdict
from datetime import datetime, timedelta
from io import BytesIO
from pathlib import Path
from typing import Any, Generator, List, TextIO, Tuple, Union
import pandas as pd
from flask import current_app
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.units import inch, mm
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.platypus import PageBreak, Paragraph, SimpleDocTemplate, Table
from werkzeug.datastructures import FileStorage
from ..base.mode import ModeGroups
from ..examination_schedule.models import TermOfDefence
from ..students.models import Group, ProjectGradeSheet, Student
from .exceptions import InvalidNameOrTypeHeaderException
def check_columns(df: pd.DataFrame) -> bool:
    """Check that ``df`` has the student-import columns with the expected dtypes.

    The frame must contain ``NAZWISKO``, ``IMIE``, ``INDEKS`` and ``EMAIL``.
    A column's dtype is accepted when its string name starts with the expected
    prefix: ``object`` for the three text columns and ``int`` for ``INDEKS``.

    Args:
        df: Frame to validate.

    Returns:
        ``True`` when every required column is present with a matching dtype,
        ``False`` otherwise.
    """
    expected_dtype_prefixes = {
        "NAZWISKO": "object",
        "IMIE": "object",
        "INDEKS": "int",
        "EMAIL": "object",
    }
    present = set(df.keys().values)
    # Presence is verified first so the dtype lookup below never touches a
    # column that does not exist.
    if not all(name in present for name in expected_dtype_prefixes):
        return False
    return all(
        str(df.dtypes[name]).startswith(prefix)
        for name, prefix in expected_dtype_prefixes.items()
    )
def parse_csv(
    file: Union[FileStorage, TextIO], year_group_id: int
) -> Generator[Student, Any, None]:
    """Read a CSV of students and lazily build ``Student`` objects from its rows.

    Args:
        file: CSV source handed to ``pandas.read_csv``.
        year_group_id: Value stored as ``year_group_id`` on every student.

    Returns:
        A generator producing one ``Student`` per CSV row, filled from the
        ``NAZWISKO``, ``IMIE``, ``INDEKS`` and ``EMAIL`` columns.

    Raises:
        InvalidNameOrTypeHeaderException: When ``check_columns`` rejects the
            parsed frame.
    """
    frame = pd.read_csv(file)
    if not check_columns(frame):
        raise InvalidNameOrTypeHeaderException

    # The validation above runs as soon as parse_csv is called; only the
    # Student construction is deferred until the generator is consumed.
    records = (dict(row.items()) for _, row in frame.iterrows())
    return (
        Student(
            last_name=record["NAZWISKO"],
            first_name=record["IMIE"],
            index=record["INDEKS"],
            email=record["EMAIL"],
            year_group_id=year_group_id,
        )
        for record in records
    )
def map_project_supervisors(groups: List[Group]) -> dict:
    """Number the distinct project supervisors of ``groups`` starting from 1.

    Supervisors are numbered in the order their id first appears in ``groups``.

    Args:
        groups: Groups whose ``project_supervisor_id`` values are numbered.

    Returns:
        A dict mapping each distinct ``project_supervisor_id`` to its number.
    """
    numbering = {}
    for group in groups:
        # setdefault leaves an already-numbered supervisor untouched, so the
        # next free number is always the current size plus one.
        numbering.setdefault(group.project_supervisor_id, len(numbering) + 1)
    return numbering
def generate_csv(students_and_groups: List[Tuple[Student, Group]]) -> str:
    """Render student/group pairs as CSV text.

    Each pair becomes one row with the student's index, first name, last name
    and e-mail, the group's ``cdyd_kod``, ``prz_kod`` and ``tzaj_kod``, and the
    number assigned to the group's supervisor by ``map_project_supervisors``
    (column ``GR_NR``).  The ``PRG_KOD`` column is always left empty.

    Args:
        students_and_groups: ``(student, group)`` pairs to export.

    Returns:
        The CSV document, without a row-index column.  The header row is
        emitted even when ``students_and_groups`` is empty.
    """
    headers = [
        "INDEKS",
        "IMIE",
        "NAZWISKO",
        "EMAIL",
        "CDYD_KOD",
        "PRZ_KOD",
        "TZAJ_KOD",
        "GR_NR",
        "PRG_KOD",
    ]
    supervisor_numbers = map_project_supervisors(
        [group for _, group in students_and_groups]
    )
    rows = [
        (
            student.index,
            student.first_name,
            student.last_name,
            student.email,
            group.cdyd_kod,
            group.prz_kod,
            group.tzaj_kod,
            supervisor_numbers[group.project_supervisor_id],
            None,  # PRG_KOD has no source value
        )
        for student, group in students_and_groups
    ]
    # Naming the columns explicitly (instead of deriving them from the rows)
    # keeps the header line in the output when there are no rows at all.
    return pd.DataFrame(rows, columns=headers).to_csv(index=False)
def generate_range_dates(
    start_date: datetime, end_date: datetime, step_in_minutes: int
) -> Generator[Union[datetime, timedelta], Any, None]:
    """Yield the start of every ``step_in_minutes``-long slot that fits the range.

    Slots begin at ``start_date`` and follow one another without gaps.  A slot
    is yielded only if it ends at or before ``end_date``, so a trailing partial
    slot is dropped.  Only ``datetime`` values are yielded.

    Args:
        start_date: Start of the first slot.
        end_date: Latest moment at which a slot may end.
        step_in_minutes: Slot length in minutes; must be positive.

    Raises:
        ValueError: If ``step_in_minutes`` is not positive.  Because this is a
            generator, the error surfaces when iteration starts.
    """
    # A zero or negative step would never move past end_date and the loop
    # below would not terminate.
    if step_in_minutes <= 0:
        raise ValueError("step_in_minutes must be a positive number of minutes")

    step = timedelta(minutes=step_in_minutes)
    slot_start = start_date
    while slot_start + step <= end_date:
        yield slot_start
        slot_start += step
def _initial_and_last_name(person: Any) -> str:
    """Format ``person`` as ``"<first-name initial>. <last name>"``."""
    return f"{person.first_name[0]}. {person.last_name}"


def _term_of_defence_row(position: int, term: TermOfDefence, cell_style: Any) -> list:
    """Build the seven-cell schedule-table row describing ``term``."""
    # NOTE(review): fixed +2h shift, presumably converting stored UTC to local
    # time; a constant offset ignores daylight-saving changes - confirm.
    displayed_start = term.start_date + timedelta(hours=2)

    group = term.group
    group_name = group.name if group is not None else ""
    if group_name != "":
        supervisor = _initial_and_last_name(group.project_supervisor)
        team = ", ".join(f"{s.first_name} {s.last_name}" for s in group.students)
    else:
        supervisor = ""
        team = ""
    committee = ", ".join(
        _initial_and_last_name(member) for member in term.members_of_committee
    )

    return [
        str(position),
        displayed_start.strftime("%H:%M"),
        Paragraph(group_name, cell_style),
        Paragraph(supervisor, cell_style),
        Paragraph(team, cell_style),
        Paragraph(committee, cell_style),
        # Explicit empty "Uwagi" cell so every row has as many cells as the
        # header instead of relying on reportlab padding short rows.
        "",
    ]


def generate_examination_schedule_pdf_file(
    title: str, nested_term_of_defences: List[List[TermOfDefence]], base_dir: Path
) -> bytes:
    """Render the examination schedule as a landscape A4 PDF.

    Every non-empty inner list of ``nested_term_of_defences`` produces a
    heading (``title`` followed by the date of the list's first term), a table
    with one row per term, and a page break.  Empty inner lists are skipped.

    Args:
        title: Document title; also repeated in every heading.
        nested_term_of_defences: Terms of defence, one inner list per table.
        base_dir: Directory that contains ``fonts/Lato.ttf``.

    Returns:
        The bytes of the generated PDF document.
    """
    headers = [
        "lp.",
        "Godzina",
        "Nazwa projektu",
        "Opiekun",
        "Zespol",
        "Komisja",
        "Uwagi",
    ]
    # The context manager releases the buffer even if building the PDF fails.
    with BytesIO() as pdf_buffer:
        document = SimpleDocTemplate(
            pdf_buffer,
            pagesize=(297 * mm, 210 * mm),  # A4, landscape
            topMargin=1 * inch,
            leftMargin=1 * inch,
            rightMargin=1 * inch,
            bottomMargin=1 * inch,
            title=title,
        )
        pdfmetrics.registerFont(TTFont("Lato", base_dir / "fonts" / "Lato.ttf"))
        styles = getSampleStyleSheet()
        cell_style = styles["BodyText"]
        cell_style.fontName = "Lato"
        # NOTE(review): only the table cells are switched to Lato; the heading
        # keeps the stylesheet's font - confirm it renders all title characters.
        heading_style = styles["Heading1"]
        heading_style.alignment = TA_CENTER

        flowables = []
        for term_of_defences in nested_term_of_defences:
            if not term_of_defences:
                continue
            date = term_of_defences[0].start_date.date().strftime("%d.%m.%Y")
            flowables.append(Paragraph(f"{title} ~ {date}", heading_style))

            data = [headers]
            data.extend(
                _term_of_defence_row(position, term, cell_style)
                for position, term in enumerate(term_of_defences, start=1)
            )
            table = Table(
                data=data,
                style=[
                    ("GRID", (0, 0), (-1, -1), 0.5, colors.black),
                    # Header row and the two leading columns share one colour.
                    ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#A6F1A6")),
                    ("BACKGROUND", (0, 0), (1, -1), colors.HexColor("#A6F1A6")),
                ],
                colWidths=[
                    0.25 * inch,
                    0.7 * inch,
                    1.6 * inch,
                    1.5 * inch,
                    2.5 * inch,
                    2.2 * inch,
                    2 * inch,
                ],
            )
            flowables.append(table)
            flowables.append(PageBreak())

        document.build(flowables)
        return pdf_buffer.getvalue()
def get_duration_time(mode: str) -> int:
    """Return the duration assigned to a study mode.

    Args:
        mode: A ``ModeGroups`` value.

    Returns:
        ``20`` for the non-stationary mode, ``30`` for the stationary and the
        English-speaking stationary modes, and ``None`` for any other value.
        The unit is presumably minutes - verify against the callers.
    """
    if mode == ModeGroups.NON_STATIONARY.value:
        return 20
    stationary_modes = [
        ModeGroups.STATIONARY.value,
        ModeGroups.ENGLISH_SPEAKING_STATIONARY.value,
    ]
    if mode in stationary_modes:
        return 30
    return None
def load_weight_for_project_grade_sheet() -> Union[dict, None]:
    """Load the grade-sheet weights from the application's config directory.

    The file read is ``config/weights_project_grade_sheet.json`` under the
    ``BASE_DIR`` entry of the current Flask application's configuration.

    Returns:
        The parsed JSON content.

    Raises:
        TypeError: If ``BASE_DIR`` is not configured (the path cannot be built
            from ``None``).
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    base_dir = current_app.config.get("BASE_DIR")
    weights_path = base_dir / "config" / "weights_project_grade_sheet.json"
    # Read as UTF-8 explicitly so parsing does not depend on the host locale.
    with open(weights_path, encoding="utf-8") as f:
        return json.load(f)
def get_criterion_by_weight_key(weight_key: str) -> str:
    """Map a weight key to the grading criterion it belongs to.

    Args:
        weight_key: Key whose prefix names the criterion.

    Returns:
        ``"presentation"``, ``"documentation"`` or ``"group_work"`` when the
        key starts with that word; ``"product_project"`` for every other key.
    """
    for criterion in ("presentation", "documentation", "group_work"):
        if weight_key.startswith(criterion):
            return criterion
    # Anything without a recognised prefix counts towards the product/project.
    return "product_project"
def grade_in_percentage(term_key: str, term_points: dict) -> float:
    """Combine per-criterion scores into one weighted fraction for a term.

    For each criterion the ratio ``gained_points / all_points`` is multiplied
    by the criterion weight read from the Flask config
    (``<CRITERION>_WEIGHT_<term_key>``); the sum is divided by the total of
    the weights.

    Args:
        term_key: Suffix of the config keys that hold the term's weights.
        term_points: Maps ``"presentation"``, ``"group_work"``,
            ``"documentation"`` and ``"product_project"`` to dicts with
            ``"gained_points"`` and ``"all_points"``.

    Returns:
        The weighted result as a fraction (not multiplied by 100).  ``0.0`` is
        returned when any criterion has ``all_points`` equal to zero or when
        the weights sum to zero.
    """
    criterions = {
        "presentation": current_app.config.get(f"PRESENTATION_WEIGHT_{term_key}"),
        "group_work": current_app.config.get(f"GROUP_WORK_WEIGHT_{term_key}"),
        "documentation": current_app.config.get(f"DOCUMENTATION_WEIGHT_{term_key}"),
        "product_project": current_app.config.get(
            f"PRODUCT_PROJECT_WEIGHT_{term_key}"
        ),
    }
    try:
        weighted_total = 0
        for criterion_key, criterion_weight in criterions.items():
            points = term_points[criterion_key]
            weighted_total += (
                points["gained_points"] / points["all_points"] * criterion_weight
            )
        return weighted_total / sum(criterions.values())
    except ZeroDivisionError:
        # A single criterion without any available points (or all-zero
        # weights) makes the whole term score zero.
        return 0.0
def calculate_points_for_both_terms(
    weights: dict, project_grade_sheet: ProjectGradeSheet
) -> Tuple[float, float]:
    """Compute the percentage score of a grade sheet for both terms.

    Weight keys ending in ``"1"`` count towards the first term, all other keys
    towards the second.  The grade-sheet attribute named like the weight key
    (``0`` when the attribute is missing) is divided by 4, multiplied by the
    weight and accumulated per criterion, then turned into a percentage with
    ``grade_in_percentage``.

    Args:
        weights: Maps grade-sheet attribute names to their weights.
        project_grade_sheet: Sheet to score, or ``None``.

    Returns:
        ``(first_term, second_term)`` percentages rounded to one decimal
        place; ``(0.0, 0.0)`` when ``project_grade_sheet`` is ``None``.
    """
    if project_grade_sheet is None:
        return 0.0, 0.0

    def empty_tally() -> dict:
        return {
            criterion: {"gained_points": 0, "all_points": 0}
            for criterion in (
                "presentation",
                "documentation",
                "group_work",
                "product_project",
            )
        }

    first_term_points = empty_tally()
    second_term_points = empty_tally()

    for weight_key, weight_value in weights.items():
        tally = first_term_points if weight_key.endswith("1") else second_term_points
        bucket = tally[get_criterion_by_weight_key(weight_key)]
        score = getattr(project_grade_sheet, weight_key, 0)
        # NOTE(review): 4 looks like the maximum score of a single item - confirm.
        bucket["gained_points"] += score / 4 * weight_value
        bucket["all_points"] += weight_value

    first_term = round(grade_in_percentage("FIRST_TERM", first_term_points) * 100, 1)
    second_term = round(
        grade_in_percentage("SECOND_TERM", second_term_points) * 100, 1
    )
    return first_term, second_term
def attach_points_for_first_and_second_term_to_group(group: Group) -> None:
    """Store the first- and second-term point scores on ``group``.

    The weights are loaded with ``load_weight_for_project_grade_sheet`` and
    applied to the first entry of ``group.project_grade_sheet`` (or to no sheet
    at all when that collection is empty).  The results are written to
    ``group.points_for_first_term`` and ``group.points_for_second_term``.

    Args:
        group: Group to update in place.
    """
    weights = load_weight_for_project_grade_sheet()
    sheets = group.project_grade_sheet
    first_sheet = sheets[0] if len(sheets) != 0 else None
    first_term, second_term = calculate_points_for_both_terms(weights, first_sheet)
    group.points_for_first_term = first_term
    group.points_for_second_term = second_term
def get_term_grade(point: float) -> float:
    """Translate a point score into a term grade.

    Args:
        point: Score to grade.

    Returns:
        ``5`` from 91 points, ``4.5`` from 81, ``4`` from 71, ``3.5`` from 61,
        ``3`` from 51, and ``2`` below that.
    """
    # Ordered from the highest threshold down; the first one reached wins.
    thresholds = ((91.0, 5), (81.0, 4.5), (71.0, 4), (61.0, 3.5), (51.0, 3))
    for minimum, grade in thresholds:
        if point >= minimum:
            return grade
    return 2
def attach_grade_to_group_models(groups: List[Group]) -> None:
    """Fill in missing term grades on every group from its point scores.

    A term grade equal to ``0`` is replaced by ``get_term_grade`` of the
    matching ``points_for_<term>_term`` value; any other grade is left as is.

    Args:
        groups: Groups to update in place.
    """
    for group in groups:
        for term in ("first", "second"):
            grade_attribute = f"grade_for_{term}_term"
            if getattr(group, grade_attribute) != 0:
                continue
            points = getattr(group, f"points_for_{term}_term")
            setattr(group, grade_attribute, get_term_grade(points))