189 lines
5.7 KiB
Python
189 lines
5.7 KiB
Python
import datetime
from pathlib import Path

import pandas as pd
import pytest
from flask import current_app

from app.base.utils import is_allowed_extensions, order_by_column_name, paginate_models
from app.coordinator.exceptions import InvalidNameOrTypeHeaderException
from app.coordinator.utils import (
    check_columns,
    generate_csv,
    generate_range_dates,
    parse_csv,
)
from app.dependencies import db
from app.students.models import Group, Student
|
|
|
|
|
|
def test_is_allowed_extensions(test_app) -> None:
    """Each extension listed under ALLOWED_EXTENSIONS must be accepted."""
    with test_app.app_context():
        allowed = current_app.config.get("ALLOWED_EXTENSIONS")
        for extension in allowed:
            filename = f"file.{extension}"
            assert is_allowed_extensions(filename) is True
|
|
|
|
|
|
def test_is_allowed_extensions_with_invalid_extensions(test_app) -> None:
    """An unknown extension and a name with no extension are both rejected."""
    with test_app.app_context():
        for rejected_name in ("file.invalid_ext", "file"):
            assert is_allowed_extensions(rejected_name) is False
|
|
|
|
|
|
def test_order_by_column_name_ascending_mode(test_app) -> None:
    """Ordering by ``index`` in ascending mode must not produce a DESC clause.

    The previous version passed ``"desc"`` and only checked for the
    ``ORDER BY students."index"`` prefix, which the descending query also
    contains, so ascending mode was never actually exercised.
    """
    with test_app.app_context():
        # NOTE(review): "asc" is assumed to be the ascending counterpart of
        # the "desc" token used by the descending test - confirm against
        # order_by_column_name.
        query = order_by_column_name(Student.query, "index", "asc")
        sql = str(query)
        assert 'ORDER BY students."index"' in sql
        # Without this check the test cannot tell ascending from descending.
        assert 'ORDER BY students."index" DESC' not in sql
|
|
|
|
|
|
def test_order_by_column_name_descending_mode(test_app) -> None:
    """Ordering by ``index`` with ``"desc"`` renders a DESC clause in the SQL."""
    expected_clause = 'ORDER BY students."index" DESC'
    with test_app.app_context():
        rendered_sql = str(order_by_column_name(Student.query, "index", "desc"))
        assert expected_clause in rendered_sql
|
|
|
|
|
|
def test_paginate_models(test_app_ctx_with_db) -> None:
    """Two stored students paginated one per page give one item and two pages."""
    with test_app_ctx_with_db:
        first_student = Student(
            index=123456,
            first_name="Dominic",
            last_name="Smith",
            pesel="99010109876",
            email="xxx@gmail.com",
        )
        second_student = Student(
            index=123457,
            first_name="John",
            last_name="Newton",
            pesel="99010109871",
            email="zzz@gmail.com",
        )
        db.session.add_all([first_student, second_student])
        db.session.commit()

        page = paginate_models(1, Student.query, 1)

        # Fall back to neutral defaults so a missing key fails the asserts
        # below instead of raising KeyError.
        page_items = page.get("items", [])
        page_count = page.get("max_pages", 0)

        assert len(page_items) == 1
        assert page_count == 2
|
|
|
|
|
|
def test_check_columns() -> None:
    """A frame with the five expected headers and value types is accepted."""
    valid_row = {
        "NAZWISKO": ["Smith"],
        "IMIE": ["Dominic"],
        "INDEKS": [343433],
        "PESEL": [90020178654],
        "EMAIL": ["domsmi@gmail.com"],
    }
    frame = pd.DataFrame(valid_row)
    outcome = check_columns(frame)
    assert outcome is True
|
|
|
|
|
|
def test_check_columns_with_invalid_column_names() -> None:
    """A frame whose headers are not the expected ones is rejected."""
    frame = pd.DataFrame({"col1": [1, 2], "col2": [2, 3]})
    outcome = check_columns(frame)
    assert outcome is False
|
|
|
|
|
|
def test_check_columns_with_invalid_column_types() -> None:
    """Correct headers with a numeric NAZWISKO value are rejected."""
    row_with_numeric_surname = {
        "NAZWISKO": [999],  # integer where the valid fixture uses a string
        "IMIE": ["Dominic"],
        "INDEKS": [343433],
        "PESEL": [90020178654],
        "EMAIL": ["domsmi@gmail.com"],
    }
    frame = pd.DataFrame(row_with_numeric_surname)
    outcome = check_columns(frame)
    assert outcome is False
