"""Coordinator endpoints for managing terms of defence of an examination schedule.

The blueprint exposes routes to generate, create, update, delete and list
``TermOfDefence`` rows, to list ``TemporaryAvailability`` rows, and to assign
or unassign a group on a term of defence.
"""
import datetime

from apiflask import APIBlueprint
from flask import abort, current_app
from flask_sqlalchemy import get_debug_queries  # noqa: F401  (kept: not used in this module's code)
from sqlalchemy import and_, or_  # noqa: F401  (kept: not used in this module's code)

from ..schemas import (
    MessageSchema,
    TermOfDefenceSchema,
    TermOfDefenceListSchema,
    TemporaryAvailabilityListSchema,
    AssignedGroupToTermOfDefenceListSchema,
    GroupIdSchema,
)
from ...examination_schedule.models import TermOfDefence, TemporaryAvailability, committee  # noqa: F401
from ...students.models import YearGroup
from ...project_supervisor.models import ProjectSupervisor
from ...dependencies import db
from ..utils import generate_range_dates
from ..query.enrollments import (
    get_term_of_defence_by_id_and_examination_schedule_id,
    set_new_group_to_term_of_defence,
    get_examination_schedule_by_id,
)

bp = APIBlueprint("enrollments", __name__, url_prefix="/enrollments")

# Number of TermOfDefence rows deleted per commit when clearing a schedule.
_DELETE_BATCH_SIZE = 10

# NOTE: the URL variable names in every route below must match the parameter
# names of the decorated view function, because Flask passes them as keyword
# arguments.


def _get_project_supervisors_or_404(project_supervisors_ids: list, year_group_id: int) -> list:
    """Load the project supervisors with the given ids, aborting with 404 on a mismatch.

    Args:
        project_supervisors_ids: ids taken from the request payload.
        year_group_id: id of the year group the examination schedule belongs to.

    Returns:
        The list of matching ``ProjectSupervisor`` rows.

    Raises:
        HTTP 404 (via ``abort``) when the number of rows found differs from the
        number of ids requested (this also rejects duplicated ids).
    """
    # NOTE(review): no join between ProjectSupervisor and YearGroup is visible
    # here, so this filter may not restrict supervisors to the year group as
    # intended - confirm against the models before relying on it.
    # NOTE(review): an empty id list matches no rows and therefore passes the
    # length check below; confirm the input schema requires at least one id.
    project_supervisors = (
        ProjectSupervisor.query
        .filter(ProjectSupervisor.id.in_(project_supervisors_ids))
        .filter(YearGroup.id == year_group_id)
        .all()
    )
    if len(project_supervisors) != len(project_supervisors_ids):
        abort(404, "Project Supervisors didn't exist!")
    return project_supervisors


def _validate_dates_and_get_duration(ex, start_date: datetime.datetime,
                                     end_date: datetime.datetime) -> float:
    """Validate a requested date range against the examination schedule.

    Args:
        ex: examination schedule object exposing ``start_date`` and ``end_date``.
        start_date: requested start of the range.
        end_date: requested end of the range.

    Returns:
        Length of the requested range in minutes.

    Raises:
        HTTP 400 (via ``abort``) when the range is not inside the schedule or
        when ``end_date`` is not later than ``start_date``.
    """
    inside_schedule = (
        ex.start_date.timestamp() <= start_date.timestamp()
        and ex.end_date.timestamp() >= end_date.timestamp()
    )
    if not inside_schedule:
        abort(400, "Invalid date range!")
    if end_date <= start_date:
        abort(400, "End date must be greater than start date!")
    return (end_date - start_date).total_seconds() / 60


def _find_overlapping_term_of_defence(examination_schedule_id: int,
                                      start_date: datetime.datetime,
                                      end_date: datetime.datetime,
                                      exclude_id: int = None):
    """Return one term of defence of the schedule that overlaps the given range.

    Two ranges overlap when each one starts before the other ends. Ranges that
    merely touch (one ends exactly when the other starts) do not overlap.

    Args:
        examination_schedule_id: schedule whose terms are searched.
        start_date: start of the requested range.
        end_date: end of the requested range.
        exclude_id: optional id of a term to ignore (the term being updated).

    Returns:
        An overlapping ``TermOfDefence`` or ``None`` when the range is free.
    """
    query = TermOfDefence.query.filter(
        TermOfDefence.examination_schedule_id == examination_schedule_id,
        TermOfDefence.start_date < end_date,
        TermOfDefence.end_date > start_date,
    )
    if exclude_id is not None:
        query = query.filter(TermOfDefence.id != exclude_id)
    return query.first()


