"""REST endpoints for listing, reading, creating, editing, deleting and bulk-uploading students."""
from random import randint
from itertools import islice

from flask import abort
from apiflask import APIBlueprint
from marshmallow import ValidationError
from sqlalchemy.exc import IntegrityError
from flask_sqlalchemy import get_debug_queries

from ...students.models import Student
from ..schemas import (
    StudentSchema,
    StudentEditSchema,
    StudentsPaginationSchema,
    StudentCreateSchema,
    MessageSchema,
    FileSchema,
    StudentQuerySchema,
)
from ...dependencies import db
from ..utils import parse_csv
from ..exceptions import InvalidNameOrTypeHeaderException
from ...base.utils import paginate_models, is_allowed_extensions

bp = APIBlueprint("students", __name__, url_prefix="/students")

# Number of parsed students added and committed per transaction during a CSV upload.
_UPLOAD_BATCH_SIZE = 5


@bp.route("/", methods=["GET"])
@bp.input(StudentQuerySchema, location='query')
@bp.output(StudentsPaginationSchema)
def list_students(query: dict) -> dict:
    """Return one page of students, optionally filtered and ordered.

    Reads ``fullname``, ``order_by_first_name``, ``order_by_last_name`` and
    ``page`` from the validated query string. Aborts with the status code
    reported by ``paginate_models`` when it returns a ``message``.
    """
    student_query = Student.search_by_fullname_and_order_by_first_name_or_last_name(
        query.get('fullname'),
        query.get('order_by_first_name'),
        query.get('order_by_last_name'),
    )
    response = paginate_models(query.get('page'), student_query)

    # paginate_models signals a failure by including a 'message' entry.
    if (message := response.get('message')) is not None:
        abort(response['status_code'], message)

    return {
        "students": response['items'],
        "max_pages": response['max_pages'],
    }


# NOTE(review): the rule was "//" in the reviewed source, which cannot supply
# the ``index`` argument the view requires; restored as an int URL variable.
@bp.route("/<int:index>/", methods=["GET"])
@bp.output(StudentSchema)
def detail_student(index: int) -> Student:
    """Return the student with the given ``index`` or abort with 404."""
    student = Student.query.filter_by(index=index).first()
    if student is None:
        abort(404, f"Student with {index} index doesn't exist!")
    return student


@bp.route("/<int:index>/", methods=["DELETE"])
@bp.output(MessageSchema, status_code=202)
def delete_student(index: int) -> dict:
    """Delete the student with the given ``index`` or abort with 404."""
    student = Student.query.filter_by(index=index).first()
    if student is None:
        abort(404, f"Student with {index} index doesn't exist!")

    db.session.delete(student)
    db.session.commit()
    return {"message": "Student was deleted!"}


@bp.route("/<int:index>/", methods=["PUT"])
@bp.input(StudentEditSchema)
@bp.output(MessageSchema)
def edit_student(index: int, data: dict) -> dict:
    """Update the student with the given ``index`` using the fields in ``data``.

    Aborts with 400 for an empty payload, 404 when no such student exists,
    and 422 when a ``ValidationError`` is raised while applying the update.
    """
    # Reject an empty payload before touching the database.
    if not data:
        abort(400, 'You have passed empty data!')

    try:
        student_query = Student.query.filter_by(index=index)
        if student_query.first() is None:
            abort(404, 'Not found student!')
        student_query.update(data)
        db.session.commit()
    except ValidationError as e:
        abort(422, e.messages)

    return {"message": "Student was updated!"}


@bp.route("/", methods=["POST"])
@bp.input(StudentCreateSchema)
@bp.output(MessageSchema)
def create_student(data: dict) -> dict:
    """Create a student from ``data`` with a generated placeholder e-mail.

    Aborts with 400 when a student with the same ``index`` already exists and
    with 422 when a ``ValidationError`` is raised while building the model.
    """
    try:
        existing = Student.query.filter_by(index=data['index']).first()
        if existing is not None:
            abort(400, "Student has already exists!")

        # The create payload carries no e-mail, so a random placeholder is used.
        dummy_email = f'student{randint(1, 300_000)}@gmail.com'
        db.session.add(Student(**data, email=dummy_email))
        db.session.commit()
    except ValidationError as e:
        abort(422, e.messages)

    return {"message": "Student was created!"}


@bp.route("/upload/", methods=["POST"])
@bp.input(FileSchema, location='form_and_files')
@bp.output(MessageSchema)
def upload_students(file: dict) -> dict:
    """Create students in bulk from an uploaded CSV file.

    Parsed students are added and committed in batches of
    ``_UPLOAD_BATCH_SIZE``. Aborts with 400 when the file is missing or has a
    disallowed extension, when ``parse_csv`` raises
    ``InvalidNameOrTypeHeaderException``, or when a commit raises
    ``IntegrityError``. Batches committed before a failure stay persisted.
    """
    uploaded_file = file.get('file')
    if not uploaded_file or not is_allowed_extensions(uploaded_file.filename):
        abort(400, "Invalid extension of file")

    try:
        # iter() guarantees islice consumes the source progressively even if
        # parse_csv returns a re-iterable (e.g. a list) rather than a generator.
        students = iter(parse_csv(uploaded_file))
        while batch := list(islice(students, _UPLOAD_BATCH_SIZE)):
            db.session.add_all(batch)
            db.session.commit()
    except InvalidNameOrTypeHeaderException:
        abort(400, "Invalid format of csv file!")
    except IntegrityError:
        # Discard the failed batch so the session is usable again.
        db.session.rollback()
        # TODO: query existing indexes first and insert only the students
        # that are not in the database yet.
        abort(400, "These students have already exist!")

    return {"message": "Students was created by uploading csv file!"}