|
|
|
|
|
|
def get_path_to_fake_data(filename: str) -> Path:
    """Return the location of a fixture file inside the ``tmp_data`` directory.

    Reads ``BASE_DIR`` from ``current_app.config``, so it has to be called
    while a Flask application context is active.

    Args:
        filename: Name of the fixture file, e.g. ``"students.csv"``.

    Returns:
        ``<BASE_DIR>/tmp_data/<filename>`` as a ``Path``; ``BASE_DIR`` falls
        back to ``/`` when it is not configured.
    """
    # The "/" fallback is a plain string and BASE_DIR may be configured as
    # one too; coerce to Path so the joins below cannot raise TypeError.
    base_dir = Path(current_app.config.get("BASE_DIR", "/"))
    return base_dir / "tmp_data" / filename
|
|
|
|
|
|
def test_parse_csv(test_app) -> None:
    """Parsing ``students.csv`` yields three students with consecutive indexes."""
    with test_app.app_context(), open(get_path_to_fake_data("students.csv")) as csv_file:
        parsed = list(parse_csv(csv_file))
        # Sort by index so the comparison does not depend on parse order.
        parsed.sort(key=lambda student: student.index)

        expected_indexes = list(range(452790, 452793))
        assert len(parsed) == len(expected_indexes)
        assert [student.index for student in parsed] == expected_indexes
|
|
|
|
|
|
def test_parse_csv_with_invalid_column_header_name_in_csv_file(test_app) -> None:
    """``students_column_name.csv`` must make ``parse_csv`` raise the header error."""
    with test_app.app_context():
        fixture_path = get_path_to_fake_data("students_column_name.csv")
        with open(fixture_path) as csv_file, pytest.raises(
            InvalidNameOrTypeHeaderException
        ):
            # The result is not consumed: the call itself is expected to raise.
            parse_csv(csv_file)
|
|
|
|
|
|
def test_parse_csv_with_invalid_column_type_in_csv_file(test_app) -> None:
    """``students_column_type.csv`` must make ``parse_csv`` raise the header error."""
    with test_app.app_context():
        fixture_path = get_path_to_fake_data("students_column_type.csv")
        with open(fixture_path) as csv_file, pytest.raises(
            InvalidNameOrTypeHeaderException
        ):
            # The result is not consumed: the call itself is expected to raise.
            parse_csv(csv_file)
|
|
|
|
|
|
def test_generate_range_dates() -> None:
    """A 08:00-12:00 window with step 30 yields eight dates inside the window."""
    window_start = datetime.datetime(2022, 2, 2, hour=8)
    window_end = datetime.datetime(2022, 2, 2, hour=12)
    step = 30

    # The step is treated as minutes: window length in minutes / step.
    window_minutes = (window_end - window_start).total_seconds() / 60.0
    expected_count = window_minutes / step

    generated = list(generate_range_dates(window_start, window_end, step))

    assert expected_count == len(generated)
    # The end of the window is exclusive.
    assert all(window_start <= moment < window_end for moment in generated)
|
|
|
|
|
|
def test_generate_csv(test_app_ctx_with_db) -> None:
    """Every field of every stored student must appear in the generated CSV."""
    students_data = [
        {
            "first_name": "Dominic",
            "last_name": "Smith",
            "email": "xxe@gmail.com",
            "index": 123456,
            "pesel": "98070234293",
        },
        {
            "first_name": "Matthew",
            "last_name": "Cash",
            "email": "zze@gmail.com",
            "index": 123455,
            "pesel": "98070234291",
        },
        {
            "first_name": "Martin",
            "last_name": "Rose",
            "email": "nne@gmail.com",
            "index": 123446,
            "pesel": "98070234223",
        },
    ]

    with test_app_ctx_with_db:
        students = [Student(**record) for record in students_data]
        db.session.add_all(students)
        db.session.commit()

        # The first two students share one group; the third is in another.
        project_group = Group(name="new-project")
        system_group = Group(name="system-pri")
        project_group.students.append(students[0])
        project_group.students.append(students[1])
        system_group.students.append(students[2])
        db.session.add_all([project_group, system_group])
        db.session.commit()

        memberships = [project_group, project_group, system_group]
        students_and_groups = list(zip(students, memberships))

        generated_csv = generate_csv(students_and_groups)

        expected_values = (
            value for record in students_data for value in record.values()
        )
        for value in expected_values:
            assert str(value) in generated_csv
|