@bp.post('/<int:examination_schedule_id>/generate')
@bp.output(MessageSchema)
def generate_term_of_defence_for_this_examination_schedule(examination_schedule_id: int) -> dict:
    """Generate terms of defence for a schedule from supervisors' availabilities.

    For every slot produced by ``generate_range_dates`` a term is created when
    at least ``LIMIT_MEMBERS_PER_COMMITTEE`` (default 3) temporary
    availabilities fully cover the slot; the first ``limit`` matching
    supervisors become the committee.

    Raises:
        HTTP 400 when the schedule already has any term of defence.
        HTTP 404 when the schedule has no temporary availabilities.
    """
    ex = get_examination_schedule_by_id(examination_schedule_id)
    limit = current_app.config.get('LIMIT_MEMBERS_PER_COMMITTEE', 3)

    existing_term = (
        TermOfDefence.query
        .filter(TermOfDefence.examination_schedule_id == examination_schedule_id)
        .first()
    )
    if existing_term is not None:
        abort(400,
              "First you have to delete all term of defences for this examination schedule to generate them again!")

    temporary_availabilities = (
        TemporaryAvailability.query
        .filter(TemporaryAvailability.examination_schedule_id == examination_schedule_id)
        .join(TemporaryAvailability.project_supervisor)
        .all()
    )
    if not temporary_availabilities:
        abort(404, "Not found temporary availabilities for project supervisors")

    slot_length = datetime.timedelta(minutes=ex.duration_time)
    term_of_defences = []
    # presumably generate_range_dates yields the start datetime of each slot - verify in ..utils
    for slot_start in generate_range_dates(ex.start_date, ex.end_date, ex.duration_time):
        slot_end = slot_start + slot_length
        covering = [
            ta for ta in temporary_availabilities
            if ta.start_date <= slot_start and ta.end_date >= slot_end
        ]
        if len(covering) < limit:
            continue
        term_of_defence = TermOfDefence(start_date=slot_start, end_date=slot_end,
                                        examination_schedule_id=examination_schedule_id)
        term_of_defence.members_of_committee = [ta.project_supervisor for ta in covering[:limit]]
        term_of_defences.append(term_of_defence)

    db.session.add_all(term_of_defences)
    db.session.commit()
    return {"message": "Term of defences was generated!"}


@bp.post('/<int:examination_schedule_id>/clear-term-of-defences/')
@bp.output(MessageSchema)
def clear_generated_term_of_defences(examination_schedule_id: int) -> dict:
    """Delete every term of defence of the schedule, committing in small batches."""
    # The return value is unused; presumably the helper aborts when the
    # schedule does not exist - confirm in ..query.enrollments.
    get_examination_schedule_by_id(examination_schedule_id)

    # NOTE(review): terms are deleted even when a group is assigned to them;
    # no guard against that is active here.
    term_of_defences = (
        TermOfDefence.query
        .filter(TermOfDefence.examination_schedule_id == examination_schedule_id)
        .all()
    )
    # Rows are deleted one by one through the session (not with a bulk DELETE).
    for offset in range(0, len(term_of_defences), _DELETE_BATCH_SIZE):
        for term_of_defence in term_of_defences[offset:offset + _DELETE_BATCH_SIZE]:
            db.session.delete(term_of_defence)
        db.session.commit()
    return {"message": "Term of defences was deleted!"}


