import datetime from apiflask import APIBlueprint from flask import abort, Response, make_response, current_app from ...base.utils import paginate_models from ...base.mode import ModeGroups from ...dependencies import db from ...examination_schedule.models import ExaminationSchedule, TermOfDefence from ...students.models import Group, YearGroup from ...project_supervisor.models import ProjectSupervisor from ..schemas import ExaminationScheduleSchema, ExaminationScheduleUpdateSchema, MessageSchema, \ ExaminationSchedulesQuerySchema, ExaminationSchedulesPaginationSchema from ..utils import generate_examination_schedule_pdf_file bp = APIBlueprint("examination_schedule", __name__, url_prefix="/examination_schedule") @bp.get('//') @bp.input(ExaminationSchedulesQuerySchema, location='query') @bp.output(ExaminationSchedulesPaginationSchema) def list_examination_schedule(year_group_id: int, query: dict) -> dict: page = query.get('page') per_page = query.get('per_page') es_query = ExaminationSchedule.query.filter(ExaminationSchedule.year_group_id == year_group_id) data = paginate_models(page, es_query, per_page) return {'examination_schedules': data['items'], 'max_pages': data['max_pages']} @bp.post('//') @bp.input(ExaminationScheduleSchema) @bp.output(MessageSchema) def create_examination_schedule(year_group_id: int, data: dict) -> dict: yg = YearGroup.query.filter(YearGroup.id == year_group_id).first() if yg is None: abort(404, "Year group doesn't exist!") if data['start_date'] > data['end_date']: abort(400, "Invalid data! End date must be greater than start date!") duration_time = None if yg.mode == ModeGroups.NON_STATIONARY.value: duration_time = 20 elif yg.mode in [ModeGroups.STATIONARY.value, ModeGroups.ENGLISH_SPEAKING_STATIONARY.value]: duration_time = 30 if duration_time is None: abort(400, "Invalid mode of year group!") examination_schedule = ExaminationSchedule(**data, year_group_id=year_group_id, duration_time=duration_time) db.session.add(examination_schedule) db.session.commit() return {"message": "Examination schedule was created!"} @bp.put('//') @bp.input(ExaminationScheduleSchema) @bp.output(MessageSchema) def update_title_examination_schedule(id: int, data: dict) -> dict: examination_schedule_query = db.session.query(ExaminationSchedule).filter(ExaminationSchedule.id == id) examination_schedule = examination_schedule_query.first() if examination_schedule is None: abort(404, "Examination schedule doesn't exist!") examination_schedule_query.update(data) db.session.commit() return {"message": "Examination schedule was updated!"} @bp.delete('//') @bp.output(MessageSchema) def delete_examination_schedule(id: int) -> dict: examination_schedule = db.session.query(ExaminationSchedule).filter(ExaminationSchedule.id == id).first() if examination_schedule is None: abort(404, "Examination schedule doesn't exist!") db.session.delete(examination_schedule) db.session.commit() return {"message": "Examination schedule was deleted!"} @bp.put('//date') @bp.input(ExaminationScheduleUpdateSchema) @bp.output(MessageSchema) def set_date_of_examination_schedule(id: int, data: dict) -> dict: examination_schedule_query = db.session.query(ExaminationSchedule).filter(ExaminationSchedule.id == id) examination_schedule = examination_schedule_query.first() if examination_schedule is None: abort(404, "Examination schedule doesn't exist!") if data['start_date_for_enrollment_students'] > data['end_date_for_enrollment_students']: abort(400, "Invalid data! End date must be greater than start date!") examination_schedule_query.update(data) db.session.commit() return {"message": "You set date of examination schedule!"} @bp.post('//download/') def download_examination_schedule(examination_schedule_id: int) -> Response: examination_schedule = db.session.query(ExaminationSchedule). \ filter(ExaminationSchedule.id == examination_schedule_id).first() if examination_schedule is None: abort(404, "Examination schedule doesn't exist!") distinct_dates = db.session.query(db.func.Date(TermOfDefence.start_date)).distinct().all() nested_term_of_defences = [] for d in distinct_dates: date_tmp = datetime.datetime.strptime(d[0], "%Y-%m-%d").date() term_of_defences = db.session.query(TermOfDefence). \ join(Group, isouter=True).join(ProjectSupervisor, isouter=True). \ filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \ filter(TermOfDefence.group_id.isnot(None)). \ filter(db.func.Date(TermOfDefence.start_date) == date_tmp). \ all() if len(term_of_defences) > 0: nested_term_of_defences.append(term_of_defences) # print(nested_term_of_defences) base_dir = current_app.config.get('BASE_DIR') pdf = generate_examination_schedule_pdf_file(examination_schedule.title, nested_term_of_defences, base_dir) title = examination_schedule.title.replace("-", "_").split() filename = "_".join(title) response = make_response(pdf) response.headers['Content-Type'] = 'application/pdf' response.headers['Content-Disposition'] = 'attachment; filename=%s.pdf' % filename return response