add swagger and update api endpoints
This commit is contained in:
parent
3d36569844
commit
30917eb7d7
@ -1,6 +1,6 @@
|
||||
import os
|
||||
|
||||
from flask import Flask
|
||||
from apiflask import APIFlask
|
||||
from flask_migrate import Migrate
|
||||
from flask_cors import CORS
|
||||
|
||||
@ -9,14 +9,14 @@ from .dependencies import db, ma
|
||||
from .commands.startapp import startapp
|
||||
from .utils import import_models
|
||||
from .api import api_bp
|
||||
from .errors import request_entity_too_large
|
||||
from .errors import request_entity_too_large, register_error_handlers
|
||||
|
||||
|
||||
def create_app(config_name: str = None) -> Flask:
|
||||
def create_app(config_name: str = None) -> APIFlask:
|
||||
if config_name is None:
|
||||
config_name = os.environ.get("FLASK_ENV")
|
||||
|
||||
app = Flask(__name__)
|
||||
app = APIFlask(__name__, docs_path='/')
|
||||
app.config.from_object(config.get(config_name) or config.get("development"))
|
||||
|
||||
if app.config['ENABLE_CORS']:
|
||||
@ -37,6 +37,7 @@ def create_app(config_name: str = None) -> Flask:
|
||||
app.cli.add_command(startapp)
|
||||
|
||||
# register errors
|
||||
app.register_error_handler(413, request_entity_too_large)
|
||||
register_error_handlers(app)
|
||||
# app.register_error_handler(413, request_entity_too_large)
|
||||
|
||||
return app
|
||||
|
@ -17,13 +17,16 @@ class Config:
|
||||
SQLALCHEMY_TRACK_MODIFICATIONS = False
|
||||
SQLALCHEMY_DATABASE_URI = f'sqlite:///{BASE_DIR / "db.sqlite"}'
|
||||
|
||||
DESCRIPTION = 'System PRI'
|
||||
OPENAPI_VERSION = '3.0.2'
|
||||
|
||||
|
||||
class ProductionConfig(Config):
|
||||
DB_SERVER = "0.0.0.0"
|
||||
|
||||
|
||||
class DevelopmentConfig(Config):
|
||||
pass
|
||||
DEBUG = True
|
||||
|
||||
|
||||
class TestingConfig(Config):
|
||||
|
@ -1,7 +1,7 @@
|
||||
from flask import Blueprint
|
||||
from apiflask import APIBlueprint
|
||||
|
||||
from .students import bp as students_bp
|
||||
|
||||
bp = Blueprint("coordinator", __name__, url_prefix="/coordinator")
|
||||
bp = APIBlueprint("coordinator", __name__, url_prefix="/coordinator")
|
||||
|
||||
bp.register_blueprint(students_bp)
|
||||
|
@ -1,117 +1,114 @@
|
||||
from typing import Tuple
|
||||
from random import randint
|
||||
from itertools import islice
|
||||
|
||||
from flask import Blueprint, request, Response
|
||||
from flask import abort
|
||||
from apiflask import APIBlueprint
|
||||
from marshmallow import ValidationError
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from flask_sqlalchemy import get_debug_queries
|
||||
|
||||
from ...students.models import Student
|
||||
from ..schemas import StudentSchema, StudentEditSchema, StudentCreateSchema
|
||||
from ..schemas import StudentSchema, StudentEditSchema, StudentsPaginationSchema, \
|
||||
StudentCreateSchema, MessageSchema, FileSchema, StudentQuerySchema
|
||||
from ...dependencies import db
|
||||
from ..utils import parse_csv
|
||||
from ..exceptions import InvalidNameOrTypeHeaderException
|
||||
from ...base.utils import paginate_models, is_allowed_extensions
|
||||
|
||||
bp = Blueprint("students", __name__, url_prefix="/students")
|
||||
bp = APIBlueprint("students", __name__, url_prefix="/students")
|
||||
|
||||
|
||||
@bp.route("/", methods=["GET"])
|
||||
def list_students() -> Tuple[dict, int]:
|
||||
students_schema = StudentSchema(many=True)
|
||||
|
||||
fullname = request.args.get('fullname')
|
||||
order_by_first_name = request.args.get('order_by_first_name')
|
||||
order_by_last_name = request.args.get('order_by_last_name')
|
||||
page = request.args.get('page')
|
||||
@bp.input(StudentQuerySchema, location='query')
|
||||
@bp.output(StudentsPaginationSchema)
|
||||
def list_students(query: dict) -> dict:
|
||||
fullname = query.get('fullname')
|
||||
order_by_first_name = query.get('order_by_first_name')
|
||||
order_by_last_name = query.get('order_by_last_name')
|
||||
page = query.get('page')
|
||||
|
||||
student_query = Student.search_by_fullname_and_order_by_first_name_or_last_name(fullname, order_by_first_name,
|
||||
order_by_last_name)
|
||||
|
||||
response = paginate_models(page, student_query)
|
||||
if isinstance(response, tuple):
|
||||
return response
|
||||
if (message := response.get('message')) is not None:
|
||||
abort(response['status_code'], message)
|
||||
# print(get_debug_queries()[0])
|
||||
return {"students": students_schema.dump(response['items']), "max_pages": response['max_pages']}, 200
|
||||
return {
|
||||
"students": response['items'],
|
||||
"max_pages": response['max_pages']
|
||||
}
|
||||
|
||||
|
||||
@bp.route("/<int:index>/", methods=["GET"])
|
||||
def detail_student(index: int) -> Tuple[dict, int]:
|
||||
student_schema = StudentSchema()
|
||||
@bp.output(StudentSchema)
|
||||
def detail_student(index: int) -> Student:
|
||||
student = Student.query.filter_by(index=index).first()
|
||||
if student is not None:
|
||||
return student_schema.dump(student), 200
|
||||
return {"error": f"Student with {index} index doesn't exist!"}, 404
|
||||
if student is None:
|
||||
abort(404, f"Student with {index} index doesn't exist!")
|
||||
return student
|
||||
|
||||
|
||||
@bp.route("/<int:index>/", methods=["DELETE"])
|
||||
def delete_student(index: int) -> Tuple[dict, int]:
|
||||
@bp.output(MessageSchema, status_code=202)
|
||||
def delete_student(index: int) -> dict:
|
||||
student = Student.query.filter_by(index=index).first()
|
||||
if student is not None:
|
||||
db.session.delete(student)
|
||||
db.session.commit()
|
||||
return {"message": "Student was deleted!"}, 202
|
||||
return {"error": f"Student with {index} index doesn't exist!"}, 404
|
||||
if student is None:
|
||||
abort(404, f"Student with {index} index doesn't exist!")
|
||||
db.session.delete(student)
|
||||
db.session.commit()
|
||||
return {"message": "Student was deleted!"}
|
||||
|
||||
|
||||
@bp.route("/<int:index>/", methods=["PUT"])
|
||||
def edit_student(index: int) -> Tuple[dict, int]:
|
||||
request_data = request.get_json()
|
||||
student_schema = StudentEditSchema()
|
||||
|
||||
@bp.input(StudentEditSchema)
|
||||
@bp.output(MessageSchema)
|
||||
def edit_student(index: int, data: dict) -> dict:
|
||||
try:
|
||||
data = student_schema.load(request_data)
|
||||
if not data:
|
||||
return {"error": "You have passed empty data!"}, 400
|
||||
abort(400, 'You have passed empty data!')
|
||||
|
||||
student_query = Student.query.filter_by(index=index)
|
||||
student = student_query.first()
|
||||
|
||||
if student is None:
|
||||
return {"error": "Not Found student!"}, 404
|
||||
abort(404, 'Not found student!')
|
||||
|
||||
student_query.update(data)
|
||||
db.session.commit()
|
||||
except ValidationError as e:
|
||||
return {"error": e.messages}, 422
|
||||
return {"message": "Student was updated!"}, 200
|
||||
abort(422, e.messages)
|
||||
return {"message": "Student was updated!"}
|
||||
|
||||
|
||||
@bp.route("/", methods=["POST"])
|
||||
def create_student() -> Tuple[dict, int] or Response:
|
||||
request_data = request.get_json()
|
||||
student_schema = StudentCreateSchema()
|
||||
|
||||
@bp.input(StudentCreateSchema)
|
||||
@bp.output(MessageSchema)
|
||||
def create_student(data: dict) -> dict:
|
||||
try:
|
||||
data = student_schema.load(request_data)
|
||||
|
||||
index = data['index']
|
||||
student = Student.query.filter_by(index=index).first()
|
||||
if student is not None:
|
||||
return {"error": "Student has already exists!"}, 400
|
||||
abort(400, "Student has already exists!")
|
||||
|
||||
dummy_email = f'student{randint(1, 300_000)}@gmail.com'
|
||||
student = Student(**data, email=dummy_email)
|
||||
db.session.add(student)
|
||||
db.session.commit()
|
||||
except ValidationError as e:
|
||||
return {'error': e.messages}, 422
|
||||
return {"message": "Student was created!"}, 200
|
||||
abort(422, e.messages)
|
||||
return {"message": "Student was created!"}
|
||||
|
||||
|
||||
@bp.route("/upload/", methods=["POST"])
|
||||
def upload_students() -> Tuple[dict, int]:
|
||||
"""Maybe in the future move to celery workers"""
|
||||
if (uploaded_file := request.files.get('file')) is None or uploaded_file.filename == '':
|
||||
return {"error": "You didn't attach a csv file!"}, 400
|
||||
@bp.input(FileSchema, location='form_and_files')
|
||||
@bp.output(MessageSchema)
|
||||
def upload_students(file: dict) -> dict:
|
||||
uploaded_file = file.get('file')
|
||||
|
||||
if uploaded_file and is_allowed_extensions(uploaded_file.filename):
|
||||
try:
|
||||
students = parse_csv(uploaded_file)
|
||||
except InvalidNameOrTypeHeaderException:
|
||||
return {"error": "Invalid format of csv file!"}, 400
|
||||
|
||||
try:
|
||||
while True:
|
||||
sliced_students = islice(students, 5)
|
||||
list_of_students = list(sliced_students)
|
||||
@ -120,11 +117,13 @@ def upload_students() -> Tuple[dict, int]:
|
||||
break
|
||||
db.session.add_all(list_of_students)
|
||||
db.session.commit()
|
||||
except InvalidNameOrTypeHeaderException:
|
||||
abort(400, "Invalid format of csv file!")
|
||||
except IntegrityError as e:
|
||||
# print(e)
|
||||
# in the future create sql query checks index and add only these students, which didn't exist in db
|
||||
return {"error": "These students have already exist!"}, 400
|
||||
abort(400, "These students have already exist!")
|
||||
else:
|
||||
return {"error": "Invalid extension of file"}, 400
|
||||
abort(400, "Invalid extension of file")
|
||||
|
||||
return {"message": "Students was created by uploading csv file!"}, 200
|
||||
return {"message": "Students was created by uploading csv file!"}
|
||||
|
@ -23,6 +23,11 @@ class StudentSchema(ma.SQLAlchemyAutoSchema):
|
||||
model = Student
|
||||
|
||||
|
||||
class StudentsPaginationSchema(ma.Schema):
|
||||
students = fields.List(fields.Nested(StudentSchema))
|
||||
max_pages = fields.Integer()
|
||||
|
||||
|
||||
class StudentCreateSchema(ma.Schema):
|
||||
first_name = fields.Str(validate=validate.Length(min=1, max=255), required=True)
|
||||
last_name = fields.Str(validate=validate.Length(min=1, max=255), required=True)
|
||||
@ -35,3 +40,18 @@ class StudentEditSchema(ma.Schema):
|
||||
last_name = fields.Str(validate=validate.Length(min=1, max=255))
|
||||
index = fields.Integer(validate=validate_index)
|
||||
mode = fields.Boolean()
|
||||
|
||||
|
||||
class MessageSchema(ma.Schema):
|
||||
message = fields.Str(required=True)
|
||||
|
||||
|
||||
class FileSchema(ma.Schema):
|
||||
file = fields.Raw(type='file', required=True)
|
||||
|
||||
|
||||
class StudentQuerySchema(ma.Schema):
|
||||
fullname = fields.Str()
|
||||
order_by_first_name = fields.Str()
|
||||
order_by_last_name = fields.Str()
|
||||
page = fields.Integer()
|
||||
|
@ -1,7 +1,18 @@
|
||||
import json
|
||||
from typing import Tuple
|
||||
|
||||
from werkzeug.exceptions import RequestEntityTooLarge
|
||||
from apiflask import APIFlask
|
||||
from werkzeug.exceptions import RequestEntityTooLarge, HTTPException
|
||||
|
||||
|
||||
def request_entity_too_large(error: RequestEntityTooLarge) -> Tuple[dict, int]:
|
||||
return {'error': 'File too large!'}, 413
|
||||
|
||||
|
||||
def register_error_handlers(app: APIFlask):
|
||||
@app.errorhandler(HTTPException)
|
||||
def handle_http_exception(e):
|
||||
response = e.get_response()
|
||||
response.data = json.dumps({'error': e.description})
|
||||
response.content_type = 'application/json'
|
||||
return response
|
||||
|
@ -35,3 +35,4 @@ SQLAlchemy==1.4.36
|
||||
tomli==2.0.1
|
||||
Werkzeug==2.1.2
|
||||
zipp==3.8.0
|
||||
apiflask==1.0.2
|
||||
|
Loading…
Reference in New Issue
Block a user