1146 lines
42 KiB
Python
1146 lines
42 KiB
Python
from flask import Flask, render_template, request, json, session, redirect, Blueprint, url_for, flash, Response, send_from_directory, stream_with_context, g
|
|
from flask_wtf import Form
|
|
from wtforms.fields import StringField, SubmitField
|
|
from wtforms.validators import Required
|
|
#from flaskext.mysql import MySQL
|
|
#from pony.orm import *
|
|
import cv2
|
|
import logging as log
|
|
import datetime as dt
|
|
from time import sleep
|
|
from flask_socketio import SocketIO
|
|
from flask_socketio import emit, join_room, leave_room
|
|
import smtplib
|
|
from oauth2client import file, client, tools
|
|
from googleapiclient.discovery import build
|
|
from googleapiclient.http import MediaFileUpload
|
|
from httplib2 import Http
|
|
from oauth2client.service_account import ServiceAccountCredentials
|
|
import os
|
|
from werkzeug.utils import secure_filename
|
|
from camera import VideoCamera
|
|
import pyaudio
|
|
import sqlite3
|
|
from flask_mail import Mail, Message
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.header import Header
|
|
from datetime import datetime
|
|
from dateutil import parser
|
|
|
|
|
|
# --- Application wiring ------------------------------------------------------
# Blueprint, Flask application, Socket.IO server and Flask-Mail instance used
# by the handlers in this module.
main = Blueprint('main', __name__)

app = Flask(__name__)
# SECURITY: secrets belong in the environment, not in source control.  The
# literals are kept only as backward-compatible fallbacks so existing
# deployments keep working; set SECRET_KEY / MAIL_USERNAME / MAIL_PASSWORD to
# override them (and rotate the values that were committed here).
app.secret_key = os.environ.get('SECRET_KEY', '523tvb6h3nio6r21cc4r1qx')
# Debug mode stays on by default (as before); export FLASK_DEBUG=0 to disable.
app.debug = os.environ.get('FLASK_DEBUG', '1') != '0'
socketio = SocketIO()
app.register_blueprint(main)
socketio.init_app(app)

# Mailbox used as the sender of notification e-mails.
me = os.environ.get('MAIL_USERNAME', "wirtualna.uczelnia.2018@gmail.com")
my_password = os.environ.get('MAIL_PASSWORD', "korniszon")

app.config.update(
    MAIL_SERVER="smtp.gmail.com",
    MAIL_PORT=465,
    MAIL_USE_SSL=True,
    MAIL_USERNAME=me,
    MAIL_PASSWORD=my_password,
)

mail = Mail(app)
|
|
|
|
# MySQL configurations
|
|
#app.config['MYSQL_DATABASE_USER'] = 'root'
|
|
#app.config['MYSQL_DATABASE_PASSWORD'] = 'localhost'
|
|
#app.config['MYSQL_DATABASE_DB'] = 'wu'
|
|
#app.config['MYSQL_DATABASE_HOST'] = 'localhost'
|
|
#mysql = MySQL()
|
|
#mysql.init_app(app)
|
|
|
|
# Directory, relative to the working directory, used for uploaded files.
UPLOAD_FOLDER = './uploads'
# File extensions (lower-case, without the dot) checked by allowed_file().
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# File name of the SQLite database.
DATABASE = 'database.db'
|
|
|
|
# connection = sqlite3.connect(DATABASE, isolation_level = None)
|
|
|
|
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    A name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    return dot == '.' and extension.lower() in ALLOWED_EXTENSIONS
|
|
|
|
@app.route('/groupChat', methods=['GET', 'POST'])
def indexChat():
    """Show the chat entry form and forward valid submissions to the chat view.

    Requires a "user" entry in the session; otherwise the error page is
    rendered.  On a valid submission the nickname (the part of the user's
    address before '@') and the fixed room name are stored in the session.
    """
    if "user" not in session:
        return render_template("error.html", error = "Brak dostępu!")

    form = LoginForm()
    if form.validate_on_submit():
        session['name'] = session["user"].split('@')[0]
        session['room'] = "Czat grupowy"
        return redirect(url_for('.chat'))

    if request.method == 'GET':
        # Pre-fill the form with whatever the session already holds.
        form.name.data = session.get('name', '')
        form.room.data = session.get('room', '')
    return render_template('indexChat.html', form=form)
|
|
|
|
@app.route('/chat')
def chat():
    """Render the chat room for the logged-in user.

    The user name is read from the session and the room name is fixed.
    Without a "user" entry in the session the error page is rendered.
    """
    if "user" not in session:
        return render_template("error.html", error = "Brak dostępu!")

    name = session["user"]
    room = "Czat grupowy"
    nothing_to_show = name is None or name == '' or room == ''
    if nothing_to_show:
        return redirect("index.html")
    return render_template('chat.html', name=name, room=room)
|
|
|
|
@socketio.on('joined', namespace='/chat')
def joined(message):
    """Handle the 'joined' event: add the client to the room and announce it.

    The announcement is emitted as a 'status' event to the whole room and
    names the user by the part of the session address before '@'.
    """
    chat_room = "Czat grupowy"
    join_room(chat_room)
    nickname = session.get('user').split('@')[0]
    emit('status', {'msg': nickname + ' wszedł na czat'}, room=chat_room)
|
|
|
|
|
|
@socketio.on('text', namespace='/chat')
def text(message):
    """Handle the 'text' event: relay the client's message to the room.

    The relayed 'message' event is prefixed with the sender's nickname
    (the part of the session address before '@') and a colon.
    """
    chat_room = "Czat grupowy"
    nickname = session.get('user').split('@')[0]
    payload = {'msg': nickname + ':' + message['msg']}
    emit('message', payload, room=chat_room)
|
|
|
|
|
|
@socketio.on('left', namespace='/chat')
def left(message):
    """Handle the 'left' event: remove the client from the room and announce it.

    The announcement is emitted as a 'status' event to the room the client
    just left.
    """
    chat_room = "Czat grupowy"
    leave_room(chat_room)
    nickname = session.get('user').split('@')[0]
    emit('status', {'msg': nickname + ' opuścił czat'}, room=chat_room)