@bp.post('/<int:examination_schedule_id>/add')
@bp.input(TermOfDefenceSchema)
@bp.output(MessageSchema)
def create_term_of_defence(examination_schedule_id: int, data: dict) -> dict:
    """Create a single term of defence with the given committee.

    The requested range must lie inside the schedule, last exactly
    ``duration_time`` minutes and not overlap any existing term.

    Raises:
        HTTP 400 on empty payload, invalid dates/duration or an overlapping term.
        HTTP 404 when the supervisors cannot all be found.
    """
    if not data:
        abort(400, "You have passed empty data!")

    ex = get_examination_schedule_by_id(examination_schedule_id)
    project_supervisors = _get_project_supervisors_or_404(data.pop('project_supervisors'), ex.year_group_id)

    start_date = data['start_date']
    end_date = data['end_date']
    if _validate_dates_and_get_duration(ex, start_date, end_date) != ex.duration_time:
        abort(400, "Invalid duration time!")

    if _find_overlapping_term_of_defence(examination_schedule_id, start_date, end_date) is not None:
        abort(400, "This term of defence is taken! You choose other date!")

    # The term and its committee are persisted in one commit so that a failure
    # cannot leave a term without committee members.
    td = TermOfDefence(**data, examination_schedule_id=examination_schedule_id)
    td.members_of_committee = list(project_supervisors)
    db.session.add(td)
    db.session.commit()
    return {"message": "Term of defence was created!"}


@bp.post('/<int:examination_schedule_id>/add-term-of-defences/')
@bp.input(TermOfDefenceSchema)
@bp.output(MessageSchema)
def create_many_term_of_defences(examination_schedule_id: int, data: dict) -> dict:
    """Split the requested range into consecutive terms of defence.

    The range must lie inside the schedule, be a multiple of ``duration_time``
    minutes long and not overlap any existing term. Every created term gets the
    same committee.

    Raises:
        HTTP 400 on empty payload, invalid dates/duration or an overlapping term.
        HTTP 404 when the supervisors cannot all be found.
    """
    if not data:
        abort(400, "You have passed empty data!")

    ex = get_examination_schedule_by_id(examination_schedule_id)
    project_supervisors = _get_project_supervisors_or_404(data.pop('project_supervisors'), ex.year_group_id)

    start_date = data['start_date']
    end_date = data['end_date']
    if _validate_dates_and_get_duration(ex, start_date, end_date) % ex.duration_time != 0:
        abort(400, "Invalid duration time!")

    if _find_overlapping_term_of_defence(examination_schedule_id, start_date, end_date) is not None:
        abort(400, "This term of defence is taken! You choose other date!")

    slot_length = datetime.timedelta(minutes=ex.duration_time)
    # presumably generate_range_dates yields the start datetime of each slot - verify in ..utils
    for slot_start in generate_range_dates(start_date, end_date, ex.duration_time):
        td = TermOfDefence(start_date=slot_start, end_date=slot_start + slot_length,
                           examination_schedule_id=examination_schedule_id)
        # Copy the list so every term owns its own collection object.
        td.members_of_committee = list(project_supervisors)
        db.session.add(td)
    db.session.commit()
    return {"message": "Term of defences was created!"}


@bp.put('/<int:examination_schedule_id>/update/<int:term_of_defence_id>/')
@bp.input(TermOfDefenceSchema)
@bp.output(MessageSchema)
def update_term_of_defence(examination_schedule_id: int, term_of_defence_id: int, data: dict) -> dict:
    """Update dates and committee of an existing term of defence.

    The same validation as for creation applies; the term being updated is
    ignored by the overlap check.

    Raises:
        HTTP 400 on empty payload, invalid dates/duration or an overlapping term.
        HTTP 404 when the term or any of the supervisors cannot be found.
    """
    if not data:
        abort(400, "You have passed empty data!")

    td_query = TermOfDefence.query.filter(TermOfDefence.id == term_of_defence_id,
                                          TermOfDefence.examination_schedule_id == examination_schedule_id)
    td = td_query.first()
    if td is None:
        abort(404, "Not found term of defence!")

    ex = td.examination_schedule
    project_supervisors = _get_project_supervisors_or_404(data.pop('project_supervisors'), ex.year_group_id)

    start_date = data['start_date']
    end_date = data['end_date']
    if _validate_dates_and_get_duration(ex, start_date, end_date) != ex.duration_time:
        abort(400, "Invalid duration time!")

    overlapping = _find_overlapping_term_of_defence(examination_schedule_id, start_date, end_date,
                                                    exclude_id=term_of_defence_id)
    if overlapping is not None:
        abort(400, "This term of defence is taken! You choose other date!")

    # Column update and committee replacement are committed together so the
    # term is never persisted with an emptied committee.
    td_query.update(data)
    td.members_of_committee = list(project_supervisors)
    db.session.commit()
    return {"message": "Term of defence was updated!"}


