from random import randint from itertools import islice from flask import Response, abort from apiflask import APIBlueprint from sqlalchemy.exc import IntegrityError from flask_sqlalchemy import get_debug_queries from ...students.models import Student, Group, YearGroup, YearGroupStudents from ...project_supervisor.models import ProjectSupervisor from ..schemas import StudentSchema, StudentEditSchema, StudentsPaginationSchema, \ StudentCreateSchema, MessageSchema, FileSchema, StudentQuerySchema, StudentListFileDownloaderSchema from ...dependencies import db from ..utils import parse_csv, generate_csv from ..exceptions import InvalidNameOrTypeHeaderException from ...base.utils import paginate_models, is_allowed_extensions bp = APIBlueprint("students", __name__, url_prefix="/students") @bp.get("//") @bp.input(StudentQuerySchema, location='query') @bp.output(StudentsPaginationSchema) def list_students(year_group_id: int, query: dict) -> dict: # add filter by year group fullname = query.get('fullname') order_by_first_name = query.get('order_by_first_name') order_by_last_name = query.get('order_by_last_name') page = query.get('page') per_page = query.get('per_page') student_query = Student.search_by_fullname_and_mode_and_order_by_first_name_or_last_name( year_group_id, fullname, order_by_first_name, order_by_last_name) data = paginate_models(page, student_query, per_page) # print(get_debug_queries()[0]) return { "students": data['items'], "max_pages": data['max_pages'] } @bp.get("//") @bp.output(StudentSchema) def detail_student(index: int) -> Student: student = Student.query.filter_by(index=index).first() if student is None: abort(404, f"Student with {index} index doesn't exist!") return student @bp.delete("//") @bp.output(MessageSchema, status_code=202) def delete_student(index: int) -> dict: student = Student.query.filter_by(index=index).first() if student is None: abort(404, f"Student with {index} index doesn't exist!") db.session.delete(student) db.session.commit() return {"message": "Student was deleted!"} @bp.put("//") @bp.input(StudentEditSchema) @bp.output(MessageSchema) def edit_student(index: int, data: dict) -> dict: if not data: abort(400, 'You have passed empty data!') student_query = Student.query.filter_by(index=index) student = student_query.first() if student is None: abort(404, 'Not found student!') student_query.update(data) db.session.commit() return {"message": "Student was updated!"} @bp.post("/") @bp.input(StudentCreateSchema) @bp.output(MessageSchema) def create_student(data: dict) -> dict: index = data['index'] yg_id = data['year_group_id'] del data['year_group_id'] student = Student.query.filter_by(index=index).first() if student is not None: abort(400, "Student has already exists!") dummy_email = f'student{randint(1, 300_000)}@gmail.com' student = Student(**data, email=dummy_email) db.session.add(student) # add student to the chosen year group year_group = YearGroup.query.filter(YearGroup.id == yg_id).first() if year_group is None: abort(400, "Year group doesn't exist!") ygs = YearGroupStudents(student_index=student.index, year_group_id=year_group.id) db.session.add(ygs) db.session.commit() return {"message": "Student was created!"} @bp.post("/upload/") @bp.input(FileSchema, location='form_and_files') @bp.output(MessageSchema) def upload_students(file: dict) -> dict: uploaded_file = file.get('file') if uploaded_file and is_allowed_extensions(uploaded_file.filename): try: students = parse_csv(uploaded_file, mode=True) while True: sliced_students = islice(students, 5) list_of_students = list(sliced_students) if len(list_of_students) == 0: break db.session.add_all(list_of_students) db.session.commit() except InvalidNameOrTypeHeaderException: abort(400, "Invalid format of csv file!") except IntegrityError as e: # in the future create sql query checks index and add only these students, which didn't exist in db abort(400, "These students have already exist!") else: abort(400, "Invalid extension of file") return {"message": "Students was created by uploading csv file!"} @bp.post("/download/") @bp.input(StudentListFileDownloaderSchema, location='query') def download_students(query: dict) -> Response: mode = query.get('mode') mode = mode if mode is not None else True students = db.session.query(Student).join(Group). \ join(ProjectSupervisor).filter(Student.mode == mode).all() if len(students) == 0: abort(404, "Not found students, which are assigned to group!") csv_file = generate_csv(students) response = Response(csv_file, mimetype='text/csv') response.headers.set("Content-Disposition", "attachment", filename="students_list.csv") return response