|
|
|
|
class LoginForm(Form):
    """Accepts a nickname and a room."""
    # Text inputs, each declared with the Required() validator.
    name = StringField('Name', validators=[Required()])
    room = StringField('Room', validators=[Required()])
    # Submit button; the label is Polish for "Enter the chat".
    submit = SubmitField('Wejdź na czat')
|
|
|
|
# Face "signatures" collected by the face sign-up views and looked up by the
# face sign-in views.  Kept only in process memory.
faces_list = []
# NOTE(review): the face sign-in views assign a local of the same name; no
# visible code reads this module-level value.
testing_face = 0
# Placeholders for the Google API clients built in main().
service = None
drive_service = None
# OAuth scopes passed to flow_from_clientsecrets() in main().
CALENDAR_SCOPES = "https://www.googleapis.com/auth/calendar.events"
DRIVE_SCOPES = "https://www.googleapis.com/auth/drive"
|
|
|
|
|
|
@socketio.on('message')
def handleMessage(msg):
    """Log an unnamed Socket.IO message and rebroadcast it to every client.

    :param msg: payload of the incoming message; may be text or bytes.
    """
    # `send` is not part of the module-level flask_socketio import, so the
    # original call raised NameError; import it where it is used.
    from flask_socketio import send

    # Only bytes payloads have .decode(); text payloads are printed as-is.
    if isinstance(msg, (bytes, bytearray)):
        printable = msg.decode("utf-8")
    else:
        printable = str(msg)
    print('Message: ' + printable)
    send(msg, broadcast=True)
|
|
|
|
@app.route("/")
def main():
    """Render the landing page after preparing Google clients and the DB.

    Loads OAuth credentials from 'token.json' (running the OAuth flow when
    they are missing or invalid), builds the Calendar and Drive clients and
    makes sure the ``logins`` table exists in the SQLite database.
    """
    # Bind the module-level clients; without this declaration the assignments
    # below only created locals and the globals stayed None.
    global service, drive_service

    store = file.Storage('token.json')
    creds = store.get()
    if not creds or creds.invalid:
        flow = client.flow_from_clientsecrets('credentials.json', CALENDAR_SCOPES)
        creds = tools.run_flow(flow, store)
    service = build('calendar', 'v3', http=creds.authorize(Http()))

    # NOTE(review): this re-checks the same credentials object, so the Drive
    # flow only runs if the credentials are still invalid after the Calendar
    # step; the Drive client therefore normally reuses the Calendar-scoped
    # token.  Confirm whether a separate token store is needed for Drive.
    if not creds or creds.invalid:
        flow = client.flow_from_clientsecrets('credentials.json', DRIVE_SCOPES)
        creds = tools.run_flow(flow, store)
    drive_service = build('drive', 'v3', http=creds.authorize(Http()))

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        # Same effect as the former "look up in sqlite_master, then create".
        connection.execute("CREATE TABLE IF NOT EXISTS logins (login TEXT PRIMARY KEY)")
    finally:
        connection.close()
    return render_template('index.html', info="Witaj!")
|
|
|
|
@app.route("/indexENG")
def indexENG():
    """Render 'indexENG.html' with the greeting "Hello!"."""
    return render_template('indexENG.html', info="Hello!")
|
|
|
|
@app.route('/showSignUp')
def showSignUp():
    """Render the 'signup.html' template."""
    return render_template('signup.html')
|
|
|
|
@app.route('/showSignUpENG')
def showSignUpENG():
    """Render the 'signupENG.html' template."""
    return render_template('signupENG.html')
|
|
|
|
@app.route('/showSignIn')
def showSignIn():
    """Render the 'signin.html' template."""
    return render_template('signin.html')
|
|
|
|
@app.route('/showSignInENG')
def showSignInENG():
    """Render the 'signinENG.html' template."""
    return render_template('signinENG.html')
|
|
|
|
@app.route('/signUp',methods=['POST','GET'])
def signUp():
    """Register a user from the 'inputLogin' / 'inputPassword' form fields.

    Returns the index page on success, the error page when the insert
    reports rows, or a small JSON document when either field is empty.
    """
    # Start as None so the cleanup below never touches an unbound name; the
    # original `finally` raised UnboundLocalError whenever the connection had
    # not been opened, hiding the real error and the empty-field reply.
    conn = None
    cursor = None
    try:
        _login = request.form['inputLogin']
        _password = request.form['inputPassword']

        if not (_login and _password):
            return json.dumps({'html':'<span>Enter the required fields</span>'})

        # NOTE(review): `mysql` is not defined in the visible part of this
        # file (its setup is commented out near the top) — confirm where it
        # comes from.  The '?' placeholders and the plain-text password also
        # need review.
        conn = mysql.connect()
        cursor = conn.cursor()
        cursor.execute("INSERT into Users(login, password) values(?, ?)", (_login, _password))
        data = cursor.fetchall()

        # `== 0` instead of the original identity test `is 0`.
        if len(data) == 0:
            conn.commit()
            return render_template("index.html", info="Pomyślnie zarejestrowano!")
        return render_template("error.html", error="Użytkownik już istnieje!")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
|
|
|
|
@app.route('/signUpENG',methods=['POST','GET'])
def signUpENG():
    """Register a user from the 'inputLogin' / 'inputPassword' form fields.

    English-template variant: returns 'indexENG.html' on success,
    'errorENG.html' when the insert reports rows, or a small JSON document
    when either field is empty.
    """
    # Start as None so the cleanup below never touches an unbound name; the
    # original `finally` raised UnboundLocalError whenever the connection had
    # not been opened, hiding the real error and the empty-field reply.
    conn = None
    cursor = None
    try:
        _login = request.form['inputLogin']
        _password = request.form['inputPassword']

        if not (_login and _password):
            return json.dumps({'html':'<span>Enter the required fields</span>'})

        # NOTE(review): `mysql` is not defined in the visible part of this
        # file (its setup is commented out near the top) — confirm where it
        # comes from.  The '?' placeholders and the plain-text password also
        # need review.
        conn = mysql.connect()
        cursor = conn.cursor()
        cursor.execute("INSERT into Users(login, password) values(?, ?)", (_login, _password))
        data = cursor.fetchall()

        # `== 0` instead of the original identity test `is 0`.
        if len(data) == 0:
            conn.commit()
            return render_template("indexENG.html", info="Successfully registered!")
        return render_template("errorENG.html", error="The user already exists!")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
