system-pri/backend/app/coordinator/routes/students.py

137 lines
4.6 KiB
Python

from random import randint
from itertools import islice
from typing import List
from flask import Response, abort
from apiflask import APIBlueprint
from sqlalchemy.exc import IntegrityError
from flask_sqlalchemy import get_debug_queries
from ...students.models import Student, Group
from ...project_supervisor.models import ProjectSupervisor
from ..schemas import StudentSchema, StudentEditSchema, StudentsPaginationSchema, \
StudentCreateSchema, MessageSchema, FileSchema, StudentQuerySchema
from ...dependencies import db
from ..utils import parse_csv, generate_csv
from ..exceptions import InvalidNameOrTypeHeaderException
from ...base.utils import paginate_models, is_allowed_extensions
# Blueprint that groups the student endpoints defined in this module; every
# route attached to it is served under the "/students" URL prefix.
bp = APIBlueprint("students", __name__, url_prefix="/students")
@bp.route("/", methods=["GET"])
@bp.input(StudentQuerySchema, location='query')
@bp.output(StudentsPaginationSchema)
def list_students(query: dict) -> dict:
    """Return one page of students matching the query-string filters.

    Args:
        query: Query-string parameters loaded through ``StudentQuerySchema``.
            The keys read here are ``fullname``, ``mode``,
            ``order_by_first_name``, ``order_by_last_name``, ``page`` and
            ``per_page``; a missing key is passed on as ``None``.

    Returns:
        A dict with the page's ``students`` and the ``max_pages`` count, both
        taken from the result of ``paginate_models``.
    """
    # Build the filtered / ordered query first; pagination is applied on top.
    filtered = Student.search_by_fullname_and_mode_and_order_by_first_name_or_last_name(
        query.get('fullname'),
        query.get('mode'),
        query.get('order_by_first_name'),
        query.get('order_by_last_name'),
    )
    page_data = paginate_models(query.get('page'), filtered, query.get('per_page'))
    return {"students": page_data['items'], "max_pages": page_data['max_pages']}
@bp.route("/<int:index>/", methods=["GET"])
@bp.output(StudentSchema)
def detail_student(index: int) -> Student:
    """Return the student whose ``index`` matches the URL parameter.

    Args:
        index: Student index taken from the URL path.

    Returns:
        The first ``Student`` row with that index.

    Raises:
        HTTP 404 (via ``abort``) when no student has the given index.
    """
    found = Student.query.filter_by(index=index).first()
    if found is not None:
        return found
    # abort() raises, so nothing executes past this call.
    abort(404, f"Student with {index} index doesn't exist!")
@bp.route("/<int:index>/", methods=["DELETE"])
@bp.output(MessageSchema, status_code=202)
def delete_student(index: int) -> dict:
    """Delete the student whose ``index`` matches the URL parameter.

    Args:
        index: Student index taken from the URL path.

    Returns:
        A confirmation message dict (sent with status code 202).

    Raises:
        HTTP 404 (via ``abort``) when no student has the given index.
    """
    target = Student.query.filter_by(index=index).first()
    if target is None:
        abort(404, f"Student with {index} index doesn't exist!")

    session = db.session
    session.delete(target)
    session.commit()
    return {"message": "Student was deleted!"}
@bp.route("/<int:index>/", methods=["PUT"])
@bp.input(StudentEditSchema)
@bp.output(MessageSchema)
def edit_student(index: int, data: dict) -> dict:
    """Update the student identified by ``index`` with the submitted fields.

    Args:
        index: Student index taken from the URL path.
        data: Request body loaded through ``StudentEditSchema``; its items are
            passed unchanged to the query-level ``update``.

    Returns:
        A confirmation message dict.

    Raises:
        HTTP 400 (via ``abort``) when ``data`` is empty.
        HTTP 404 (via ``abort``) when no student has the given index.
    """
    if not data:
        abort(400, 'You have passed empty data!')

    # Keep the query object around: it is used both for the existence check
    # and for the bulk UPDATE issued below.
    matching = Student.query.filter_by(index=index)
    if matching.first() is None:
        abort(404, 'Not found student!')

    matching.update(data)
    db.session.commit()
    return {"message": "Student was updated!"}
