system-pri/backend/app/coordinator/routes/students.py

178 lines
6.6 KiB
Python

from random import randint
from itertools import islice
from flask import Response, abort
from apiflask import APIBlueprint
from sqlalchemy import or_
from flask_sqlalchemy import get_debug_queries
from ...students.models import Student, Group, YearGroup, YearGroupStudents
from ...project_supervisor.models import ProjectSupervisor
from ..schemas import StudentSchema, StudentEditSchema, StudentsPaginationSchema, YearGroupInfoQuery, \
StudentCreateSchema, MessageSchema, FileSchema, StudentQuerySchema, StudentListFileDownloaderSchema
from ...dependencies import db
from ..utils import parse_csv, generate_csv
from ..exceptions import InvalidNameOrTypeHeaderException
from ...base.utils import paginate_models, is_allowed_extensions
bp = APIBlueprint("students", __name__, url_prefix="/students")
@bp.get("/<int:year_group_id>/")
@bp.input(StudentQuerySchema, location='query')
@bp.output(StudentsPaginationSchema)
def list_students(year_group_id: int, query: dict) -> dict:
# add filter by year group
fullname = query.get('fullname')
order_by_first_name = query.get('order_by_first_name')
order_by_last_name = query.get('order_by_last_name')
page = query.get('page')
per_page = query.get('per_page')
student_query = Student.search_by_fullname_and_mode_and_order_by_first_name_or_last_name(
year_group_id, fullname, order_by_first_name, order_by_last_name)
data = paginate_models(page, student_query, per_page)
# print(get_debug_queries()[0])
return {
"students": data['items'],
"max_pages": data['max_pages']
}
@bp.get("/<int:index>/detail/")
@bp.output(StudentSchema)
def detail_student(index: int) -> Student:
student = Student.query.filter_by(index=index).first()
if student is None:
abort(404, "Not found student!")
return student
@bp.delete("/<int:index>/")
@bp.output(MessageSchema, status_code=202)
def delete_student(index: int) -> dict:
student = Student.query.filter_by(index=index).first()
if student is None:
abort(404, "Not found student!")
db.session.delete(student)
db.session.commit()
return {"message": "Student was deleted!"}
@bp.put("/<int:index>/")
@bp.input(StudentEditSchema)
@bp.output(MessageSchema)
def edit_student(index: int, data: dict) -> dict:
if not data:
abort(400, 'You have passed empty data!')
student_query = Student.query.filter_by(index=index)
student = student_query.first()
if student is None:
abort(404, 'Not found student!')
student_query.update(data)
db.session.commit()
return {"message": "Student was updated!"}
@bp.post("/")
@bp.input(StudentCreateSchema)
@bp.output(MessageSchema)
def create_student(data: dict) -> dict:
index = data['index']
yg_id = data['year_group_id']
del data['year_group_id']
student = Student.query.filter(Student.index == index).first()
# if student is not None:
# abort(400, "Student has already exists!")
if student is None:
dummy_email = f'student{randint(1, 300_000)}@gmail.com'
student = Student(**data, email=dummy_email)
db.session.add(student)
# add student to the chosen year group
year_group = YearGroup.query.filter(YearGroup.id == yg_id).first()
if year_group is None:
abort(404, "Not found year group!")
if any((year_group.id == yg.id for yg in student.year_groups)):
abort(400, "You are assigned to this year group!")
ygs = YearGroupStudents(student_index=student.index, year_group_id=year_group.id)
db.session.add(ygs)
db.session.commit()
return {"message": "Student was created!"}
@bp.post("/upload/")
@bp.input(YearGroupInfoQuery, location='query')
@bp.input(FileSchema, location='form_and_files')
@bp.output(MessageSchema)
def upload_students(query: dict, file: dict) -> dict:
"""Add only Students to chosen year group if students exist in db and assigned to correct year group,
they will be omitted"""
year_group_id = query.get('id')
yg = YearGroup.query.filter(YearGroup.id == year_group_id).first()
if yg is None:
abort(404, "Not found year group!")
uploaded_file = file.get('file')
if uploaded_file and is_allowed_extensions(uploaded_file.filename):
try:
students = parse_csv(uploaded_file)
while True:
sliced_students = islice(students, 5)
list_of_students = list(sliced_students)
if len(list_of_students) == 0:
break
students_in_db = Student.query.filter(or_(Student.index == s.index for s in list_of_students)).all()
students_in_db_and_assigned_to_year_group = Student.query.join(YearGroupStudents, isouter=True). \
filter(YearGroupStudents.year_group_id == year_group_id). \
filter(or_(Student.index == s.index for s in list_of_students)).all()
student_index_in_db = [s.index for s in students_in_db]
student_index_in_year_group = [s.index for s in students_in_db_and_assigned_to_year_group]
students_in_db_and_not_assigned_to_year_group = list(
filter(lambda s: s.index not in student_index_in_year_group, students_in_db))
students_not_exists_in_db = list(filter(lambda s: s.index not in student_index_in_db, list_of_students))
db.session.add_all(students_not_exists_in_db)
db.session.commit()
ygs = [YearGroupStudents(year_group_id=year_group_id, student_index=student.index) for student in
students_in_db_and_not_assigned_to_year_group + students_not_exists_in_db]
db.session.add_all(ygs)
db.session.commit()
except InvalidNameOrTypeHeaderException:
abort(400, "Invalid format of csv file!")
else:
abort(400, "Invalid extension of file")
return {"message": "Students was created by uploading csv file!"}
@bp.post("/download/")
@bp.input(StudentListFileDownloaderSchema, location='query')
def download_students(query: dict) -> Response:
year_group_id = query.get('year_group_id')
students_and_groups = db.session.query(Student, Group).join(Group, Student.groups). \
filter(Group.year_group_id == year_group_id). \
join(ProjectSupervisor).all()
if len(students_and_groups) == 0:
abort(404, "Not found students, which are assigned to group!")
csv_file = generate_csv(students_and_groups)
response = Response(csv_file, mimetype='text/csv')
response.headers.set("Content-Disposition", "attachment", filename="students_list.csv")
return response