|
|
|
|
@app.route('/validateLogin', methods=['POST'])
def validateLogin():
    """Log a user in by checking the submitted credentials against Gmail SMTP.

    On success the normalised address and role are stored in the session,
    the nickname is recorded in the ``logins`` table and the user home page
    is rendered.  Rejected credentials render the error page.
    """
    gmail_username = request.form['inputLogin']
    gmail_password = request.form['inputPassword']

    try:
        server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
        try:
            server.ehlo()
            server.login(gmail_username, gmail_password)
        finally:
            # The connection is only needed for the credential check.
            server.close()
    except smtplib.SMTPAuthenticationError:
        return render_template('error.html',error = 'Zły email lub hasło!')

    if "@gmail.com" not in gmail_username:
        session['user'] = gmail_username + "@gmail.com"
    else:
        session["user"] = gmail_username

    # Admin is exactly the service mailbox.  The former substring test granted
    # admin rights to any account whose name merely contained that text.
    is_admin = session["user"].lower() == me.lower()
    session["role"] = "admin" if is_admin else "user"
    nickname = session["user"].split("@")[0]

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()

        # `test` is only ever true for the admin, and only when the tests
        # table exists and holds at least one row.
        test = False
        if is_admin:
            tests_table = cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='tests'").fetchone()
            if tests_table is not None:
                test = cur.execute("SELECT title from tests").fetchone() is not None

        # The table is otherwise only created by the "/" view, which a client
        # may never have visited before logging in.
        cur.execute("CREATE TABLE IF NOT EXISTS logins (login TEXT PRIMARY KEY)")
        known = {row[0] for row in cur.execute("SELECT login from logins")}
        if nickname not in known:
            cur.execute("INSERT INTO logins(login) values(?)", (nickname,))
        logins = cur.execute("SELECT login from logins").fetchall()
    finally:
        connection.close()

    return render_template('userHome.html', user=nickname, calendar_src="https://calendar.google.com/calendar/embed?src=" + session.get("user"), test = test, logins = logins)
|
|
|
|
|
|
|
|
@app.route('/validateLoginENG',methods=['POST'])
def validateLoginENG():
    """Log a user in (English templates) by checking credentials against Gmail SMTP.

    On success the submitted login is stored in the session and
    'userHomeENG.html' is rendered; SMTP or network failures render
    'errorENG.html'.
    """
    gmail_username = request.form['inputLogin']
    gmail_password = request.form['inputPassword']

    try:
        server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
        try:
            server.ehlo()
            server.login(gmail_username, gmail_password)
        finally:
            # The connection is only needed for the credential check.
            server.close()
    except (smtplib.SMTPException, OSError):
        # Replaces a bare `except:` that also reported template or session
        # errors as a wrong password; login and network failures still land
        # on the same page as before.
        return render_template('errorENG.html',error = 'Wrong email or password!')

    session['user'] = gmail_username
    return render_template('userHomeENG.html', user=gmail_username.split('@')[0], test = False)
|
|
|
|
'''
|
|
con = mysql.connect()
|
|
cursor = con.cursor()
|
|
#cursor.callproc('logowanie',(_username,))
|
|
args = (_username, _password)
|
|
query = "SELECT * from Users where login = ? and password = ?"
|
|
cursor.execute(query, args)
|
|
data = cursor.fetchall()
|
|
|
|
if len(data) > 0:
|
|
#if check_password_hash(str(data[0][3]),_password):
|
|
#if str(data[0][3]) == _password:
|
|
if str(data[0][1]) == _password:
|
|
session['user'] = data[0][0]
|
|
#return redirect('/userHomeENG', user=session['user'])
|
|
return render_template('userHomeENG.html', user=session['user'])
|
|
else:
|
|
return render_template('errorENG.html',error = 'Wrong email or password!')
|
|
else:
|
|
return render_template('errorENG.html',error = 'Wrong email or password!')
|
|
|
|
except Exception as e:
|
|
return render_template('errorENG.html', error = str(e))
|
|
finally:
|
|
if cursor:
|
|
cursor.close()
|
|
if con:
|
|
con.close()
|
|
'''
|
|
|
|
@app.route('/userHome')
def userHome():
    """Render the home page of the logged-in user.

    ``test`` is true when a ``tests`` table exists in the database;
    ``logins`` holds every row of the ``logins`` table.  Without a session
    user the error page is rendered.
    """
    if not session.get('user'):
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()
        tests_tables = cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='tests'").fetchall()
        logins = cur.execute("SELECT login from logins").fetchall()
    finally:
        # Close even when a query fails; previously the connection leaked.
        connection.close()

    # One render call; the two original branches differed only in `test`.
    return render_template('userHome.html', user=session['user'].split('@')[0], calendar_src="https://calendar.google.com/calendar/embed?src=" + session.get("user"), test = len(tests_tables) > 0, logins = logins)
|
|
|
|
@app.route('/userHomeENG')
def userHomeENG():
    """Render 'userHomeENG.html' for the session user, or the error page."""
    if not session.get('user'):
        return render_template('errorENG.html',error = 'Unauthorized access!')
    nickname = session['user'].split('@')[0]
    return render_template('userHomeENG.html', user=nickname, test = False)
|
|
|
|
@app.route('/logout')
def logout():
    """Drop the user and role from the session and redirect to '/'."""
    for key in ('user', 'role'):
        session.pop(key, None)
    return redirect('/')
|
|
|
|
@app.route('/logoutENG')
def logoutENG():
    """Drop the user and role from the session and redirect to '/indexENG'."""
    session.pop('user', None)
    # Also clear the role, as logout() does; otherwise a role set by an
    # earlier login stayed in the session after logging out here.
    session.pop('role', None)
    return redirect('/indexENG')
