from itertools import islice from random import randint from apiflask import APIBlueprint from flask import Response, abort from sqlalchemy import or_ from ...base.schemas import MessageSchema from ...base.utils import is_allowed_extensions, paginate_models from ...dependencies import db from ...project_supervisor.models import ProjectSupervisor from ...students.models import Group, Student, YearGroup from ..exceptions import InvalidNameOrTypeHeaderException from ..schemas.students import ( FileSchema, StudentCreateSchema, StudentEditSchema, StudentListFileDownloaderSchema, StudentQuerySchema, StudentSchema, StudentsPaginationSchema, YearGroupInfoQuery, ) from ..utils import generate_csv, parse_csv bp = APIBlueprint("students", __name__, url_prefix="/students") @bp.get("//") @bp.input(StudentQuerySchema, location="query") @bp.output(StudentsPaginationSchema) def list_students(year_group_id: int, query: dict) -> dict: # add filter by year group fullname = query.get("fullname") order_by_first_name = query.get("order_by_first_name") order_by_last_name = query.get("order_by_last_name") page = query.get("page") per_page = query.get("per_page") student_query = ( Student.search_by_fullname_and_mode_and_order_by_first_name_or_last_name( year_group_id, fullname, order_by_first_name, order_by_last_name ) ) data = paginate_models(page, student_query, per_page) # print(get_debug_queries()[0]) return {"students": data["items"], "max_pages": data["max_pages"]} @bp.get("//detail/") @bp.output(StudentSchema) def detail_student(student_id: int) -> Student: student = Student.query.filter_by(id=student_id).first() if student is None: abort(404, "Not found student!") return student @bp.delete("//") @bp.output(MessageSchema, status_code=202) def delete_student(student_id: int) -> dict: student = Student.query.filter_by(id=student_id).first() if student is None: abort(404, "Not found student!") db.session.delete(student) db.session.commit() return {"message": "Student was deleted!"} @bp.put("//") @bp.input(StudentEditSchema) @bp.output(MessageSchema) def edit_student(student_id: int, data: dict) -> dict: if not data: abort(400, "You have passed empty data!") student_query = Student.query.filter(Student.id == student_id) student = student_query.first() if student is None: abort(404, "Not found student!") student_query.update(data) db.session.commit() return {"message": "Student was updated!"} @bp.post("/") @bp.input(StudentCreateSchema) @bp.output(MessageSchema) def create_student(data: dict) -> dict: index = data["index"] yg_id = data["year_group_id"] del data["year_group_id"] student = Student.query.filter( Student.index == index, Student.year_group_id == yg_id ).first() if student is not None: abort(400, "Student has already assigned to this year group!") year_group = YearGroup.query.filter(YearGroup.id == yg_id).first() if year_group is None: abort(404, "Not found year group!") dummy_email = f"student{randint(1, 300_000)}@gmail.com" student = Student(**data, email=dummy_email, year_group_id=yg_id) db.session.add(student) db.session.commit() return {"message": "Student was created!"} @bp.post("/upload/") @bp.input(YearGroupInfoQuery, location="query") @bp.input(FileSchema, location="form_and_files") @bp.output(MessageSchema) def upload_students(query: dict, file: dict) -> dict: """Add only Students to chosen year group if students exist in db and assigned to correct year group, they will be omitted""" year_group_id = query.get("year_group_id") yg = YearGroup.query.filter(YearGroup.id == year_group_id).first() if yg is None: abort(404, "Not found year group!") uploaded_file = file.get("file") if uploaded_file and is_allowed_extensions(uploaded_file.filename): try: students = parse_csv(uploaded_file, year_group_id) while True: sliced_students = islice(students, 5) list_of_students = list(sliced_students) if len(list_of_students) == 0: break students_in_db = ( Student.query.filter( or_(Student.index == s.index for s in list_of_students) ) .filter(Student.year_group_id == year_group_id) .all() ) student_index_in_db = [s.index for s in students_in_db] students_not_exists_in_db = list( filter( lambda s: s.index not in student_index_in_db, list_of_students ) ) db.session.add_all(students_not_exists_in_db) db.session.commit() except InvalidNameOrTypeHeaderException: abort(400, "Invalid format of csv file!") else: abort(400, "Invalid extension of file") return {"message": "Students was created by uploading csv file!"} @bp.post("/download/") @bp.input(StudentListFileDownloaderSchema, location="query") def download_students(query: dict) -> Response: year_group_id = query.get("year_group_id") students_and_groups = ( db.session.query(Student, Group) .join(Group, Student.groups) .filter(Group.year_group_id == year_group_id) .join(ProjectSupervisor) .all() ) if len(students_and_groups) == 0: abort(404, "Not found students!") csv_file = generate_csv(students_and_groups) response = Response(csv_file, mimetype="text/csv") response.headers.set( "Content-Disposition", "attachment", filename="students_list.csv" ) return response