# Source: system-pri/backend/app/coordinator/routes/examination_schedule.py
# (144 lines, 5.4 KiB, Python)

import datetime
from apiflask import APIBlueprint
from flask import Response, abort, current_app, make_response
from ...base.mode import EnrollmentsMode
from ...base.schemas import MessageSchema
from ...base.utils import paginate_models
from ...dependencies import db
from ...examination_schedule.models import ExaminationSchedule, TermOfDefence
from ...project_supervisor.models import ProjectSupervisor
from ...students.models import Group, YearGroup
from ..query.enrollments import set_enrollments_mode
from ..schemas.examination_schedule import (
ExaminationScheduleSchema,
ExaminationSchedulesPaginationSchema,
ExaminationSchedulesQuerySchema,
)
from ..utils import generate_examination_schedule_pdf_file, get_duration_time
# Blueprint that collects the coordinator's examination-schedule routes defined
# in this module; every route path is relative to the "/examination_schedule"
# prefix (plus whatever prefix the blueprint is registered under).
bp = APIBlueprint("examination_schedule", __name__, url_prefix="/examination_schedule")
@bp.get("/<int:year_group_id>/")
@bp.input(ExaminationSchedulesQuerySchema, location="query")
@bp.output(ExaminationSchedulesPaginationSchema)
def list_examination_schedule(year_group_id: int, query: dict) -> dict:
    # Return one page of the examination schedules that belong to a year group.
    #
    # Parameters:
    #   year_group_id: year group id taken from the URL.
    #   query: query-string values loaded through
    #       ExaminationSchedulesQuerySchema; "page" and "per_page" are read
    #       (either may be absent, in which case None is forwarded).
    # Returns:
    #   dict with "examination_schedules" (the page items) and "max_pages",
    #   serialized through ExaminationSchedulesPaginationSchema.
    schedules_of_year_group = ExaminationSchedule.query.filter(
        ExaminationSchedule.year_group_id == year_group_id
    )
    # paginate_models takes (page, query, per_page) and hands back a mapping
    # from which the "items" and "max_pages" entries are used.
    page_result = paginate_models(
        query.get("page"), schedules_of_year_group, query.get("per_page")
    )
    return {
        "examination_schedules": page_result["items"],
        "max_pages": page_result["max_pages"],
    }
@bp.post("/<int:year_group_id>/")
@bp.input(ExaminationScheduleSchema)
@bp.output(MessageSchema, status_code=201)
def create_examination_schedule(year_group_id: int, data: dict) -> dict:
    # Create an examination schedule inside the given year group.
    #
    # Parameters:
    #   year_group_id: year group id taken from the URL.
    #   data: request body loaded through ExaminationScheduleSchema; its
    #       entries are passed to the ExaminationSchedule constructor as-is.
    # Returns:
    #   confirmation message (responded with status 201).
    # Aborts:
    #   404 when the year group does not exist;
    #   400 when start_date is after end_date, or when no duration time can
    #       be derived from the year group's mode.
    year_group = YearGroup.query.filter(YearGroup.id == year_group_id).first()
    if year_group is None:
        abort(404, "Year group doesn't exist!")

    starts_at, ends_at = data.get("start_date"), data.get("end_date")
    if starts_at > ends_at:
        abort(400, "Invalid data! End date must be greater than start date!")

    # The duration is looked up from the year group's mode; None means the
    # helper could not provide one for that mode.
    duration = get_duration_time(year_group.mode)
    if duration is None:
        abort(400, "Invalid duration time!")

    new_schedule = ExaminationSchedule(
        **data, year_group_id=year_group_id, duration_time=duration
    )
    db.session.add(new_schedule)
    db.session.commit()
    return {"message": "Examination schedule was created!"}
@bp.put("/<int:examination_schedule_id>/")
@bp.input(ExaminationScheduleSchema)
@bp.output(MessageSchema)
def update_examination_schedule(examination_schedule_id: int, data: dict) -> dict:
    # Overwrite the columns of an existing examination schedule with the
    # values from the request body.
    #
    # Parameters:
    #   examination_schedule_id: schedule id taken from the URL.
    #   data: request body loaded through ExaminationScheduleSchema; its keys
    #       are used as the column names of the UPDATE.
    # Returns:
    #   confirmation message serialized through MessageSchema.
    # Aborts:
    #   404 when no schedule has that id;
    #   400 when the body carries a start_date that is after its end_date.
    schedule_query = db.session.query(ExaminationSchedule).filter(
        ExaminationSchedule.id == examination_schedule_id
    )
    if schedule_query.first() is None:
        abort(404, "Examination schedule doesn't exist!")

    # Enforce the same date-order rule as the create endpoint; without it an
    # update could store a schedule that creation would have rejected.
    # NOTE(review): only dates present in the body are compared — if the schema
    # ever allows partial bodies, compare against the stored values as well.
    start_date = data.get("start_date")
    end_date = data.get("end_date")
    if start_date is not None and end_date is not None and start_date > end_date:
        abort(400, "Invalid data! End date must be greater than start date!")

    schedule_query.update(data)
    db.session.commit()
    return {"message": "Examination schedule was updated!"}