|
|
|
|
@app.route('/signUpWithFace')
def signUpWithFace():
    """Capture face detections from camera 0 and store their signatures.

    Frames are read until five frames with a detected face have been seen
    (or 'q' is pressed in the preview window).  For each such frame the sum
    of the first detection's rectangle values is appended to ``faces_list``.
    """
    cascPath = "haarcascade_frontalface_default.xml"
    faceCascade = cv2.CascadeClassifier(cascPath)
    log.basicConfig(filename='webcam.log',level=log.INFO)

    video_capture = cv2.VideoCapture(0)
    anterior = 0
    i = 0

    # try/finally so the camera and preview window are released on every
    # exit path, including errors.
    try:
        while True:
            if not video_capture.isOpened():
                print('Unable to load camera.')
                sleep(5)

            # Capture frame-by-frame
            ret, frame = video_capture.read()
            if not ret:
                # No frame available; cvtColor would fail on a missing frame.
                return render_template("error.html", error="Brak dostępu!")

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = faceCascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(30, 30)
            )

            # Draw a rectangle around the faces
            for (x, y, w, h) in faces:
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)

            if anterior != len(faces):
                anterior = len(faces)
                log.info("faces: "+str(len(faces))+" at "+str(dt.datetime.now()))

            # Display the resulting frame
            cv2.imshow('Video', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            # An empty result means "no face", whether tuple or empty array.
            if len(faces) > 0:
                i += 1
                faces_list.append(sum(faces[0]))
                if i == 5:
                    break
    finally:
        video_capture.release()
        cv2.destroyAllWindows()

    print (faces_list)
    return render_template('index.html', info="Zapisano twarz")
|
|
|
|
@app.route('/signUpWithFaceENG')
def signUpWithFaceENG():
    """Capture face detections from camera 0 and store their signatures.

    English-template variant.  Frames are read until five frames with a
    detected face have been seen (or 'q' is pressed in the preview window).
    For each such frame the sum of the first detection's rectangle values is
    appended to ``faces_list``.
    """
    cascPath = "haarcascade_frontalface_default.xml"
    faceCascade = cv2.CascadeClassifier(cascPath)
    log.basicConfig(filename='webcam.log',level=log.INFO)

    video_capture = cv2.VideoCapture(0)
    anterior = 0
    i = 0

    # try/finally so the camera and preview window are released on every
    # exit path, including errors.
    try:
        while True:
            if not video_capture.isOpened():
                print('Unable to load camera.')
                sleep(5)

            # Capture frame-by-frame
            ret, frame = video_capture.read()
            if not ret:
                # No frame available; cvtColor would fail on a missing frame.
                return render_template("errorENG.html", error="Lack of access!")

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = faceCascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(30, 30)
            )

            # Draw a rectangle around the faces
            for (x, y, w, h) in faces:
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)

            if anterior != len(faces):
                anterior = len(faces)
                log.info("faces: "+str(len(faces))+" at "+str(dt.datetime.now()))

            # Display the resulting frame
            cv2.imshow('Video', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            # An empty result means "no face", whether tuple or empty array.
            if len(faces) > 0:
                i += 1
                faces_list.append(sum(faces[0]))
                if i == 5:
                    break
    finally:
        video_capture.release()
        cv2.destroyAllWindows()

    print (faces_list)
    return render_template('indexENG.html', info="A face has been saved")
|
|
|
|
@app.route('/signInWithFace')
def signInWithFace():
    """Grant access when a captured face signature is found in ``faces_list``.

    Frames are read from camera 0; for every frame with a detection the sum
    of the first rectangle's values is looked up in ``faces_list``.  After
    twenty such frames without a match the error page is rendered.  On a
    match the user home page is rendered for the session user.
    """
    cascPath = "haarcascade_frontalface_default.xml"
    faceCascade = cv2.CascadeClassifier(cascPath)
    log.basicConfig(filename='webcam.log',level=log.INFO)

    video_capture = cv2.VideoCapture(0)
    anterior = 0
    i = 0

    # try/finally so the camera and preview window are also released on the
    # early "access denied" return, which previously leaked them.
    try:
        while True:
            if not video_capture.isOpened():
                print('Unable to load camera.')
                sleep(5)

            # Capture frame-by-frame
            ret, frame = video_capture.read()
            if not ret:
                # No frame available; cvtColor would fail on a missing frame.
                return render_template("error.html", error="Brak dostępu!")

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = faceCascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(30, 30)
            )

            # Draw a rectangle around the faces
            for (x, y, w, h) in faces:
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)

            if anterior != len(faces):
                anterior = len(faces)
                log.info("faces: "+str(len(faces))+" at "+str(dt.datetime.now()))

            # Display the resulting frame
            cv2.imshow('Video', frame)
            # NOTE(review): pressing 'q' leaves the loop and continues as if a
            # face had matched — confirm this is intended.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            # An empty result means "no face", whether tuple or empty array.
            if len(faces) > 0:
                i += 1
                face = sum(faces[0])
                if face in faces_list:
                    break
                elif i == 20:
                    return render_template("error.html", error="Brak dostępu!")
    finally:
        video_capture.release()
        cv2.destroyAllWindows()

    # The page below is rendered for the session user; without one the
    # original crashed on `None.split`.
    user = session.get('user')
    if not user:
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()
        test = False
        tests_table = cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='tests'").fetchone()
        if tests_table is not None:
            # fetchone() returns None for an empty table; the original called
            # len() on that value.
            test = cur.execute("SELECT title from tests").fetchone() is not None
        logins = cur.execute("SELECT login from logins").fetchall()
    finally:
        connection.close()

    return render_template('userHome.html', info="Zapisano twarz", user=user.split('@')[0], calendar_src="https://calendar.google.com/calendar/embed?src=" + user, test = test, logins = logins)