@bp.route("/", methods=["POST"])
@bp.input(StudentCreateSchema)
@bp.output(MessageSchema)
def create_student(data: dict) -> dict:
    """Create a new student from the submitted data.

    Args:
        data: Request body loaded through ``StudentCreateSchema``; must contain
            an ``index`` key. All items are forwarded to the ``Student``
            constructor as keyword arguments.

    Returns:
        A confirmation message dict.

    Raises:
        HTTP 400 (via ``abort``) when a student with the same index exists.
    """
    already_there = Student.query.filter_by(index=data['index']).first()
    if already_there is not None:
        abort(400, "Student has already exists!")

    # The email is not taken from the request: a placeholder address with a
    # random numeric suffix is generated instead.
    placeholder_email = f'student{randint(1, 300_000)}@gmail.com'
    db.session.add(Student(**data, email=placeholder_email))
    db.session.commit()
    return {"message": "Student was created!"}
@bp.route("/upload/", methods=["POST"])
@bp.input(FileSchema, location='form_and_files')
@bp.output(MessageSchema)
def upload_students(file: dict) -> dict:
    """Create students in bulk from an uploaded CSV file.

    The parsed students are added and committed in batches of five. Because
    each batch is committed on its own, batches stored before a failing one
    stay in the database.

    Args:
        file: Form/file data loaded through ``FileSchema``; the upload is read
            from its ``file`` key.

    Returns:
        A confirmation message dict.

    Raises:
        HTTP 400 (via ``abort``) when the file is missing or its name is
        rejected by ``is_allowed_extensions``, when ``parse_csv`` raises
        ``InvalidNameOrTypeHeaderException``, or when a commit fails with
        ``IntegrityError``.
    """
    batch_size = 5

    uploaded_file = file.get('file')
    if not uploaded_file or not is_allowed_extensions(uploaded_file.filename):
        abort(400, "Invalid extension of file")

    try:
        # iter() gives one shared cursor over the parsed students, so each
        # islice() call continues where the previous one stopped even if
        # parse_csv returns a re-iterable container instead of a generator
        # (otherwise the loop would re-read the first batch forever).
        students = iter(parse_csv(uploaded_file, mode=True))
        while True:
            batch = list(islice(students, batch_size))
            if not batch:
                break
            db.session.add_all(batch)
            db.session.commit()
    except InvalidNameOrTypeHeaderException:
        abort(400, "Invalid format of csv file!")
    except IntegrityError:
        # A failed commit leaves the session unusable until it is rolled
        # back; discard the rejected batch before reporting the error.
        db.session.rollback()
        # TODO: query the existing indexes first and insert only the students
        # that are not in the database yet.
        abort(400, "These students have already exist!")

    return {"message": "Students was created by uploading csv file!"}
@bp.route("/download/", methods=["POST"])
def download_students() -> Response:
    """Return a CSV attachment listing students joined to a group and supervisor.

    Only students that survive the join with ``Group`` and
    ``ProjectSupervisor`` are included.

    Returns:
        A ``text/csv`` response whose ``Content-Disposition`` header marks it
        as an attachment named ``students_list.csv``.

    Raises:
        HTTP 404 (via ``abort``) when the joined query returns no students.
    """
    joined = db.session.query(Student).join(Group).join(ProjectSupervisor)
    assigned_students = joined.all()
    if not assigned_students:
        abort(404, "Not found students, which are assigned to group!")

    csv_response = Response(generate_csv(assigned_students), mimetype='text/csv')
    csv_response.headers.set("Content-Disposition", "attachment", filename="students_list.csv")
    return csv_response