system-pri/backend/app/coordinator/routes/enrollments.py

319 lines
14 KiB
Python

import datetime
from apiflask import APIBlueprint
from flask import abort, current_app
from sqlalchemy import or_, and_
from flask_sqlalchemy import get_debug_queries
from ..schemas import MessageSchema, TermOfDefenceSchema, TermOfDefenceListSchema, \
TemporaryAvailabilityListSchema, AssignedGroupToTermOfDefenceListSchema, GroupIdSchema
from ...examination_schedule.models import TermOfDefence, TemporaryAvailability, committee
from ...students.models import YearGroup
from ...project_supervisor.models import ProjectSupervisor
from ...dependencies import db
from ..utils import generate_range_dates
from ..query.enrollments import get_term_of_defence_by_id_and_examination_schedule_id, set_new_group_to_term_of_defence, \
get_examination_schedule_by_id
bp = APIBlueprint("enrollments", __name__, url_prefix="/enrollments")
@bp.post('/<int:examination_schedule_id>/generate')
@bp.output(MessageSchema)
def generate_term_of_defence_for_this_examination_schedule(examination_schedule_id: int) -> dict:
ex = get_examination_schedule_by_id(examination_schedule_id)
limit = current_app.config.get('LIMIT_MEMBERS_PER_COMMITTEE', 3)
term_of_defences = TermOfDefence.query. \
filter(TermOfDefence.examination_schedule_id == examination_schedule_id).first()
if term_of_defences is not None:
abort(400,
"First you have to delete all term of defences for this examination schedule to generate them again!")
temporary_availabilities = TemporaryAvailability.query. \
filter(TemporaryAvailability.examination_schedule_id == examination_schedule_id). \
join(TemporaryAvailability.project_supervisor). \
all()
if len(temporary_availabilities) == 0:
abort(404, "Not found temporary availabilities for project supervisors")
dates = generate_range_dates(ex.start_date, ex.end_date, ex.duration_time)
term_of_defences = []
for d in dates:
e = d + datetime.timedelta(minutes=ex.duration_time)
t = list(filter(lambda ta: ta.start_date <= d and ta.end_date >= e, temporary_availabilities))
if len(t) >= limit:
projects_supervisors = [t[i].project_supervisor for i in range(limit)]
term_of_defence = TermOfDefence(start_date=d, end_date=e, examination_schedule_id=examination_schedule_id)
term_of_defence.members_of_committee = projects_supervisors
term_of_defences.append(term_of_defence)
db.session.add_all(term_of_defences)
db.session.commit()
return {"message": "Term of defences was generated!"}
@bp.post('/<int:examination_schedule_id>/clear-term-of-defences/')
@bp.output(MessageSchema)
def clear_generated_term_of_defences(examination_schedule_id: int) -> dict:
get_examination_schedule_by_id(examination_schedule_id)
# count_defences = db.func.count(ProjectSupervisor.id)
# td = db.session.query(TermOfDefence, count_defences). \
# join(ProjectSupervisor, isouter=True). \
# filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \
# group_by(TermOfDefence.id).having(count_defences > 0).first()
# print(td)
# if td is not None:
# abort(400, "First you remove group assigned to term of defence to clear term of defences!")
td = TermOfDefence.query. \
filter(TermOfDefence.examination_schedule_id == examination_schedule_id).all()
while True:
for t in td[0:10]:
db.session.delete(t)
db.session.commit()
if len(td) == 0:
break
td[0:10] = []
# print(len(get_debug_queries()))
return {"message": "Term of defences was deleted!"}
@bp.post('/<int:examination_schedule_id>/add')
@bp.input(TermOfDefenceSchema)
@bp.output(MessageSchema)
def create_term_of_defence(examination_schedule_id: int, data: dict) -> dict:
if not data:
abort(400, "You have passed empty data!")
ex = get_examination_schedule_by_id(examination_schedule_id)
yg_id = ex.year_group_id
project_supervisors_ids = data.pop('project_supervisors')
project_supervisors = ProjectSupervisor.query.filter(
or_(*[ProjectSupervisor.id == i for i in project_supervisors_ids])).filter(YearGroup.id == yg_id).all()
if len(project_supervisors) != len(project_supervisors_ids):
abort(404, "Project Supervisors didn't exist!")
start_date = data['start_date']
end_date = data['end_date']
if not (ex.start_date.timestamp() <= start_date.timestamp() and ex.end_date.timestamp() >= end_date.timestamp()):
abort(400, "Invalid date range!")
if end_date <= start_date:
abort(400, "End date must be greater than start date!")
delta_time = end_date - start_date
delta_time_in_minutes = delta_time.total_seconds() / 60
if delta_time_in_minutes != ex.duration_time:
abort(400, "Invalid duration time!")
td = TermOfDefence.query.filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \
filter(
or_(and_(TermOfDefence.start_date >= start_date,
TermOfDefence.start_date < end_date,
TermOfDefence.end_date >= end_date),
and_(TermOfDefence.start_date <= start_date,
TermOfDefence.end_date > start_date,
TermOfDefence.end_date <= end_date))).first()
if td is not None:
abort(400, "This term of defence is taken! You choose other date!")
td = TermOfDefence(**data, examination_schedule_id=examination_schedule_id)
db.session.add(td)
db.session.commit()
for p in project_supervisors:
td.members_of_committee.append(p)
db.session.commit()
return {"message": "Term of defence was created!"}
@bp.post('/<int:examination_schedule_id>/add-term-of-defences/')
@bp.input(TermOfDefenceSchema)
@bp.output(MessageSchema)
def create_many_term_of_defences(examination_schedule_id: int, data: dict) -> dict:
if not data:
abort(400, "You have passed empty data!")
ex = get_examination_schedule_by_id(examination_schedule_id)
yg_id = ex.year_group_id
project_supervisors_ids = data.pop('project_supervisors')
project_supervisors = ProjectSupervisor.query.filter(
or_(*[ProjectSupervisor.id == i for i in project_supervisors_ids])).filter(YearGroup.id == yg_id).all()
if len(project_supervisors) != len(project_supervisors_ids):
abort(404, "Project Supervisors didn't exist!")
start_date = data['start_date']
end_date = data['end_date']
if not (ex.start_date.timestamp() <= start_date.timestamp() and ex.end_date.timestamp() >= end_date.timestamp()):
abort(400, "Invalid date range!")
if end_date <= start_date:
abort(400, "End date must be greater than start date!")
delta_time = end_date - start_date
delta_time_in_minutes = delta_time.total_seconds() / 60
if delta_time_in_minutes % ex.duration_time != 0:
abort(400, "Invalid duration time!")
td = TermOfDefence.query.filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \
filter(
or_(and_(TermOfDefence.start_date >= start_date,
TermOfDefence.start_date < end_date,
TermOfDefence.end_date >= end_date),
and_(TermOfDefence.start_date <= start_date,
TermOfDefence.end_date > start_date,
TermOfDefence.end_date <= end_date))).first()
if td is not None:
abort(400, "This term of defence is taken! You choose other date!")
# create many here
dates = generate_range_dates(start_date, end_date, ex.duration_time)
for start_date in dates:
end_date = start_date + datetime.timedelta(minutes=ex.duration_time)
td = TermOfDefence(start_date=start_date, end_date=end_date, examination_schedule_id=examination_schedule_id)
td.members_of_committee = project_supervisors
db.session.add(td)
db.session.commit()
return {"message": "Term of defences was created!"}
@bp.put('/<int:examination_schedule_id>/update/<int:term_of_defence_id>/')
@bp.input(TermOfDefenceSchema)
@bp.output(MessageSchema)
def update_term_of_defence(examination_schedule_id: int, term_of_defence_id: int, data: dict) -> dict:
if not data:
abort(400, "You have passed empty data!")
td_query = TermOfDefence.query.filter(TermOfDefence.id == term_of_defence_id,
TermOfDefence.examination_schedule_id == examination_schedule_id)
td = td_query.first()
if td is None:
abort(404, "Not found term of defence!")
ex = td.examination_schedule
yg_id = ex.year_group_id
project_supervisors_ids = data.pop('project_supervisors')
project_supervisors = ProjectSupervisor.query.filter(
or_(*[ProjectSupervisor.id == i for i in project_supervisors_ids])).filter(YearGroup.id == yg_id).all()
if len(project_supervisors) != len(project_supervisors_ids):
abort(404, "Project Supervisors didn't exist!")
start_date = data['start_date']
end_date = data['end_date']
if not (ex.start_date.timestamp() <= start_date.timestamp() and ex.end_date.timestamp() >= end_date.timestamp()):
abort(400, "Invalid date range!")
if end_date <= start_date:
abort(400, "End date must be greater than start date!")
delta_time = end_date - start_date
delta_time_in_minutes = delta_time.total_seconds() / 60
if delta_time_in_minutes != ex.duration_time:
abort(400, "Invalid duration time!")
term_of_defence = TermOfDefence.query.filter(TermOfDefence.id != term_of_defence_id,
TermOfDefence.examination_schedule_id == examination_schedule_id). \
filter(
or_(and_(TermOfDefence.start_date >= start_date,
TermOfDefence.start_date < end_date,
TermOfDefence.end_date >= end_date),
and_(TermOfDefence.start_date <= start_date,
TermOfDefence.end_date > start_date,
TermOfDefence.end_date <= end_date))).first()
if term_of_defence is not None:
abort(400, "This term of defence is taken! You choose other date!")
td_query.update(data)
td.members_of_committee = []
db.session.commit()
for p in project_supervisors:
td.members_of_committee.append(p)
db.session.commit()
return {"message": "Term of defence was updated!"}
@bp.delete('/<int:examination_schedule_id>/delete/<int:term_of_defence_id>/')
@bp.output(MessageSchema)
def delete_term_of_defence(examination_schedule_id: int, term_of_defence_id: int) -> dict:
td = get_term_of_defence_by_id_and_examination_schedule_id(examination_schedule_id, term_of_defence_id)
db.session.delete(td)
db.session.commit()
return {"message": "Term of defence was deleted!"}
@bp.get('/<int:examination_schedule_id>/term-of-defences/')
@bp.output(AssignedGroupToTermOfDefenceListSchema)
def list_of_term_of_defences(examination_schedule_id: int) -> dict:
get_examination_schedule_by_id(examination_schedule_id)
td = TermOfDefence.query. \
join(TermOfDefence.members_of_committee, isouter=True). \
filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \
all()
return {"term_of_defences": td}
@bp.get('/<int:examination_schedule_id>/temporary-availabilities/')
@bp.output(TemporaryAvailabilityListSchema)
def list_of_temporary_availability(examination_schedule_id: int) -> dict:
get_examination_schedule_by_id(examination_schedule_id)
td = TemporaryAvailability.query. \
filter(TemporaryAvailability.examination_schedule_id == examination_schedule_id). \
join(TemporaryAvailability.project_supervisor). \
all()
return {"temporary_availabilities": td}
@bp.get('/<int:examination_schedule_id>/assigned-group-to-term-of-defences/')
@bp.output(AssignedGroupToTermOfDefenceListSchema)
def list_of_assigned_group_to_term_of_defences(examination_schedule_id: int) -> dict:
get_examination_schedule_by_id(examination_schedule_id)
td = TermOfDefence.query. \
join(TermOfDefence.members_of_committee, isouter=True). \
join(TermOfDefence.group). \
filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \
filter(TermOfDefence.group_id.isnot(None)). \
all()
return {"term_of_defences": td}
@bp.post('/<int:examination_schedule_id>/term-of-defence/<int:term_of_defence_id>/group/')
@bp.input(GroupIdSchema)
@bp.output(MessageSchema)
def add_group_to_term_of_defence(examination_schedule_id: int, term_of_defence_id: int, data: dict) -> dict:
set_new_group_to_term_of_defence(examination_schedule_id, term_of_defence_id, data.get("group_id"))
db.session.commit()
return {"message": "Group was added to term of defences!"}
@bp.delete('/<int:examination_schedule_id>/term-of-defence/<int:term_of_defence_id>/group/')
@bp.output(MessageSchema)
def delete_group_to_term_of_defence(examination_schedule_id: int, term_of_defence_id: int) -> dict:
td = get_term_of_defence_by_id_and_examination_schedule_id(examination_schedule_id, term_of_defence_id)
td.group_id = None
db.session.commit()
return {"message": "Group was deleted from term of defences!"}
@bp.put('/<int:examination_schedule_id>/term-of-defence/<int:term_of_defence_id>/group/')
@bp.input(GroupIdSchema)
@bp.output(MessageSchema)
def update_group_for_term_of_defence(examination_schedule_id: int, term_of_defence_id: int, data: dict) -> dict:
set_new_group_to_term_of_defence(examination_schedule_id, term_of_defence_id, data.get("group_id"))
db.session.commit()
return {"message": "Group for term of defence was updated!"}