|
|
|
|
@app.route('/signInWithFaceENG')
def signInWithFaceENG():
    """Grant access when a captured face signature is found in ``faces_list``.

    English-template variant.  Frames are read from camera 0; for every
    frame with a detection the sum of the first rectangle's values is looked
    up in ``faces_list``.  After twenty such frames without a match the
    error page is rendered.
    """
    cascPath = "haarcascade_frontalface_default.xml"
    faceCascade = cv2.CascadeClassifier(cascPath)
    log.basicConfig(filename='webcam.log',level=log.INFO)

    video_capture = cv2.VideoCapture(0)
    anterior = 0
    i = 0

    # try/finally so the camera and preview window are also released on the
    # early "access denied" return, which previously leaked them.
    try:
        while True:
            if not video_capture.isOpened():
                print('Unable to load camera.')
                sleep(5)

            # Capture frame-by-frame
            ret, frame = video_capture.read()
            if not ret:
                # No frame available; cvtColor would fail on a missing frame.
                return render_template("errorENG.html", error="Lack of access!")

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = faceCascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(30, 30)
            )

            # Draw a rectangle around the faces
            for (x, y, w, h) in faces:
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)

            if anterior != len(faces):
                anterior = len(faces)
                log.info("faces: "+str(len(faces))+" at "+str(dt.datetime.now()))

            # Display the resulting frame
            cv2.imshow('Video', frame)
            # NOTE(review): pressing 'q' leaves the loop and continues as if a
            # face had matched — confirm this is intended.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            # An empty result means "no face", whether tuple or empty array.
            if len(faces) > 0:
                i += 1
                face = sum(faces[0])
                if face in faces_list:
                    break
                elif i == 20:
                    return render_template("errorENG.html", error="Lack of access!")
    finally:
        video_capture.release()
        cv2.destroyAllWindows()

    # The page below is rendered for the session user; without one the
    # original crashed on `None.split`.
    user = session.get('user')
    if not user:
        return render_template('errorENG.html',error = 'Unauthorized access!')
    return render_template('userHomeENG.html', info="A face has been saved", user=user.split('@')[0], test = False)
|
|
|
|
@app.route('/addEvent', methods=['POST'])
def addEvent():
    """Create a Google Calendar event from the submitted form.

    Reads summary, location, description, start, end and a comma-separated
    attendee list from the form, inserts the event into the primary calendar
    and renders the user home page.  Renders the error page when nobody is
    logged in or the dates are invalid.
    """
    # Check the session first: the original inserted the event and only then
    # failed on the missing session user.
    if not session.get('user'):
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')

    summary = request.form["inputSummary"]
    location = request.form["inputLocation"]
    description = request.form["inputDescription"]
    start = request.form["inputStart"]
    end = request.form["inputEnd"]
    attendees = request.form["inputAttendees"].split(',')

    try:
        datetime_start = parser.parse(start)
        datetime_end = parser.parse(end)
        invalid_range = datetime_start > datetime_end
    except (ValueError, OverflowError, TypeError):
        # Unparseable input, or a naive value compared with an aware one.
        invalid_range = True
    if invalid_range:
        return render_template('error.html',error = 'Nieprawidłowy termin')

    processed_attendees = []
    for attendee in attendees:
        attendee = attendee.strip()
        if not attendee:
            # An empty field or trailing comma would become "@gmail.com".
            continue
        if "@gmail.com" not in attendee:
            attendee += "@gmail.com"
        processed_attendees.append({'email': attendee})

    # The wall-clock time is sent without a fixed offset and the named zone
    # resolves it, so daylight-saving dates are no longer shifted by the
    # previously hard-coded "+01:00".
    event = {
        'summary': summary,
        'location': location,
        'description': description,
        'start': {
            'dateTime': datetime_start.replace(microsecond=0).isoformat(),
            'timeZone': 'Europe/Warsaw'
        },
        'end': {
            'dateTime': datetime_end.replace(microsecond=0).isoformat(),
            'timeZone': 'Europe/Warsaw'
        },
        'attendees': processed_attendees
    }

    store = file.Storage('token.json')
    creds = store.get()
    if not creds or creds.invalid:
        flow = client.flow_from_clientsecrets('credentials.json', "https://www.googleapis.com/auth/calendar.events")
        creds = tools.run_flow(flow, store)
    service = build('calendar', 'v3', http=creds.authorize(Http()))
    service.events().insert(calendarId='primary', body=event).execute()

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()
        test = False
        tests_table = cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='tests'").fetchone()
        if tests_table is not None:
            # fetchone() returns None for an empty table; the original called
            # len() on that value.
            test = cur.execute("SELECT title from tests").fetchone() is not None
        logins = cur.execute("SELECT login from logins").fetchall()
    finally:
        connection.close()

    return render_template('userHome.html', user=session['user'].split('@')[0], calendar_src="https://calendar.google.com/calendar/embed?src=" + session.get("user"), test = test, logins = logins)
|
|
|
|
@app.route('/streaming')
def streaming():
    """Render the 'streaming.html' template."""
    return render_template('streaming.html')
|
|
|
|
@app.route('/toAddEvent')
def toAddEvent():
    """Render the add-event form for a logged-in user, else the error page."""
    if not session.get('user'):
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')
    return render_template('addEvent.html')
|
|
|
|
def gen(camera):
    """Yield an endless sequence of multipart chunks, one per camera frame.

    Each chunk is the '--frame' boundary line, a JPEG content-type header
    and the bytes returned by ``camera.get_frame()``.
    """
    part_header = b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n'
    part_trailer = b'\r\n\r\n'
    while True:
        yield part_header + camera.get_frame() + part_trailer
|
|
|
|
@app.route('/stream')
def stream():
    """Notify every recorded login by e-mail and start the camera stream.

    Returns a multipart response fed by ``gen(VideoCamera())``.
    """
    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        logins = connection.execute("SELECT login from logins").fetchall()
    finally:
        # The connection was previously never closed.
        connection.close()

    emails = [login[0] + "@gmail.com" for login in logins]
    # Flask-Mail refuses to send a message without recipients, so skip the
    # notification when nobody is recorded yet.
    if emails:
        mail.send_message(
            'Nowy stream',
            sender='wirtualna.uczelnia.2018@gmail.com',
            recipients=emails,
            body="Został uruchomiony nowy stream pod adresem:\n" + "http://192.168.8.100:5000/streaming"
        )
    return Response(gen(VideoCamera()),
                    mimetype='multipart/x-mixed-replace; boundary=frame')
|
|
|
|
def make_tree(path=app.config['UPLOAD_FOLDER']):
    """Return a nested dict describing the directory tree rooted at *path*.

    Every node has a 'name' key holding its path; directory nodes also have
    a 'children' list.  A directory that cannot be listed yields a node with
    no children.
    """
    node = {'name': path, 'children': []}
    try:
        entries = os.listdir(path)
    except OSError:
        # Unreadable or missing directory: report it as empty.
        return node

    for entry in entries:
        full_path = os.path.join(path, entry)
        if os.path.isdir(full_path):
            child = make_tree(full_path)
        else:
            child = {'name': full_path}
        node['children'].append(child)
    return node
