267 lines
11 KiB
Python
267 lines
11 KiB
Python
import datetime
|
|
|
|
from apiflask import APIBlueprint
|
|
from flask import abort, current_app
|
|
from sqlalchemy import or_, and_
|
|
from flask_sqlalchemy import get_debug_queries
|
|
|
|
from ..schemas import MessageSchema, TermOfDefenceSchema, TermOfDefenceListSchema, \
|
|
TemporaryAvailabilityListSchema, AssignedGroupToTermOfDefenceListSchema, GroupIdSchema
|
|
from ...examination_schedule.models import TermOfDefence, TemporaryAvailability, committee
|
|
from ...students.models import YearGroup
|
|
from ...project_supervisor.models import ProjectSupervisor
|
|
from ...dependencies import db
|
|
from ..utils import generate_range_dates
|
|
from ..query.enrollments import get_term_of_defence_by_id_and_examination_schedule_id, set_new_group_to_term_of_defence, \
|
|
get_examination_schedule_by_id
|
|
|
|
bp = APIBlueprint("enrollments", __name__, url_prefix="/enrollments")
|
|
|
|
|
|
@bp.post('/<int:examination_schedule_id>/generate')
|
|
@bp.output(MessageSchema)
|
|
def generate_term_of_defence_for_this_examination_schedule(examination_schedule_id: int) -> dict:
|
|
ex = get_examination_schedule_by_id(examination_schedule_id)
|
|
limit = current_app.config.get('LIMIT_MEMBERS_PER_COMMITTEE', 3)
|
|
|
|
term_of_defences = TermOfDefence.query. \
|
|
filter(TermOfDefence.examination_schedule_id == examination_schedule_id).first()
|
|
if term_of_defences is not None:
|
|
abort(400,
|
|
"First you have to delete all term of defences for this examination schedule to generate them again!")
|
|
|
|
temporary_availabilities = TemporaryAvailability.query. \
|
|
filter(TemporaryAvailability.examination_schedule_id == examination_schedule_id). \
|
|
join(TemporaryAvailability.project_supervisor). \
|
|
all()
|
|
if len(temporary_availabilities) == 0:
|
|
abort(404, "Not found temporary availabilities for project supervisors")
|
|
|
|
dates = generate_range_dates(ex.start_date, ex.end_date, ex.duration_time)
|
|
print(temporary_availabilities)
|
|
for d in dates:
|
|
e = d + datetime.timedelta(minutes=ex.duration_time)
|
|
t = list(filter(lambda ta: ta.start_date <= d and ta.end_date >= e, temporary_availabilities))
|
|
if len(t) >= limit:
|
|
print(f'{d} -- {e}')
|
|
print(t)
|
|
print("IN")
|
|
projects_supervisors = [t[i].project_supervisor for i in range(limit)]
|
|
term_of_defence = TermOfDefence(start_date=d, end_date=e, examination_schedule_id=examination_schedule_id)
|
|
term_of_defence.members_of_committee = projects_supervisors
|
|
db.session.add(term_of_defence)
|
|
db.session.commit()
|
|
|
|
return {"message": "Term of defences was generated!"}
|
|
|
|
|
|
@bp.post('/<int:examination_schedule_id>/clear-term-of-defences/')
|
|
@bp.output(MessageSchema)
|
|
def clear_generated_term_of_defences(examination_schedule_id: int) -> dict:
|
|
get_examination_schedule_by_id(examination_schedule_id)
|
|
|
|
# count_defences = db.func.count(ProjectSupervisor.id)
|
|
# td = db.session.query(TermOfDefence, count_defences). \
|
|
# join(ProjectSupervisor, isouter=True). \
|
|
# filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \
|
|
# group_by(TermOfDefence.id).having(count_defences > 0).first()
|
|
# print(td)
|
|
# if td is not None:
|
|
# abort(400, "First you remove group assigned to term of defence to clear term of defences!")
|
|
|
|
td = TermOfDefence.query. \
|
|
filter(TermOfDefence.examination_schedule_id == examination_schedule_id).all()
|
|
while True:
|
|
for t in td[0:10]:
|
|
db.session.delete(t)
|
|
db.session.commit()
|
|
if len(td) == 0:
|
|
break
|
|
td[0:10] = []
|
|
|
|
return {"message": "Term of defences was deleted!"}
|
|
|
|
|
|
@bp.post('/<int:examination_schedule_id>/add')
|
|
@bp.input(TermOfDefenceSchema)
|
|
@bp.output(MessageSchema)
|
|
def create_term_of_defence(examination_schedule_id: int, data: dict) -> dict:
|
|
if not data:
|
|
abort(400, "You have passed empty data!")
|
|
|
|
ex = get_examination_schedule_by_id(examination_schedule_id)
|
|
|
|
yg_id = ex.year_group_id
|
|
project_supervisors_ids = data.pop('project_supervisors')
|
|
project_supervisors = ProjectSupervisor.query.filter(
|
|
or_(*[ProjectSupervisor.id == i for i in project_supervisors_ids])).filter(YearGroup.id == yg_id).all()
|
|
|
|
if len(project_supervisors) != len(project_supervisors_ids):
|
|
abort(404, "Project Supervisors didn't exist!")
|
|
|
|
start_date = data['start_date']
|
|
end_date = data['end_date']
|
|
|
|
if not (ex.start_date.timestamp() <= start_date.timestamp() and ex.end_date.timestamp() >= end_date.timestamp()):
|
|
abort(400, "Invalid date range!")
|
|
|
|
if end_date <= start_date:
|
|
abort(400, "End date must be greater than start date!")
|
|
|
|
delta_time = end_date - start_date
|
|
delta_time_in_minutes = delta_time.total_seconds() / 60
|
|
if delta_time_in_minutes != ex.duration_time:
|
|
abort(400, "Invalid duration time!")
|
|
|
|
td = TermOfDefence.query.filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \
|
|
filter(
|
|
or_(and_(TermOfDefence.start_date >= start_date,
|
|
TermOfDefence.start_date < end_date,
|
|
TermOfDefence.end_date >= end_date),
|
|
and_(TermOfDefence.start_date <= start_date,
|
|
TermOfDefence.end_date > start_date,
|
|
TermOfDefence.end_date <= end_date))).first()
|
|
|
|
if td is not None:
|
|
abort(400, "This term of defence is taken! You choose other date!")
|
|
|
|
td = TermOfDefence(**data, examination_schedule_id=examination_schedule_id)
|
|
db.session.add(td)
|
|
db.session.commit()
|
|
for p in project_supervisors:
|
|
td.members_of_committee.append(p)
|
|
db.session.commit()
|
|
return {"message": "Term of defence was created!"}
|
|
|
|
|
|
@bp.put('/<int:examination_schedule_id>/update/<int:term_of_defence_id>/')
|
|
@bp.input(TermOfDefenceSchema)
|
|
@bp.output(MessageSchema)
|
|
def update_term_of_defence(examination_schedule_id: int, term_of_defence_id: int, data: dict) -> dict:
|
|
if not data:
|
|
abort(400, "You have passed empty data!")
|
|
|
|
td_query = TermOfDefence.query.filter(TermOfDefence.id == term_of_defence_id,
|
|
TermOfDefence.examination_schedule_id == examination_schedule_id)
|
|
td = td_query.first()
|
|
if td is None:
|
|
abort(404, "Not found term of defence!")
|
|
|
|
ex = td.examination_schedule
|
|
yg_id = ex.year_group_id
|
|
project_supervisors_ids = data.pop('project_supervisors')
|
|
project_supervisors = ProjectSupervisor.query.filter(
|
|
or_(*[ProjectSupervisor.id == i for i in project_supervisors_ids])).filter(YearGroup.id == yg_id).all()
|
|
|
|
if len(project_supervisors) != len(project_supervisors_ids):
|
|
abort(404, "Project Supervisors didn't exist!")
|
|
|
|
start_date = data['start_date']
|
|
end_date = data['end_date']
|
|
if not (ex.start_date.timestamp() <= start_date.timestamp() and ex.end_date.timestamp() >= end_date.timestamp()):
|
|
abort(400, "Invalid date range!")
|
|
|
|
if end_date <= start_date:
|
|
abort(400, "End date must be greater than start date!")
|
|
|
|
delta_time = end_date - start_date
|
|
delta_time_in_minutes = delta_time.total_seconds() / 60
|
|
if delta_time_in_minutes != ex.duration_time:
|
|
abort(400, "Invalid duration time!")
|
|
|
|
term_of_defence = TermOfDefence.query.filter(TermOfDefence.id != term_of_defence_id,
|
|
TermOfDefence.examination_schedule_id == examination_schedule_id). \
|
|
filter(
|
|
or_(and_(TermOfDefence.start_date >= start_date,
|
|
TermOfDefence.start_date < end_date,
|
|
TermOfDefence.end_date >= end_date),
|
|
and_(TermOfDefence.start_date <= start_date,
|
|
TermOfDefence.end_date > start_date,
|
|
TermOfDefence.end_date <= end_date))).first()
|
|
|
|
if term_of_defence is not None:
|
|
abort(400, "This term of defence is taken! You choose other date!")
|
|
|
|
td_query.update(data)
|
|
td.members_of_committee = []
|
|
db.session.commit()
|
|
for p in project_supervisors:
|
|
td.members_of_committee.append(p)
|
|
db.session.commit()
|
|
|
|
return {"message": "Term of defence was updated!"}
|
|
|
|
|
|
@bp.delete('/<int:examination_schedule_id>/delete/<int:term_of_defence_id>/')
|
|
@bp.output(MessageSchema)
|
|
def delete_term_of_defence(examination_schedule_id: int, term_of_defence_id: int) -> dict:
|
|
td = get_term_of_defence_by_id_and_examination_schedule_id(examination_schedule_id, term_of_defence_id)
|
|
db.session.delete(td)
|
|
db.session.commit()
|
|
return {"message": "Term of defence was deleted!"}
|
|
|
|
|
|
@bp.get('/<int:examination_schedule_id>/term-of-defences/')
|
|
@bp.output(TermOfDefenceListSchema)
|
|
def list_of_term_of_defences(examination_schedule_id: int) -> dict:
|
|
get_examination_schedule_by_id(examination_schedule_id)
|
|
|
|
td = TermOfDefence.query. \
|
|
filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \
|
|
join(TermOfDefence.members_of_committee, isouter=True). \
|
|
all()
|
|
return {"term_of_defences": td}
|
|
|
|
|
|
@bp.get('/<int:examination_schedule_id>/temporary-availabilities/')
|
|
@bp.output(TemporaryAvailabilityListSchema)
|
|
def list_of_temporary_availability(examination_schedule_id: int) -> dict:
|
|
get_examination_schedule_by_id(examination_schedule_id)
|
|
|
|
td = TemporaryAvailability.query. \
|
|
filter(TemporaryAvailability.examination_schedule_id == examination_schedule_id). \
|
|
join(TemporaryAvailability.project_supervisor). \
|
|
all()
|
|
return {"temporary_availabilities": td}
|
|
|
|
|
|
@bp.get('/<int:examination_schedule_id>/assigned-group-to-term-of-defences/')
|
|
@bp.output(AssignedGroupToTermOfDefenceListSchema)
|
|
def list_of_assigned_group_to_term_of_defences(examination_schedule_id: int) -> dict:
|
|
get_examination_schedule_by_id(examination_schedule_id)
|
|
|
|
td = TermOfDefence.query. \
|
|
join(TermOfDefence.members_of_committee, isouter=True). \
|
|
join(TermOfDefence.group). \
|
|
filter(TermOfDefence.examination_schedule_id == examination_schedule_id). \
|
|
filter(TermOfDefence.group_id.isnot(None)). \
|
|
all()
|
|
return {"term_of_defences": td}
|
|
|
|
|
|
@bp.post('/<int:examination_schedule_id>/term-of-defence/<int:term_of_defence_id>/group/')
|
|
@bp.input(GroupIdSchema)
|
|
@bp.output(MessageSchema)
|
|
def add_group_to_term_of_defence(examination_schedule_id: int, term_of_defence_id: int, data: dict) -> dict:
|
|
set_new_group_to_term_of_defence(examination_schedule_id, term_of_defence_id, data.get("group_id"))
|
|
db.session.commit()
|
|
return {"message": "Group was added to term of defences!"}
|
|
|
|
|
|
@bp.delete('/<int:examination_schedule_id>/term-of-defence/<int:term_of_defence_id>/group/')
|
|
@bp.output(MessageSchema)
|
|
def delete_group_to_term_of_defence(examination_schedule_id: int, term_of_defence_id: int) -> dict:
|
|
td = get_term_of_defence_by_id_and_examination_schedule_id(examination_schedule_id, term_of_defence_id)
|
|
td.group_id = None
|
|
db.session.commit()
|
|
return {"message": "Group was deleted from term of defences!"}
|
|
|
|
|
|
@bp.put('/<int:examination_schedule_id>/term-of-defence/<int:term_of_defence_id>/group/')
|
|
@bp.input(GroupIdSchema)
|
|
@bp.output(MessageSchema)
|
|
def update_group_for_term_of_defence(examination_schedule_id: int, term_of_defence_id: int, data: dict) -> dict:
|
|
set_new_group_to_term_of_defence(examination_schedule_id, term_of_defence_id, data.get("group_id"))
|
|
db.session.commit()
|
|
return {"message": "Group for term of defence was updated!"}
|