@bp.delete('/<int:examination_schedule_id>/delete/<int:term_of_defence_id>/')
@bp.output(MessageSchema)
def delete_term_of_defence(examination_schedule_id: int, term_of_defence_id: int) -> dict:
    """Delete one term of defence looked up by schedule id and term id."""
    td = get_term_of_defence_by_id_and_examination_schedule_id(examination_schedule_id, term_of_defence_id)
    db.session.delete(td)
    db.session.commit()
    return {"message": "Term of defence was deleted!"}


@bp.get('/<int:examination_schedule_id>/term-of-defences/')
@bp.output(AssignedGroupToTermOfDefenceListSchema)
def list_of_term_of_defences(examination_schedule_id: int) -> dict:
    """List all terms of defence of the schedule (outer-joined with committee members)."""
    get_examination_schedule_by_id(examination_schedule_id)

    term_of_defences = (
        TermOfDefence.query
        .join(TermOfDefence.members_of_committee, isouter=True)
        .filter(TermOfDefence.examination_schedule_id == examination_schedule_id)
        .all()
    )
    return {"term_of_defences": term_of_defences}


@bp.get('/<int:examination_schedule_id>/temporary-availabilities/')
@bp.output(TemporaryAvailabilityListSchema)
def list_of_temporary_availability(examination_schedule_id: int) -> dict:
    """List all temporary availabilities of the schedule (joined with their supervisor)."""
    get_examination_schedule_by_id(examination_schedule_id)

    temporary_availabilities = (
        TemporaryAvailability.query
        .filter(TemporaryAvailability.examination_schedule_id == examination_schedule_id)
        .join(TemporaryAvailability.project_supervisor)
        .all()
    )
    return {"temporary_availabilities": temporary_availabilities}


@bp.get('/<int:examination_schedule_id>/assigned-group-to-term-of-defences/')
@bp.output(AssignedGroupToTermOfDefenceListSchema)
def list_of_assigned_group_to_term_of_defences(examination_schedule_id: int) -> dict:
    """List the terms of defence of the schedule that have a group assigned."""
    get_examination_schedule_by_id(examination_schedule_id)

    term_of_defences = (
        TermOfDefence.query
        .join(TermOfDefence.members_of_committee, isouter=True)
        .join(TermOfDefence.group)
        .filter(TermOfDefence.examination_schedule_id == examination_schedule_id)
        .filter(TermOfDefence.group_id.isnot(None))
        .all()
    )
    return {"term_of_defences": term_of_defences}


@bp.post('/<int:examination_schedule_id>/term-of-defence/<int:term_of_defence_id>/group/')
@bp.input(GroupIdSchema)
@bp.output(MessageSchema)
def add_group_to_term_of_defence(examination_schedule_id: int, term_of_defence_id: int, data: dict) -> dict:
    """Assign the group given by ``group_id`` in the payload to a term of defence."""
    set_new_group_to_term_of_defence(examination_schedule_id, term_of_defence_id, data.get("group_id"))
    db.session.commit()
    return {"message": "Group was added to term of defences!"}


@bp.delete('/<int:examination_schedule_id>/term-of-defence/<int:term_of_defence_id>/group/')
@bp.output(MessageSchema)
def delete_group_to_term_of_defence(examination_schedule_id: int, term_of_defence_id: int) -> dict:
    """Unassign the group from a term of defence by setting ``group_id`` to ``None``."""
    td = get_term_of_defence_by_id_and_examination_schedule_id(examination_schedule_id, term_of_defence_id)
    td.group_id = None
    db.session.commit()
    return {"message": "Group was deleted from term of defences!"}


@bp.put('/<int:examination_schedule_id>/term-of-defence/<int:term_of_defence_id>/group/')
@bp.input(GroupIdSchema)
@bp.output(MessageSchema)
def update_group_for_term_of_defence(examination_schedule_id: int, term_of_defence_id: int, data: dict) -> dict:
    """Replace the group assigned to a term of defence with the one from the payload."""
    set_new_group_to_term_of_defence(examination_schedule_id, term_of_defence_id, data.get("group_id"))
    db.session.commit()
    return {"message": "Group for term of defence was updated!"}