|
|
|
|
@app.route('/uploads/<filename>')
def download_file(filename):
    """Send *filename* from the upload folder as an attachment."""
    # NOTE(review): unlike toResources(), this view does not check the
    # session — confirm downloads are meant to be public.
    return send_from_directory(app.config['UPLOAD_FOLDER'],
                               filename, as_attachment=True)
|
|
|
|
@app.route('/toResources')
def toResources():
    """Render the resources page with the upload-folder tree.

    Without a session user the error page is rendered instead.
    """
    if not session.get('user'):
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')

    tree = make_tree()
    print (tree)
    return render_template('resources.html', items=tree)
|
|
|
|
@app.route('/resources', methods=['POST'])
def resources():
    """Store the uploaded 'fileToUpload' file and return to the user home page.

    Redirects to the resources page when no usable file was submitted and
    renders the error page when nobody is logged in.
    """
    # Check the session before touching the disk: the original saved the file
    # and only then failed on the missing session user.
    if not session.get('user'):
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')

    upload = request.files.get('fileToUpload')
    filename = secure_filename(upload.filename or '') if upload is not None else ''
    if not filename:
        # No file part, or a name that sanitises to nothing; saving would
        # target the upload directory itself.
        return redirect(url_for('toResources'))

    upload.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        logins = connection.execute("SELECT login from logins").fetchall()
    finally:
        # The connection was previously never closed.
        connection.close()

    return render_template('userHome.html', user=session['user'].split('@')[0], calendar_src="https://calendar.google.com/calendar/embed?src=" + session.get("user"), test = False, logins = logins)
|
|
|
|
|
|
@app.route('/showResources')
def showResources():
    """Return the names of the files in the upload folder as JSON.

    The original body filled an undefined ``list_of_files`` and returned
    nothing, so every request failed.  The mapping is now built locally and
    sent as a JSON object mapping each file name to itself.
    """
    if not session.get('user'):
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')

    path = app.config['UPLOAD_FOLDER']
    try:
        names = os.listdir(path)
    except OSError:
        # Missing or unreadable upload folder: report no files.
        names = []

    list_of_files = {filename: filename for filename in names}
    return Response(json.dumps(list_of_files), mimetype='application/json')
|
|
#return render_template("")
|
|
|
|
|
|
#input_file = UPLOAD_FOLDER + filename
|
|
#uploads = []
|
|
#uploads.append(input_file)
|
|
#return render_template('resources.html', resources = uploads)
|
|
# if user does not select file, browser also
|
|
# submit an empty part without filename
|
|
#if file.filename == '':
|
|
# flash('No selected file')
|
|
# return redirect(request.url)
|
|
|
|
#if file and allowed_file(file.filename):
|
|
#filename = secure_filename(file.filename)
|
|
#filename = app.config['UPLOAD_FOLDER'] + file.filename
|
|
#file.save(filename) #os.path.join(app.config['UPLOAD_FOLDER'], file.filename))
|
|
#return redirect(url_for('uploaded_file', filename=filename))
|
|
'''if request.method == 'GET':
|
|
uploads = app.config['UPLOAD_FOLDER'] #os.path.join(current_app.root_path, app.config['UPLOAD_FOLDER'])
|
|
return render_template('resources.html', resources = os.getcwd() + os.listdir(uploads))
|
|
|
|
<!doctype html>
|
|
<title>Upload new File</title>
|
|
<h1>Upload new File</h1>
|
|
<form method=post enctype=multipart/form-data>
|
|
<input type=file name=file>
|
|
<input type=submit value=Upload>
|
|
</form>
|
|
|
|
''' '''
|
|
@app.route('/addResources', methods=['GET', 'POST'])
|
|
def addResources():
|
|
if request.method == 'POST':
|
|
# check if the post request has the file part
|
|
#if 'file' not in request.files:
|
|
# flash('No file part')
|
|
# return redirect(request.url)
|
|
file = request.files['fileToUpload']
|
|
file_metadata = {'name': file}
|
|
path = request.files['fileToUpload'].file.name
|
|
print (os.path.basename(file.filename))
|
|
media = MediaFileUpload(path)
|
|
file = drive_service.files().create(body=file_metadata, media_body=media, fields='id').execute()
|
|
results = drive_service.files().list(pageSize=10, fields="nextPageToken, files(id, name)").execute()
|
|
items = results.get('files', [])
|
|
return render_template('allResources.html', resources = items)
|
|
# file = request.files['fileToUpload']
|
|
# file = request.form["fileToUpload"]
|
|
# filename = secure_filename(file.filename)
|
|
# file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
|
|
# input_file = UPLOAD_FOLDER + filename
|
|
# uploads = []
|
|
# uploads.append(input_file)
|
|
# return render_template('addResources.html', resources = uploads)
|
|
|
|
@app.route('/allResources')
|
|
def allResources():
|
|
results = drive_service.files().list(pageSize=10, fields="nextPageToken, files(id, name)").execute()
|
|
items = results.get('files', [])
|
|
return render_template('allResources.html', resources = items)
|
|
|
|
@app.route('/toAllResources')
|
|
def toAllResources():
|
|
if session.get('user'):
|
|
uploads = app.config['UPLOAD_FOLDER'] #os.path.join(current_app.root_path, app.config['UPLOAD_FOLDER'])
|
|
path = os.getcwd()
|
|
return render_template('allResources.html', resources = os.path.join(path, uploads))
|
|
#return render_template('resources.html')
|
|
else:
|
|
return render_template('error.html',error = 'Nieautoryzowany dostęp!')
|
|
|
|
@app.route('/toAddResources')
|
|
def toAddResources():
|
|
if session.get('user'):
|
|
uploads = app.config['UPLOAD_FOLDER'] #os.path.join(current_app.root_path, app.config['UPLOAD_FOLDER'])
|
|
path = os.getcwd()
|
|
return render_template('addResources.html', resources = os.path.join(path, uploads))
|
|
#return render_template('resources.html')
|
|
else:
|
|
return render_template('error.html',error = 'Nieautoryzowany dostęp!')
|
|
|
|
@app.route('/downloadResource')
|
|
def downloadResource():
|
|
if session.get('user'):
|
|
file_id = request.files["files"]
|
|
return render_template('addResources.html', resources = os.path.join(path, uploads))
|
|
#return render_template('resources.html')
|
|
else:
|
|
return render_template('error.html',error = 'Nieautoryzowany dostęp!')
|
|
'''
|
|
|
|
|
|
@app.route('/create_test', methods = ["GET", "POST"])
def create_test():
    """Create a new test, e-mail the assigned users and reset the quiz tables.

    Reads ``inputTestName`` and ``inputUsers`` from the submitted form.
    ``inputUsers`` is a comma-separated list; every entry that does not
    already contain ``@gmail.com`` gets that suffix appended to form the
    recipient address, and the part before the first ``@`` is stored as the
    user's nickname.

    The ``tests``, ``questions``, ``users`` and ``results`` tables are dropped
    and recreated, so only one test exists at a time.

    Returns:
        A redirect to ``show_test_owner`` when the session role is ``admin``,
        otherwise the rendered ``error.html`` page.
    """
    # session.get() keeps anonymous visitors on the error page instead of
    # failing with a KeyError.
    if session.get("role") != "admin":
        return render_template("error.html", error = "Brak dostępu!")

    test_name = request.form["inputTestName"]
    users = request.form["inputUsers"]

    # Build recipient addresses and nicknames in one pass. Blank entries
    # (e.g. from a trailing comma) are skipped and duplicates are dropped,
    # because ``users.nickname`` is a primary key and a repeated name would
    # abort the insert below.
    emails = []
    nicknames = []
    for entry in users.split(","):
        entry = entry.strip()
        if not entry:
            continue
        email = entry if "@gmail.com" in entry else entry + "@gmail.com"
        if email not in emails:
            emails.append(email)
        nickname = entry.split("@")[0].strip()
        if nickname and nickname not in nicknames:
            nicknames.append(nickname)

    # Only send the notification when there is somebody to notify.
    if emails:
        mail.send_message(
            'Zostałeś przydzielony do nowego testu',
            sender='wirtualna.uczelnia.2018@gmail.com',
            recipients=emails,
            body="Pojawił się nowy test, zaloguj się i go rozwiąż:\n" + "http://192.168.8.100:5000/showSignIn"
        )

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()

        # Start from a clean slate: any previous test is discarded.
        cur.execute('DROP TABLE IF EXISTS tests')
        cur.execute('DROP TABLE IF EXISTS questions')
        cur.execute('DROP TABLE IF EXISTS users')
        cur.execute('DROP TABLE IF EXISTS results')

        cur.execute('CREATE TABLE tests (title TEXT PRIMARY KEY)')
        cur.execute('CREATE TABLE users (nickname TEXT PRIMARY KEY)')
        cur.execute('CREATE TABLE questions (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, answer_a TEXT, answer_b TEXT, answer_c TEXT, answer_d TEXT, correct_answer TEXT)')
        cur.execute('CREATE TABLE results (nickname TEXT PRIMARY KEY, points INTEGER)')

        cur.execute('INSERT INTO tests(title) values(?)', (test_name,))
        cur.executemany('INSERT INTO users values(?)', [(nickname,) for nickname in nicknames])
    finally:
        # Release the database handle even if one of the statements fails.
        connection.close()

    return redirect("show_test_owner")
