"""Coordinator endpoints for managing students and their year groups."""
from random import randint
from itertools import islice

from flask import Response, abort
from apiflask import APIBlueprint
from sqlalchemy import or_
from flask_sqlalchemy import get_debug_queries

from ...students.models import Student, Group, YearGroup, YearGroupStudents
from ...project_supervisor.models import ProjectSupervisor
from ..schemas import (
    StudentSchema,
    StudentEditSchema,
    StudentsPaginationSchema,
    YearGroupInfoQuery,
    StudentCreateSchema,
    MessageSchema,
    FileSchema,
    StudentQuerySchema,
    StudentListFileDownloaderSchema,
)
from ...dependencies import db
from ..utils import parse_csv, generate_csv
from ..exceptions import InvalidNameOrTypeHeaderException
from ...base.utils import paginate_models, is_allowed_extensions

bp = APIBlueprint("students", __name__, url_prefix="/students")

# Number of parsed CSV rows handled per database round-trip in upload_students.
_UPLOAD_CHUNK_SIZE = 5


@bp.get("/<int:year_group_id>/")
@bp.input(StudentQuerySchema, location='query')
@bp.output(StudentsPaginationSchema)
def list_students(year_group_id: int, query: dict) -> dict:
    """Return one page of students of a year group.

    The query dict may carry ``fullname``, ``order_by_first_name``,
    ``order_by_last_name``, ``page`` and ``per_page``; missing keys are
    forwarded as ``None``.
    """
    student_query = Student.search_by_fullname_and_mode_and_order_by_first_name_or_last_name(
        year_group_id,
        query.get('fullname'),
        query.get('order_by_first_name'),
        query.get('order_by_last_name'),
    )
    data = paginate_models(query.get('page'), student_query, query.get('per_page'))
    return {
        "students": data['items'],
        "max_pages": data['max_pages']
    }


@bp.get("/<int:index>/detail/")
@bp.output(StudentSchema)
def detail_student(index: int) -> Student:
    """Return the student with the given index, or respond with 404."""
    student = Student.query.filter_by(index=index).first()
    if student is None:
        abort(404, f"Student with {index} index doesn't exist!")
    return student


@bp.delete("/<int:index>/")
@bp.output(MessageSchema, status_code=202)
def delete_student(index: int) -> dict:
    """Delete the student with the given index, or respond with 404."""
    student = Student.query.filter_by(index=index).first()
    if student is None:
        abort(404, f"Student with {index} index doesn't exist!")
    db.session.delete(student)
    db.session.commit()
    return {"message": "Student was deleted!"}


@bp.put("/<int:index>/")
@bp.input(StudentEditSchema)
@bp.output(MessageSchema)
def edit_student(index: int, data: dict) -> dict:
    """Update the student with the given index using the fields in ``data``.

    Responds with 400 when ``data`` is empty and with 404 when no student
    has the given index.
    """
    if not data:
        abort(400, 'You have passed empty data!')

    student_query = Student.query.filter_by(index=index)
    if student_query.first() is None:
        abort(404, 'Not found student!')

    student_query.update(data)
    db.session.commit()
    return {"message": "Student was updated!"}


@bp.post("/")
@bp.input(StudentCreateSchema)
@bp.output(MessageSchema)
def create_student(data: dict) -> dict:
    """Create a student if needed and assign them to the chosen year group.

    ``data`` must contain ``index`` and ``year_group_id``; the remaining
    items are passed to the ``Student`` constructor. Responds with 400 when
    the year group does not exist or the student already belongs to it.
    """
    index = data['index']
    yg_id = data.pop('year_group_id')

    # Validate the year group first so that nothing is added to the session
    # for a request that is going to be rejected anyway.
    year_group = YearGroup.query.filter(YearGroup.id == yg_id).first()
    if year_group is None:
        abort(400, "Year group doesn't exist!")

    # Look the student up without joining year groups: an inner join would
    # hide a student that has no year group yet and lead to a duplicate row.
    student = Student.query.filter_by(index=index).first()
    if student is None:
        dummy_email = f'student{randint(1, 300_000)}@gmail.com'
        student = Student(**data, email=dummy_email)
        db.session.add(student)
    elif any(year_group.id == yg.id for yg in student.year_groups):
        abort(400, "You are assigned to this year group!")

    ygs = YearGroupStudents(student_index=student.index, year_group_id=year_group.id)
    db.session.add(ygs)
    db.session.commit()
    return {"message": "Student was created!"}


