import datetime from itertools import islice from apiflask import APIBlueprint from flask import abort, current_app from sqlalchemy import or_, and_ from flask_sqlalchemy import get_debug_queries from ..schemas import MessageSchema, TermOfDefenceSchema, TermOfDefenceListSchema, \ TemporaryAvailabilityListSchema, AssignedGroupToTermOfDefenceListSchema, GroupIdSchema from ...examination_schedule.models import ExaminationSchedule, TermOfDefence, TemporaryAvailability from ...students.models import YearGroup from ...project_supervisor.models import ProjectSupervisor from ...dependencies import db from ..utils import generate_range_dates from ..query.enrollments import get_term_of_defence_by_id_and_examination_schedule_id, set_new_group_to_term_of_defence, \ get_examination_schedule_by_id bp = APIBlueprint("enrollments", __name__, url_prefix="/enrollments") @bp.post('//generate') @bp.output(MessageSchema) def generate_term_of_defence_for_this_examination_schedule(examination_schedule_id: int) -> dict: ex = get_examination_schedule_by_id(examination_schedule_id) # print(ex.start_date.date()) # print(ex.end_date.date()) dates = generate_range_dates(ex.start_date, ex.end_date, ex.duration_time) # print(dates) limit = current_app.config.get('LIMIT_MEMBERS_PER_COMMITTEE', 3) # in future optimize sql query # project_supervisor cache in memory (flyweight) # while True: # sliced_dates = islice(dates, 1) # list_of_dates = list(sliced_dates) # # if len(list_of_dates) == 0: # break for d in dates: e = d + datetime.timedelta(minutes=ex.duration_time) # print(f"{d} - {e}") temporary_availabilities = TemporaryAvailability.query.filter( TemporaryAvailability.examination_schedule_id == examination_schedule_id). \ filter(TemporaryAvailability.start_date <= d, TemporaryAvailability.end_date >= e).all() if len(temporary_availabilities) >= limit: # create term of defence # check term of defence exist in this date range td = TermOfDefence.query.filter(or_(and_(TermOfDefence.start_date <= d, TermOfDefence.end_date >= e), and_(TermOfDefence.start_date > d, TermOfDefence.end_date >= e, TermOfDefence.start_date < e), and_(TermOfDefence.start_date <= d, TermOfDefence.end_date > d, TermOfDefence.end_date < e) )).first() if td is None: term_of_defence = TermOfDefence(start_date=d, end_date=e, examination_schedule_id=examination_schedule_id) db.session.add(term_of_defence) db.session.commit() project_supervisors_ids = [] for i in range(limit): ta = temporary_availabilities[i] project_supervisors_ids.append(ta.project_supervisor_id) project_supervisors = ProjectSupervisor.query.filter( or_(*[ProjectSupervisor.id == idx for idx in project_supervisors_ids])).all() for p in project_supervisors: term_of_defence.members_of_committee.append(p) db.session.commit() # print(temporary_availabilities) return {"message": "Term of defences was generated!"} @bp.post('//add') @bp.input(TermOfDefenceSchema) @bp.output(MessageSchema) def create_term_of_defence(examination_schedule_id: int, data: dict) -> dict: if not data: abort(400, "You have passed empty data!") ex = get_examination_schedule_by_id(examination_schedule_id) yg_id = ex.year_group_id project_supervisors_ids = data.pop('project_supervisors') project_supervisors = ProjectSupervisor.query.filter( or_(*[ProjectSupervisor.id == i for i in project_supervisors_ids])).filter(YearGroup.id == yg_id).all() if len(project_supervisors) != len(project_supervisors_ids): abort(404, "Project Supervisors didn't exist!") start_date = data['start_date'] end_date = data['end_date'] if not (ex.start_date.timestamp() <= start_date.timestamp() and ex.end_date.timestamp() >= end_date.timestamp()): abort(400, "Invalid date range!") if end_date <= start_date: abort(400, "End date must be greater than start date!") delta_time = end_date - start_date delta_time_in_minutes = delta_time.total_seconds() / 60 if delta_time_in_minutes != ex.duration_time: abort(400, "Invalid duration time!") td = TermOfDefence.query.filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \ filter( or_(and_(TermOfDefence.start_date >= start_date, TermOfDefence.start_date < end_date, TermOfDefence.end_date >= end_date), and_(TermOfDefence.start_date <= start_date, TermOfDefence.end_date > start_date, TermOfDefence.end_date <= end_date))).first() if td is not None: abort(400, "This term of defence is taken! You choose other date!") td = TermOfDefence(**data, examination_schedule_id=examination_schedule_id) db.session.add(td) db.session.commit() for p in project_supervisors: td.members_of_committee.append(p) db.session.commit() return {"message": "Term of defence was created!"} @bp.put('//update//') @bp.input(TermOfDefenceSchema) @bp.output(MessageSchema) def update_term_of_defence(examination_schedule_id: int, term_of_defence_id: int, data: dict) -> dict: if not data: abort(400, "You have passed empty data!") td_query = TermOfDefence.query.filter(TermOfDefence.id == term_of_defence_id, TermOfDefence.examination_schedule_id == examination_schedule_id) td = td_query.first() if td is None: abort(404, "Not found term of defence!") ex = td.examination_schedule yg_id = ex.year_group_id project_supervisors_ids = data.pop('project_supervisors') project_supervisors = ProjectSupervisor.query.filter( or_(*[ProjectSupervisor.id == i for i in project_supervisors_ids])).filter(YearGroup.id == yg_id).all() if len(project_supervisors) != len(project_supervisors_ids): abort(404, "Project Supervisors didn't exist!") start_date = data['start_date'] end_date = data['end_date'] if not (ex.start_date.timestamp() <= start_date.timestamp() and ex.end_date.timestamp() >= end_date.timestamp()): abort(400, "Invalid date range!") if end_date <= start_date: abort(400, "End date must be greater than start date!") delta_time = end_date - start_date delta_time_in_minutes = delta_time.total_seconds() / 60 if delta_time_in_minutes != ex.duration_time: abort(400, "Invalid duration time!") term_of_defence = TermOfDefence.query.filter(TermOfDefence.id != term_of_defence_id, TermOfDefence.examination_schedule_id == examination_schedule_id). \ filter( or_(and_(TermOfDefence.start_date >= start_date, TermOfDefence.start_date < end_date, TermOfDefence.end_date >= end_date), and_(TermOfDefence.start_date <= start_date, TermOfDefence.end_date > start_date, TermOfDefence.end_date <= end_date))).first() if term_of_defence is not None: abort(400, "This term of defence is taken! You choose other date!") td_query.update(data) td.members_of_committee = [] db.session.commit() for p in project_supervisors: td.members_of_committee.append(p) db.session.commit() return {"message": "Term of defence was updated!"} @bp.delete('//delete//') @bp.output(MessageSchema) def delete_term_of_defence(examination_schedule_id: int, term_of_defence_id: int) -> dict: td = get_term_of_defence_by_id_and_examination_schedule_id(examination_schedule_id, term_of_defence_id) db.session.delete(td) db.session.commit() return {"message": "Term of defence was deleted!"} @bp.get('//term-of-defences/') @bp.output(TermOfDefenceListSchema) def list_of_term_of_defences(examination_schedule_id: int) -> dict: get_examination_schedule_by_id(examination_schedule_id) td = TermOfDefence.query. \ filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \ join(TermOfDefence.members_of_committee, isouter=True). \ all() return {"term_of_defences": td} @bp.get('//temporary-availabilities/') @bp.output(TemporaryAvailabilityListSchema) def list_of_temporary_availability(examination_schedule_id: int) -> dict: get_examination_schedule_by_id(examination_schedule_id) td = TemporaryAvailability.query. \ filter(TemporaryAvailability.examination_schedule_id == examination_schedule_id). \ join(TemporaryAvailability.project_supervisor). \ all() return {"temporary_availabilities": td} @bp.get('//assigned-group-to-term-of-defences/') @bp.output(AssignedGroupToTermOfDefenceListSchema) def list_of_assigned_group_to_term_of_defences(examination_schedule_id: int) -> dict: get_examination_schedule_by_id(examination_schedule_id) td = TermOfDefence.query. \ join(TermOfDefence.members_of_committee, isouter=True). \ join(TermOfDefence.group). \ filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \ filter(TermOfDefence.group_id.isnot(None)). \ all() return {"term_of_defences": td} @bp.post('//term-of-defence//group/') @bp.input(GroupIdSchema) @bp.output(MessageSchema) def add_group_to_term_of_defence(examination_schedule_id: int, term_of_defence_id: int, data: dict) -> dict: set_new_group_to_term_of_defence(examination_schedule_id, term_of_defence_id, data.get("group_id")) db.session.commit() return {"message": "Group was added to term of defences!"} @bp.delete('//term-of-defence//group/') @bp.output(MessageSchema) def delete_group_to_term_of_defence(examination_schedule_id: int, term_of_defence_id: int) -> dict: td = get_term_of_defence_by_id_and_examination_schedule_id(examination_schedule_id, term_of_defence_id) td.group_id = None db.session.commit() return {"message": "Group was deleted from term of defences!"} @bp.put('//term-of-defence//group/') @bp.input(GroupIdSchema) @bp.output(MessageSchema) def update_group_for_term_of_defence(examination_schedule_id: int, term_of_defence_id: int, data: dict) -> dict: set_new_group_to_term_of_defence(examination_schedule_id, term_of_defence_id, data.get("group_id")) db.session.commit() return {"message": "Group for term of defence was updated!"}