|
|
|
|
@app.route("/show_test_owner")
def show_test_owner():
    """Render the question form for the test currently stored in the database.

    Returns:
        ``add_question.html`` with ``test_title`` set to the stored test
        title, or ``error.html`` when no test is available.
    """
    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        row = connection.execute("SELECT title from tests").fetchone()
    except sqlite3.OperationalError:
        # The ``tests`` table is dropped by end_test(), so it may not exist;
        # treat that the same as "no test defined".
        row = None
    finally:
        connection.close()

    # fetchone() yields None on an empty table; indexing it would raise.
    if row is None:
        return render_template("error.html", error = "Brak aktywnego testu!")

    return render_template("add_question.html", test_title = row[0])
|
|
|
|
@app.route('/add_question', methods = ["POST"])
def add_question():
    """Store one question of the current test and continue or finish editing.

    The form's ``submit`` value selects the follow-up action:

    * ``"Dodaj kolejne pytanie"`` - save the question and redirect back to
      ``show_test_owner`` so another question can be entered.
    * ``"Zatwierdź test"`` - save the question and render ``userHome.html``
      with ``test = True``.

    Returns:
        The redirect or rendered page described above, or ``error.html`` when
        the session role is not ``admin`` or the ``submit`` value is unknown.
    """
    # session.get() keeps anonymous visitors on the error page instead of
    # failing with a KeyError.
    if session.get("role") != "admin":
        return render_template("error.html", error = "Brak dostępu!")

    action = request.form["submit"]
    if action not in ("Dodaj kolejne pytanie", "Zatwierdź test"):
        # Without this branch the view would return None and Flask would
        # answer with an internal server error.
        return render_template("error.html", error = "Nieznana akcja formularza!")

    # Both actions persist the submitted question in exactly the same way.
    question = (
        request.form["inputTitle"],
        request.form["inputA"],
        request.form["inputB"],
        request.form["inputC"],
        request.form["inputD"],
        request.form["inputCorrect"],
    )

    logins = []
    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()
        cur.execute("INSERT INTO questions(title, answer_a, answer_b, answer_c, answer_d, correct_answer) values(?, ?, ?, ?, ?, ?)", question)
        if action == "Zatwierdź test":
            # The home page lists every known login.
            logins = cur.execute("SELECT login from logins").fetchall()
    finally:
        # Release the database handle even if a statement fails.
        connection.close()

    if action == "Dodaj kolejne pytanie":
        return redirect("show_test_owner")

    return render_template("userHome.html", user=session['user'].split('@')[0], calendar_src="https://calendar.google.com/calendar/embed?src=" + session.get("user"), test = True, logins = logins)
