137 lines
4.6 KiB
Python
137 lines
4.6 KiB
Python
from random import randint
|
|
from itertools import islice
|
|
from typing import List
|
|
|
|
from flask import Response, abort
|
|
from apiflask import APIBlueprint
|
|
from sqlalchemy.exc import IntegrityError
|
|
from flask_sqlalchemy import get_debug_queries
|
|
|
|
from ...students.models import Student, Group
|
|
from ...project_supervisor.models import ProjectSupervisor
|
|
from ..schemas import StudentSchema, StudentEditSchema, StudentsPaginationSchema, \
|
|
StudentCreateSchema, MessageSchema, FileSchema, StudentQuerySchema
|
|
from ...dependencies import db
|
|
from ..utils import parse_csv, generate_csv
|
|
from ..exceptions import InvalidNameOrTypeHeaderException
|
|
from ...base.utils import paginate_models, is_allowed_extensions
|
|
|
|
bp = APIBlueprint("students", __name__, url_prefix="/students")
|
|
|
|
|
|
@bp.route("/", methods=["GET"])
|
|
@bp.input(StudentQuerySchema, location='query')
|
|
@bp.output(StudentsPaginationSchema)
|
|
def list_students(query: dict) -> dict:
|
|
fullname = query.get('fullname')
|
|
order_by_first_name = query.get('order_by_first_name')
|
|
order_by_last_name = query.get('order_by_last_name')
|
|
mode = query.get('mode')
|
|
page = query.get('page')
|
|
per_page = query.get('per_page')
|
|
|
|
student_query = Student.search_by_fullname_and_mode_and_order_by_first_name_or_last_name(
|
|
fullname, mode, order_by_first_name, order_by_last_name)
|
|
|
|
data = paginate_models(page, student_query, per_page)
|
|
# print(get_debug_queries()[0])
|
|
return {
|
|
"students": data['items'],
|
|
"max_pages": data['max_pages']
|
|
}
|
|
|
|
|
|
@bp.route("/<int:index>/", methods=["GET"])
|
|
@bp.output(StudentSchema)
|
|
def detail_student(index: int) -> Student:
|
|
student = Student.query.filter_by(index=index).first()
|
|
if student is None:
|
|
abort(404, f"Student with {index} index doesn't exist!")
|
|
return student
|
|
|
|
|
|
@bp.route("/<int:index>/", methods=["DELETE"])
|
|
@bp.output(MessageSchema, status_code=202)
|
|
def delete_student(index: int) -> dict:
|
|
student = Student.query.filter_by(index=index).first()
|
|
if student is None:
|
|
abort(404, f"Student with {index} index doesn't exist!")
|
|
db.session.delete(student)
|
|
db.session.commit()
|
|
return {"message": "Student was deleted!"}
|
|
|
|
|
|
@bp.route("/<int:index>/", methods=["PUT"])
|
|
@bp.input(StudentEditSchema)
|
|
@bp.output(MessageSchema)
|
|
def edit_student(index: int, data: dict) -> dict:
|
|
if not data:
|
|
abort(400, 'You have passed empty data!')
|
|
|
|
student_query = Student.query.filter_by(index=index)
|
|
student = student_query.first()
|
|
|
|
if student is None:
|
|
abort(404, 'Not found student!')
|
|
|
|
student_query.update(data)
|
|
db.session.commit()
|
|
|
|
return {"message": "Student was updated!"}
|
|
|
|
|
|
@bp.route("/", methods=["POST"])
|
|
@bp.input(StudentCreateSchema)
|
|
@bp.output(MessageSchema)
|
|
def create_student(data: dict) -> dict:
|
|
index = data['index']
|
|
student = Student.query.filter_by(index=index).first()
|
|
if student is not None:
|
|
abort(400, "Student has already exists!")
|
|
|
|
dummy_email = f'student{randint(1, 300_000)}@gmail.com'
|
|
student = Student(**data, email=dummy_email)
|
|
db.session.add(student)
|
|
db.session.commit()
|
|
|
|
return {"message": "Student was created!"}
|
|
|
|
|
|
@bp.route("/upload/", methods=["POST"])
|
|
@bp.input(FileSchema, location='form_and_files')
|
|
@bp.output(MessageSchema)
|
|
def upload_students(file: dict) -> dict:
|
|
uploaded_file = file.get('file')
|
|
if uploaded_file and is_allowed_extensions(uploaded_file.filename):
|
|
try:
|
|
students = parse_csv(uploaded_file, mode=True)
|
|
while True:
|
|
sliced_students = islice(students, 5)
|
|
list_of_students = list(sliced_students)
|
|
|
|
if len(list_of_students) == 0:
|
|
break
|
|
db.session.add_all(list_of_students)
|
|
db.session.commit()
|
|
except InvalidNameOrTypeHeaderException:
|
|
abort(400, "Invalid format of csv file!")
|
|
except IntegrityError as e:
|
|
# in the future create sql query checks index and add only these students, which didn't exist in db
|
|
abort(400, "These students have already exist!")
|
|
|
|
else:
|
|
abort(400, "Invalid extension of file")
|
|
|
|
return {"message": "Students was created by uploading csv file!"}
|
|
|
|
|
|
@bp.route("/download/", methods=["POST"])
|
|
def download_students() -> Response:
|
|
students = db.session.query(Student).join(Group).join(ProjectSupervisor).all()
|
|
if len(students) == 0:
|
|
abort(404, "Not found students, which are assigned to group!")
|
|
csv_file = generate_csv(students)
|
|
response = Response(csv_file, mimetype='text/csv')
|
|
response.headers.set("Content-Disposition", "attachment", filename="students_list.csv")
|
|
return response
|