pracownia-timefall/hr_module/views.py
2021-01-09 18:31:20 +01:00

351 lines
15 KiB
Python

from django.shortcuts import render, redirect
from django.contrib.auth.decorators import login_required
from .forms import UploadFileForm, NewUserForm
from hr_module.handling_functions.data_import_functions import read_and_parse_excel, insert_excel
from hr_module.handling_functions.misc import dictfetchall
from hr_module.handling_functions.insert_to_plan import UpdatePlan
from hr_module.handling_functions.monthly_planning_functions import create_planning_operation_report
import pandas as pd
import json
from .models import Employee, PlanCreationLog, Plan, TimeLog
from django.contrib.auth.models import User
from django.http import HttpResponse, JsonResponse, Http404, HttpResponseRedirect
from django.db import connection
import datetime
from django.urls import reverse
from django.conf import settings
login_url = '/hr_module/login'
# Create your views here.
@login_required(login_url=login_url)
def create_schedule(request):
session_user = User.objects.select_related('employee').get(username=request.user.username)
if not session_user.employee.manager_flag and not request.user.is_superuser:
return HttpResponseRedirect(reverse('employee_module:homepage'))
template_name = 'hr_module_create_schedule.html'
return render(request, template_name)
@login_required(login_url=login_url)
def homepage(request):
session_user = User.objects.select_related('employee').get(username=request.user.username)
if not session_user.employee.manager_flag and not request.user.is_superuser:
return HttpResponseRedirect(reverse('employee_module:homepage'))
template_name = 'hr_module_home.html'
return render(request, template_name)
@login_required(login_url=login_url)
def change_employee_data(request):
session_user = User.objects.select_related('employee').get(username=request.user.username)
if not session_user.employee.manager_flag and not request.user.is_superuser:
return HttpResponseRedirect(reverse('employee_module:homepage'))
template_name = 'hr_module_change_employee_data.html'
return render(request, template_name)
def change_employee_data_api(request):
if request.method == 'POST':
if request.user.is_authenticated:
session_user = User.objects.select_related('employee').get(username=request.user.username)
session_user_username = session_user.username
session_user_manager_flag = session_user.employee.manager_flag
body = json.loads(request.body)
username = body['username']
if request.user.is_superuser:
try:
empl = User.objects.select_related('employee').get(username=username)
except Exception as e:
print(e)
empl = None
elif session_user_manager_flag == True:
try:
print('checking manager flag')
empl = User.objects.select_related('employee').get(username=username)
if empl.employee.manager_username != session_user_username:
empl = None
except Exception as e:
print(e)
empl = None
else:
empl = None
if empl is None:
return JsonResponse({'error': 'no_access_or_no_username'})
record_employee = empl.employee.__dict__
record_user = empl.__dict__
response_dict = dict(record_user, **record_employee)
for i in ['_state', 'password']:
response_dict.pop(i)
print(response_dict)
return JsonResponse(response_dict, safe=False)
else:
return JsonResponse({'error': 'not_authenticated'})
@login_required(login_url=login_url)
def create_employees(request):
if not request.user.is_superuser:
return HttpResponseRedirect(reverse('hr_module:homepage'))
if request.method == 'POST':
if 'import_preview' in request.POST:
fileform = UploadFileForm(request.POST, request.FILES)
print(fileform.is_valid())
if fileform.is_valid():
uploaded_file = request.FILES['file']
df_dict = read_and_parse_excel(uploaded_file)
request.session['df_path'] = df_dict['df_path']
df_html = df_dict['df_html']
context = {'df_html': df_html}
template = 'hr_module_import_validation.html'
return render(request, template, context)
if 'import_insert' in request.POST:
df = pd.read_csv(request.session['df_path'])
insert_excel(df)
del request.session['df_path']
template = 'hr_module_import_success.html'
return render(request, template)
if 'import_single' in request.POST:
df_dict = {k: v[0] for k, v in dict(request.POST).items()}
for i in ('csrfmiddlewaretoken', 'import_single'):
df_dict.pop(i, None)
df = pd.DataFrame(df_dict, index=[0])
insert_excel(df)
print('done')
template = 'hr_module_import_success.html'
return render(request, template)
else:
columns_user = ['first_name',
'last_name',
'username',
'email',
'is_staff',
'is_active',
'is_superuser']
columns_empl = ['department',
'manager_username',
'time_model_id',
'manager_flag']
columns = columns_user + columns_empl
fileform = UploadFileForm()
userform = NewUserForm()
context = {'userform': userform,
'columns': columns,
'fileform': fileform
}
template = 'hr_module_import.html'
return render(request, template, context)
def search_users_api(request):
if request.method == 'POST':
body = json.loads(request.body)
searched_field = body['searched_field']
searched_string = body['searched_string']
print(searched_field, searched_string)
cursor = connection.cursor()
if searched_field == 'department':
cursor.execute('select department as caption, department as column_value from hr_module_employee '
'group by department')
elif searched_field == 'manager':
cursor.execute('select auth.first_name || %s || auth.last_name as caption, empl.manager_username as column_value '
'from hr_module_employee empl inner join auth_user auth on auth.username = empl.manager_username '
'group by auth.first_name || %s || auth.last_name, empl.manager_username', [' ', ' ']
)
elif searched_field == 'username':
cursor.execute('select auth.first_name || %s || auth.last_name as caption, empl.username as column_value from hr_module_employee empl '
'inner join auth_user auth on empl.username = auth.username ', [' ',]
)
response = dictfetchall(cursor)
print(response)
return JsonResponse(response, safe=False)
def load_employees_api(request):
if request.method == 'POST':
body = json.loads(request.body)
searched_field = body['searched_field']
searched_string = body['searched_string']
print(searched_field, searched_string)
cursor = connection.cursor()
if searched_field == 'department':
print('Deparment executed')
cursor.execute('select auth.first_name || %s || auth.last_name as name, auth.username as username, '
'empl.manager_username as manager_name, tm.daily_hours, '
'tm.mon, tm.tue, tm.wed, tm.thu, tm.fri, tm.sat, tm.fri '
'from hr_module_employee empl inner join auth_user auth on auth.username = empl.username '
'inner join hr_module_timemodel tm on empl.time_model_id = tm.time_model_id '
'where department = %s', [' ', searched_string])
elif searched_field == 'manager':
print('Manager executed')
cursor.execute('select auth.first_name || %s || auth.last_name as name, auth.username as username, '
'empl.manager_username as manager_name, tm.daily_hours, '
'tm.mon, tm.tue, tm.wed, tm.thu, tm.fri, tm.sat, tm.fri '
'from hr_module_employee empl inner join auth_user auth on auth.username = empl.manager_username '
'inner join hr_module_timemodel tm on empl.time_model_id = tm.time_model_id '
'where manager_username = %s', [' ', searched_string])
elif searched_field == 'username':
print('Username executed')
cursor.execute('select auth.first_name || %s || auth.last_name as name, auth.username as username, '
'empl.manager_username as manager_name, tm.daily_hours, '
'tm.mon, tm.tue, tm.wed, tm.thu, tm.fri, tm.sat, tm.sun '
'from hr_module_employee empl '
'inner join auth_user auth on auth.username = empl.username '
'inner join hr_module_timemodel tm on empl.time_model_id = tm.time_model_id '
'where empl.username = %s ', [' ', searched_string])
response = dictfetchall(cursor)
print(response)
return JsonResponse(response, safe=False)
def new_plan_api(request):
if request.method == 'POST':
body = json.loads(request.body)['json']
log = []
for item in body:
update_object = UpdatePlan(item)
insert_result = update_object.run_inserting()
log = log + insert_result
report_location = create_planning_operation_report(log, request.user.username)
user_obj = Employee.objects.get(pk=request.user)
plan_creation_log = PlanCreationLog(username=user_obj, report_location=report_location)
plan_creation_log.save()
return JsonResponse(log, safe=False)
@login_required(login_url=login_url)
def manage_schedule(request):
session_user = User.objects.select_related('employee').get(username=request.user.username)
if not session_user.employee.manager_flag and not request.user.is_superuser:
return HttpResponseRedirect(reverse('employee_module:homepage'))
template = 'hr_module_show_schedule.html'
return render(request, template)
def show_employee_plan_api(request):
if request.method == 'POST':
body = json.loads(request.body)
username = body['username']
start_date = body['start_date'].split('T')[0]
end_date = body['end_date'].split('T')[0]
start_date = datetime.datetime.strptime(start_date, '%Y-%m-%d')
end_date = datetime.datetime.strptime(end_date, '%Y-%m-%d')
user_obj = Employee.objects.get(pk=username)
query_result = Plan.objects.filter(username=user_obj, date__range=[start_date, end_date]).order_by('date', 'begin_time')
response = list(query_result.values())
return JsonResponse(response, safe=False)
def update_plan_api(request):
if request.method == 'POST':
body = json.loads(request.body)
print(body)
if body['action'] == 'delete':
for i in body['records']:
record_id = i['id']
Plan.objects.filter(id=record_id).delete()
if body['action'] == 'update':
for i in body['records']:
record_id = i['id']
record_start_time = i['begin_time']
record_end_time = i['end_time']
record = Plan.objects.get(pk=record_id)
record.begin_time = record_start_time
record.end_time = record_end_time
record.save()
response = {'true': 'true'}
return JsonResponse(response, safe=False)
@login_required(login_url=login_url)
def manage_timelog(request):
session_user = User.objects.select_related('employee').get(username=request.user.username)
if not session_user.employee.manager_flag and not request.user.is_superuser:
return HttpResponseRedirect(reverse('employee_module:homepage'))
template = 'hr_module_show_timelog.html'
return render(request, template)
def show_employee_timelog_api(request):
if request.method == 'POST':
body = json.loads(request.body)
username = body['username']
start_date = body['start_date'].split('T')[0]
end_date = body['end_date'].split('T')[0]
start_date = datetime.datetime.strptime(start_date, '%Y-%m-%d')
end_date = datetime.datetime.strptime(end_date, '%Y-%m-%d')
user_obj = Employee.objects.get(pk=username)
query_result = TimeLog.objects.filter(username=user_obj, date__range= [start_date, end_date]).order_by('date', 'begin_time')
response = list(query_result.values())
return JsonResponse(response, safe=False)
def update_timelog_api(request):
if request.method == 'POST':
body = json.loads(request.body)
print(body)
if body['action'] == 'delete':
for i in body['records']:
record_id = i['id']
TimeLog.objects.filter(id=record_id).delete()
if body['action'] == 'update':
for i in body['records']:
record_id = i['id']
record_start_time = i['begin_time']
record_end_time = i['end_time']
record = TimeLog.objects.get(pk=record_id)
record.begin_time = record_start_time
record.end_time = record_end_time
record.save()
response = {'true': 'true'}
return JsonResponse(response, safe=False)
@login_required(login_url=login_url)
def plan_creation_log(request):
session_user = User.objects.select_related('employee').get(username=request.user.username)
session_user_manager_flag = session_user.employee.manager_flag
if request.user.is_superuser:
creation_log = PlanCreationLog.objects.all().order_by('-creation_date')
elif session_user_manager_flag:
creation_log = PlanCreationLog.objects.filter(report_location__contains=request.user.username).order_by('-creation_date')
context = {'creation_log': creation_log,
'path': settings.IMPORT_REPORT_STORAGE}
template_name = 'hr_module_creation_log.html'
return render(request, context=context, template_name=template_name)