|
|
|
|
@app.route("/show_test", methods = ["GET", "POST"])
def show_test():
    """Show the current test to a user who has not submitted a result yet.

    The nickname is the part of ``session["user"]`` before the first ``@``.
    A user whose nickname already appears in the ``results`` table is refused.

    Returns:
        ``test.html`` with all questions and the test title, or ``error.html``
        when the visitor is not logged in, has already solved the test, or no
        test is available.
    """
    user = session.get("user")
    if not user:
        # Anonymous visitors previously triggered a KeyError here.
        return render_template("error.html", error = "Brak dostępu!")
    nickname = user.split("@")[0]

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()

        already_solved = cur.execute("SELECT 1 from results where nickname = ?", (nickname,)).fetchone()
        if already_solved is not None:
            return render_template("error.html", error = "Nie możesz wypełnić tego testu!")

        questions = cur.execute("SELECT * from questions").fetchall()
        title = cur.execute("SELECT title from tests").fetchone()
    except sqlite3.OperationalError:
        # The quiz tables are dropped by end_test(), so they may not exist;
        # treat that the same as "no test defined".
        title = None
    finally:
        # Runs on every exit path, including the early return above, which
        # used to leave the connection open.
        connection.close()

    # fetchone() yields None on an empty table; indexing it would raise.
    if title is None:
        return render_template("error.html", error = "Brak aktywnego testu!")

    return render_template("test.html", questions = questions, test = title[0])
|
|
|
|
|
|
@app.route("/test", methods = ["POST"])
def test():
    """Grade a submitted test, store the score and show it on the home page.

    The answer to question number ``i`` (zero-based, in the order returned by
    the ``questions`` table) is read from the form field ``inputAnswer<i>``.
    One point is awarded for every answer equal to the stored
    ``correct_answer``. The score is saved in ``results`` under the part of
    ``session["user"]`` before the first ``@``.

    Returns:
        ``userHome.html`` with the score appended to the user name, or
        ``error.html`` when the session role is not ``user`` or the user has
        already submitted a result.
    """
    # session.get() keeps anonymous visitors on the error page instead of
    # failing with a KeyError.
    if session.get("role") != "user":
        return render_template("error.html", error = "Brak dostępu!")

    nickname = session["user"].split("@")[0]

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        correct_answers = connection.execute('SELECT correct_answer from questions').fetchall()

        points = 0
        for index, (correct_answer,) in enumerate(correct_answers):
            # .get() treats a skipped question as a wrong answer rather than
            # rejecting the whole submission with a 400 response.
            answer = request.form.get("inputAnswer" + str(index))
            if answer is not None and answer == correct_answer:
                points += 1

        try:
            connection.execute("INSERT INTO results values(?, ?)", (nickname, points))
        except sqlite3.IntegrityError:
            # ``results.nickname`` is a primary key: a second submission by
            # the same user is refused instead of crashing.
            return render_template("error.html", error = "Nie możesz wypełnić tego testu!")

        logins = connection.execute("SELECT login from logins").fetchall()
    finally:
        # Release the database handle on every exit path.
        connection.close()

    # Polish plural forms: 1 -> "punkt"; numbers ending in 2-4 (except
    # 12-14) -> "punkty"; everything else -> "punktów".
    if points == 1:
        sentence = "punkt"
    elif 2 <= points % 10 <= 4 and not 12 <= points % 100 <= 14:
        sentence = "punkty"
    else:
        sentence = "punktów"

    # The space between the number and the noun was missing ("3punkty").
    return render_template("userHome.html", user=nickname + ". Twój wynik wyniósł " + str(points) + " " + sentence, calendar_src="https://calendar.google.com/calendar/embed?src=" + session.get("user"), test = False, logins = logins)
|
|
|
|
@app.route("/end_test", methods = ["GET", "POST"])
def end_test():
    """Close the current test, report the best score and drop the quiz tables.

    If a ``results`` table exists, the highest score and the nickname that
    achieved it are read before the ``tests``, ``questions``, ``users`` and
    ``results`` tables are dropped.

    Returns:
        ``userHome.html`` (with the best result appended to the user name when
        at least one result was stored), or ``error.html`` when the session
        role is not ``admin``.
    """
    # Check the role before opening the database so that refused requests do
    # not leave a connection behind; session.get() also avoids a KeyError for
    # anonymous visitors.
    if session.get("role") != "admin":
        return render_template("error.html", error = "Brak dostępu!")

    best = None
    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()

        table = cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='results'").fetchone()
        if table is not None:
            # Order by score so the first row really is the best result; the
            # nickname tie-break keeps the choice deterministic.
            best = cur.execute("SELECT points, nickname from results order by points desc, nickname limit 1").fetchone()

        cur.execute('DROP TABLE IF EXISTS tests')
        cur.execute('DROP TABLE IF EXISTS questions')
        cur.execute('DROP TABLE IF EXISTS users')
        cur.execute('DROP TABLE IF EXISTS results')

        logins = cur.execute("SELECT login from logins").fetchall()
    finally:
        # Release the database handle even if a statement fails.
        connection.close()

    user = session['user'].split('@')[0]
    calendar_src = "https://calendar.google.com/calendar/embed?src=" + session.get("user")

    # No results table or no submissions: show the plain home page.
    if best is None:
        return render_template("userHome.html", user=user, calendar_src=calendar_src, test = False, logins = logins)

    points, winner = best

    # Polish plural forms: 1 -> "punkt"; numbers ending in 2-4 (except
    # 12-14) -> "punkty"; everything else -> "punktów".
    if points == 1:
        sentence = "punkt, a uzyskał go: "
    elif 2 <= points % 10 <= 4 and not 12 <= points % 100 <= 14:
        sentence = "punkty, a uzyskał go: "
    else:
        sentence = "punktów, a uzyskał go: "

    # The space between the number and the noun was missing ("3punkty").
    return render_template("userHome.html", user=user + "\nNajlepszy wynik z ostatniego testu wyniósł " + str(points) + " " + sentence + str(winner), calendar_src=calendar_src, test = False, logins = logins)
|
|
|
|
if __name__ == "__main__":
    # Start the app through Socket.IO rather than app.run(), bound to every
    # network interface (port 5000, plain HTTP - no ssl_context is passed).
    listen_host = "0.0.0.0"
    socketio.run(app, host=listen_host)
|
|
|