@bp.delete("/<int:examination_schedule_id>/")
@bp.output(MessageSchema)
def delete_examination_schedule(examination_schedule_id: int) -> dict:
    # Delete the examination schedule with the given id.
    #
    # Parameters:
    #   examination_schedule_id: schedule id taken from the URL.
    # Returns:
    #   confirmation message serialized through MessageSchema.
    # Aborts:
    #   404 when no schedule has that id.
    matching_schedules = db.session.query(ExaminationSchedule).filter(
        ExaminationSchedule.id == examination_schedule_id
    )
    schedule = matching_schedules.first()
    if schedule is None:
        abort(404, "Examination schedule doesn't exist!")

    # Delete through the session (not a bulk DELETE) and persist immediately.
    db.session.delete(schedule)
    db.session.commit()
    return {"message": "Examination schedule was deleted!"}
@bp.put("/<int:examination_schedule_id>/open-enrollments/")
@bp.output(MessageSchema)
def open_enrollments(examination_schedule_id: int) -> dict:
    # Switch the schedule identified by the URL id to EnrollmentsMode.OPEN.
    #
    # All work is delegated to set_enrollments_mode; its result is returned
    # unchanged and serialized through MessageSchema.
    # NOTE(review): the "open" label is presumably used in the helper's
    # response message — confirm in set_enrollments_mode.
    target_mode = EnrollmentsMode.OPEN
    action_label = "open"
    return set_enrollments_mode(examination_schedule_id, target_mode, action_label)
@bp.put("/<int:examination_schedule_id>/close-enrollments/")
@bp.output(MessageSchema)
def close_enrollments(examination_schedule_id: int) -> dict:
    # Switch the schedule identified by the URL id to EnrollmentsMode.CLOSE.
    #
    # All work is delegated to set_enrollments_mode; its result is returned
    # unchanged and serialized through MessageSchema.
    # NOTE(review): the "close" label is presumably used in the helper's
    # response message — confirm in set_enrollments_mode.
    target_mode = EnrollmentsMode.CLOSE
    action_label = "close"
    return set_enrollments_mode(examination_schedule_id, target_mode, action_label)
@bp.post("/<int:examination_schedule_id>/download/")
def download_examination_schedule(examination_schedule_id: int) -> Response:
    # Build a PDF of the examination schedule and return it as an attachment.
    #
    # The terms of defence of this schedule that have a group assigned are
    # collected into one list per calendar day (days in ascending order) and
    # handed to generate_examination_schedule_pdf_file together with the
    # schedule title and the configured BASE_DIR.
    #
    # Parameters:
    #   examination_schedule_id: schedule id taken from the URL.
    # Returns:
    #   a response with Content-Type "application/pdf" and a
    #   Content-Disposition attachment header whose filename is derived from
    #   the schedule title.
    # Aborts:
    #   404 when no schedule has that id.
    examination_schedule = (
        db.session.query(ExaminationSchedule)
        .filter(ExaminationSchedule.id == examination_schedule_id)
        .first()
    )
    if examination_schedule is None:
        abort(404, "Examination schedule doesn't exist!")

    # Only look at days that can actually contribute to this schedule's PDF
    # (same filters as the per-day query below) instead of the days of every
    # term in the database, and order them so the document is chronological.
    defence_day = db.func.Date(TermOfDefence.start_date)
    distinct_days = (
        db.session.query(defence_day)
        .filter(TermOfDefence.examination_schedule_id == examination_schedule_id)
        .filter(TermOfDefence.group_id.isnot(None))
        .distinct()
        .order_by(defence_day)
        .all()
    )

    nested_term_of_defences = []
    for row in distinct_days:
        raw_day = row[0]
        if raw_day is None:
            # A term without a start date cannot be placed on any day.
            continue
        # The SQL Date() function comes back as a "YYYY-MM-DD" string on some
        # backends and as a date/datetime object on others; accept all three.
        if isinstance(raw_day, str):
            day = datetime.datetime.strptime(raw_day, "%Y-%m-%d").date()
        elif isinstance(raw_day, datetime.datetime):
            day = raw_day.date()
        else:
            day = raw_day

        term_of_defences = (
            db.session.query(TermOfDefence)
            .join(Group, isouter=True)
            .join(ProjectSupervisor, isouter=True)
            .filter(TermOfDefence.examination_schedule_id == examination_schedule_id)
            .filter(TermOfDefence.group_id.isnot(None))
            .filter(db.func.Date(TermOfDefence.start_date) == day)
            .order_by(TermOfDefence.start_date)
            .all()
        )
        if term_of_defences:
            nested_term_of_defences.append(term_of_defences)

    base_dir = current_app.config.get("BASE_DIR")
    pdf = generate_examination_schedule_pdf_file(
        examination_schedule.title, nested_term_of_defences, base_dir
    )

    # Filename: hyphens become underscores and whitespace runs collapse to a
    # single underscore.
    # NOTE(review): other characters of the title are passed through to the
    # header unchanged — consider sanitising/quoting if titles are free text.
    filename = "_".join(examination_schedule.title.replace("-", "_").split())

    response = make_response(pdf)
    response.headers["Content-Type"] = "application/pdf"
    response.headers["Content-Disposition"] = f"attachment; filename={filename}.pdf"
    return response