def _add_students_chunk_to_year_group(chunk: list, year_group_id: int) -> None:
    """Persist one chunk of parsed students and link them to the year group.

    Students missing from the database are inserted; students already
    linked to the year group are skipped; every other student in the chunk
    gets a new ``YearGroupStudents`` row.
    """
    indexes = [s.index for s in chunk]

    students_in_db = Student.query.filter(Student.index.in_(indexes)).all()
    students_in_year_group = Student.query.join(YearGroupStudents, isouter=True). \
        filter(YearGroupStudents.year_group_id == year_group_id). \
        filter(Student.index.in_(indexes)).all()

    indexes_in_db = {s.index for s in students_in_db}
    indexes_in_year_group = {s.index for s in students_in_year_group}

    existing_not_assigned = [s for s in students_in_db if s.index not in indexes_in_year_group]
    new_students = [s for s in chunk if s.index not in indexes_in_db]

    # New students are committed before the association rows that refer to them.
    db.session.add_all(new_students)
    db.session.commit()

    db.session.add_all([
        YearGroupStudents(year_group_id=year_group_id, student_index=student.index)
        for student in existing_not_assigned + new_students
    ])
    db.session.commit()


@bp.post("/upload/")
@bp.input(YearGroupInfoQuery, location='query')
@bp.input(FileSchema, location='form_and_files')
@bp.output(MessageSchema)
def upload_students(query: dict, file: dict) -> dict:
    """Add students from an uploaded CSV file to the chosen year group.

    Students that already exist in the database and are already assigned to
    the year group are omitted. Responds with 404 when the year group is not
    found and with 400 for a missing file, a disallowed extension or an
    invalid CSV header.
    """
    year_group_id = query.get('id')
    yg = YearGroup.query.filter(YearGroup.id == year_group_id).first()
    if yg is None:
        abort(404, "Not found year group!")

    uploaded_file = file.get('file')
    if not uploaded_file or not is_allowed_extensions(uploaded_file.filename):
        abort(400, "Invalid extension of file")

    try:
        # The loop stays inside the try block so that a header error raised
        # by parse_csv, whether on the call or while rows are consumed, is
        # reported as a 400.
        students = parse_csv(uploaded_file)
        while True:
            chunk = list(islice(students, _UPLOAD_CHUNK_SIZE))
            if not chunk:
                break
            _add_students_chunk_to_year_group(chunk, year_group_id)
    except InvalidNameOrTypeHeaderException:
        abort(400, "Invalid format of csv file!")

    return {"message": "Students was created by uploading csv file!"}


@bp.post("/download/")
@bp.input(StudentListFileDownloaderSchema, location='query')
def download_students(query: dict) -> Response:
    """Return a CSV attachment listing students assigned to groups.

    Only groups of the year group given by ``year_group_id`` in the query
    are included. Responds with 404 when no such student is found.
    """
    year_group_id = query.get('year_group_id')
    students_and_groups = db.session.query(Student, Group).join(Group, Student.groups). \
        filter(Group.year_group_id == year_group_id). \
        join(ProjectSupervisor).all()

    if not students_and_groups:
        abort(404, "Not found students, which are assigned to group!")

    csv_file = generate_csv(students_and_groups)
    response = Response(csv_file, mimetype='text/csv')
    response.headers.set("Content-Disposition", "attachment", filename="students_list.csv")
    return response