WU/app.py
2019-01-16 20:00:52 +01:00

1146 lines
42 KiB
Python

from flask import Flask, render_template, request, json, session, redirect, Blueprint, url_for, flash, Response, send_from_directory, stream_with_context, g
from flask_wtf import Form
from wtforms.fields import StringField, SubmitField
from wtforms.validators import Required
#from flaskext.mysql import MySQL
#from pony.orm import *
import cv2
import logging as log
import datetime as dt
from time import sleep
from flask_socketio import SocketIO
from flask_socketio import emit, join_room, leave_room
import smtplib
from oauth2client import file, client, tools
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from httplib2 import Http
from oauth2client.service_account import ServiceAccountCredentials
import os
from werkzeug.utils import secure_filename
from camera import VideoCamera
import pyaudio
import sqlite3
from flask_mail import Mail, Message
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
from datetime import datetime
from dateutil import parser
# --- Application bootstrap ---------------------------------------------------
# NOTE: the name `main` is rebound later in this file by the view function
# `def main()`; the blueprint is registered below, before that happens.
main = Blueprint('main', __name__)
app = Flask(__name__)
# Secrets can be supplied through the environment (WU_SECRET_KEY,
# WU_MAIL_USERNAME, WU_MAIL_PASSWORD). The literals are kept only as fallbacks
# so existing deployments keep working.
# NOTE(review): these values are in the source history and should be rotated.
app.secret_key = os.environ.get('WU_SECRET_KEY', '523tvb6h3nio6r21cc4r1qx')
# NOTE(review): debug mode stays on as before; combined with host="0.0.0.0"
# in the __main__ guard this should never be used outside development.
app.debug = True
socketio = SocketIO()
app.register_blueprint(main)
socketio.init_app(app)
# Mail account used by Flask-Mail; defined once and reused in the config below.
me = os.environ.get('WU_MAIL_USERNAME', "wirtualna.uczelnia.2018@gmail.com")
my_password = os.environ.get('WU_MAIL_PASSWORD', "korniszon")
app.config.update(
    MAIL_SERVER="smtp.gmail.com",
    MAIL_PORT=465,
    MAIL_USE_SSL=True,
    MAIL_USERNAME=me,
    MAIL_PASSWORD=my_password,
)
mail = Mail(app)
# MySQL support is disabled; note that `mysql` is therefore undefined.
# MySQL configurations
#app.config['MYSQL_DATABASE_USER'] = 'root'
#app.config['MYSQL_DATABASE_PASSWORD'] = 'localhost'
#app.config['MYSQL_DATABASE_DB'] = 'wu'
#app.config['MYSQL_DATABASE_HOST'] = 'localhost'
#mysql = MySQL()
#mysql.init_app(app)
# Directory that uploaded resources are written to and served from.
UPLOAD_FOLDER = './uploads'
# Extensions accepted by allowed_file().
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# SQLite database file name.
DATABASE = 'database.db'
# connection = sqlite3.connect(DATABASE, isolation_level = None)
def allowed_file(filename):
    """Tell whether *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared in lower case.
    A name without any dot is rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
@app.route('/groupChat', methods=['GET', 'POST'])
def indexChat():
    """Show the chat entry form and forward to the chat once it validates.

    Visitors without a ``user`` entry in the session get the error page.
    On a valid submission the nickname (local part of the stored e-mail
    address) and the fixed room name are stored in the session.
    """
    if "user" not in session:
        return render_template("error.html", error = "Brak dostępu!")

    form = LoginForm()
    if form.validate_on_submit():
        session['name'] = session["user"].split('@')[0]
        session['room'] = "Czat grupowy"
        return redirect(url_for('.chat'))
    if request.method == 'GET':
        # pre-fill the form with whatever the session remembers
        form.name.data = session.get('name', '')
        form.room.data = session.get('room', '')
    return render_template('indexChat.html', form=form)
@app.route('/chat')
def chat():
    """Render the group chat page for the user stored in the session.

    Without a ``user`` entry in the session the error page is rendered; an
    empty or missing name redirects to ``index.html``.
    """
    if "user" not in session:
        return render_template("error.html", error = "Brak dostępu!")

    name = session["user"]
    room = "Czat grupowy"
    name_missing = name is None or name == ''
    if name_missing or room == '':
        return redirect("index.html")
    return render_template('chat.html', name=name, room=room)
@socketio.on('joined', namespace='/chat')
def joined(message):
    """Handle a client entering the chat.

    The client joins the fixed group room and a status line naming the user
    (local part of the session e-mail) is emitted to that room.
    """
    room = "Czat grupowy"
    join_room(room)
    nickname = session.get('user').split('@')[0]
    payload = {'msg': nickname + ' wszedł na czat'}
    emit('status', payload, room=room)
@socketio.on('text', namespace='/chat')
def text(message):
    """Relay a chat line to the group room.

    The emitted text is the sender's nickname (local part of the session
    e-mail), a colon, and ``message['msg']``.
    """
    room = "Czat grupowy"
    nickname = session.get('user').split('@')[0]
    payload = {'msg': nickname + ':' + message['msg']}
    emit('message', payload, room=room)
@socketio.on('left', namespace='/chat')
def left(message):
    """Handle a client leaving the chat.

    The client leaves the fixed group room and a status line naming the user
    is emitted to that room.
    """
    room = "Czat grupowy"
    leave_room(room)
    nickname = session.get('user').split('@')[0]
    payload = {'msg': nickname + ' opuścił czat'}
    emit('status', payload, room=room)
class LoginForm(Form):
    """Chat entry form: a nickname and a room name, both mandatory.

    NOTE(review): ``Form`` and ``Required`` are older flask_wtf / wtforms
    names; confirm they still exist in the installed versions.
    """
    # Required nickname field.
    name = StringField('Name', validators=[Required()])
    # Required room-name field.
    room = StringField('Room', validators=[Required()])
    # Submit button; its label is the Polish for "Enter the chat".
    submit = SubmitField('Wejdź na czat')
# Face "signatures" appended by the signUpWithFace* views and looked up by
# the signInWithFace* views. Kept in process memory only.
faces_list = []
# Not read anywhere in the visible code.
testing_face = 0
# Module-level slots for the Google Calendar / Drive API clients.
service = None
drive_service = None
# OAuth scopes requested when building the Google API clients.
CALENDAR_SCOPES = "https://www.googleapis.com/auth/calendar.events"
DRIVE_SCOPES = "https://www.googleapis.com/auth/drive"
@socketio.on('message')
def handleMessage(msg):
    """Log a message received on the default namespace and rebroadcast it.

    Args:
        msg: payload sent by the client. ``bytes`` payloads are decoded as
            UTF-8 for the log line; anything else is logged via ``str()``.
    """
    # `send` is not among the module's flask_socketio imports, so the
    # original call raised NameError; import it locally.
    from flask_socketio import send

    if isinstance(msg, (bytes, bytearray)):
        printable = msg.decode("utf-8", errors="replace")
    else:
        # text payloads have no .decode() on Python 3
        printable = str(msg)
    print('Message: ' + printable)
    send(msg, broadcast=True)
@app.route("/")
def main():
    """Landing page: prepare the Google API clients and the ``logins`` table.

    Loads OAuth credentials from ``token.json`` (running the interactive flow
    from ``credentials.json`` when they are missing or invalid), builds the
    Calendar and Drive clients, and makes sure the ``logins`` table exists.

    Returns:
        The rendered ``index.html`` page with a welcome message.
    """
    # Publish the clients in the module-level slots. Without this declaration
    # the assignments below only created throw-away locals.
    global service, drive_service

    store = file.Storage('token.json')
    creds = store.get()
    if not creds or creds.invalid:
        flow = client.flow_from_clientsecrets('credentials.json', CALENDAR_SCOPES)
        creds = tools.run_flow(flow, store)
    service = build('calendar', 'v3', http=creds.authorize(Http()))
    # NOTE(review): after the block above `creds` is normally valid, so this
    # Drive-scope flow is effectively never run; confirm whether the stored
    # token is meant to carry the Drive scope as well.
    if not creds or creds.invalid:
        flow = client.flow_from_clientsecrets('credentials.json', DRIVE_SCOPES)
        creds = tools.run_flow(flow, store)
    drive_service = build('drive', 'v3', http=creds.authorize(Http()))

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        # One statement replaces the check-then-create pair.
        connection.execute("CREATE TABLE IF NOT EXISTS logins (login TEXT PRIMARY KEY)")
    finally:
        connection.close()
    return render_template('index.html', info="Witaj!")
@app.route("/indexENG")
def indexENG():
    """Serve the English landing page together with its greeting."""
    greeting = "Hello!"
    return render_template('indexENG.html', info=greeting)
@app.route('/showSignUp')
def showSignUp():
    """Serve the Polish sign-up page."""
    template_name = 'signup.html'
    return render_template(template_name)
@app.route('/showSignUpENG')
def showSignUpENG():
    """Serve the English sign-up page."""
    template_name = 'signupENG.html'
    return render_template(template_name)
@app.route('/showSignIn')
def showSignIn():
    """Serve the Polish sign-in page."""
    template_name = 'signin.html'
    return render_template(template_name)
@app.route('/showSignInENG')
def showSignInENG():
    """Serve the English sign-in page."""
    template_name = 'signinENG.html'
    return render_template(template_name)
@app.route('/signUp',methods=['POST','GET'])
def signUp():
    """Register a login/password pair submitted by the Polish sign-up form.

    Reads ``inputLogin`` and ``inputPassword`` from the form. When either is
    empty a small JSON/HTML hint is returned; otherwise the pair is inserted
    into ``Users`` and a success or "already exists" page is rendered.

    NOTE(review): ``mysql`` is only defined in commented-out code at the top
    of this file, so the insert path raises NameError as things stand.
    NOTE(review): the password is stored as received (no hashing).
    """
    # Pre-bind so the cleanup below is safe when we bail out before connecting.
    # Previously `finally` raised on unbound names and masked the real result.
    conn = None
    cursor = None
    try:
        _login = request.form['inputLogin']
        _password = request.form['inputPassword']
        if not (_login and _password):
            return json.dumps({'html':'<span>Enter the required fields</span>'})

        conn = mysql.connect()
        cursor = conn.cursor()
        cursor.execute("INSERT into Users(login, password) values(?, ?)", (_login, _password))
        data = cursor.fetchall()
        if len(data) == 0:
            conn.commit()
            return render_template("index.html", info="Pomyślnie zarejestrowano!")
        return render_template("error.html", error="Użytkownik już istnieje!")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
@app.route('/signUpENG',methods=['POST','GET'])
def signUpENG():
    """Register a login/password pair submitted by the English sign-up form.

    Reads ``inputLogin`` and ``inputPassword`` from the form. When either is
    empty a small JSON/HTML hint is returned; otherwise the pair is inserted
    into ``Users`` and a success or "already exists" page is rendered.

    NOTE(review): ``mysql`` is only defined in commented-out code at the top
    of this file, so the insert path raises NameError as things stand.
    NOTE(review): the password is stored as received (no hashing).
    """
    # Pre-bind so the cleanup below is safe when we bail out before connecting.
    # Previously `finally` raised on unbound names and masked the real result.
    conn = None
    cursor = None
    try:
        _login = request.form['inputLogin']
        _password = request.form['inputPassword']
        if not (_login and _password):
            return json.dumps({'html':'<span>Enter the required fields</span>'})

        conn = mysql.connect()
        cursor = conn.cursor()
        cursor.execute("INSERT into Users(login, password) values(?, ?)", (_login, _password))
        data = cursor.fetchall()
        if len(data) == 0:
            conn.commit()
            return render_template("indexENG.html", info="Successfully registered!")
        return render_template("errorENG.html", error="The user already exists!")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
@app.route('/validateLogin', methods=['POST'])
def validateLogin():
    """Log a user in by checking the submitted credentials against Gmail SMTP.

    On success the session gets ``user`` (always with an ``@gmail.com``
    suffix) and ``role`` (``admin`` for the application's own account,
    ``user`` otherwise), the nickname is recorded in ``logins``, and the home
    page is rendered.

    Returns:
        ``userHome.html`` on success, ``error.html`` when Gmail rejects the
        credentials.
    """
    gmail_username = request.form['inputLogin']
    gmail_password = request.form['inputPassword']
    try:
        # The context manager closes the SMTP connection even when login()
        # raises; it used to be left open.
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
            server.ehlo()
            server.login(gmail_username, gmail_password)
    except smtplib.SMTPAuthenticationError:
        return render_template('error.html',error = 'Zły email lub hasło!')

    if "@gmail.com" not in gmail_username:
        session['user'] = gmail_username + "@gmail.com"
    else:
        session["user"] = gmail_username
    if "wirtualna.uczelnia.2018" in gmail_username:
        session["role"] = "admin"
    else:
        session["role"] = "user"
    nickname = session["user"].split("@")[0]

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()
        # The table is otherwise only created by the "/" view; make a direct
        # POST to this route work too.
        cur.execute("CREATE TABLE IF NOT EXISTS logins (login TEXT PRIMARY KEY)")

        # Only an admin is told about an existing test.
        test = False
        if session["role"] == "admin":
            tests_table = cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='tests'").fetchall()
            if tests_table:
                test = cur.execute("SELECT title from tests").fetchone() is not None

        # `login` is the primary key, so OR IGNORE gives "insert if absent".
        cur.execute("INSERT OR IGNORE INTO logins(login) values(?)", (nickname,))
        logins = cur.execute("SELECT login from logins").fetchall()
    finally:
        connection.close()
    return render_template('userHome.html', user=nickname, calendar_src="https://calendar.google.com/calendar/embed?src=" + session.get("user"), test = test, logins = logins)
@app.route('/validateLoginENG',methods=['POST'])
def validateLoginENG():
    """English login: check the submitted credentials against Gmail SMTP.

    On success the session gets ``user`` and ``role`` (using the same rule as
    ``validateLogin``) and the English home page is rendered.

    Returns:
        ``userHomeENG.html`` on success, ``errorENG.html`` when the SMTP
        login fails or the server cannot be reached.
    """
    gmail_username = request.form['inputLogin']
    gmail_password = request.form['inputPassword']
    try:
        # The context manager closes the SMTP connection in every case.
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
            server.ehlo()
            server.login(gmail_username, gmail_password)
    except (smtplib.SMTPException, OSError):
        # Was a bare `except:`; now limited to SMTP and network failures so
        # programming errors are no longer reported as bad credentials.
        return render_template('errorENG.html',error = 'Wrong email or password!')

    session['user'] = gmail_username
    # Routes that read session["role"] raised KeyError after an English login.
    if "wirtualna.uczelnia.2018" in gmail_username:
        session["role"] = "admin"
    else:
        session["role"] = "user"
    return render_template('userHomeENG.html', user=gmail_username.split('@')[0], test = False)
'''
con = mysql.connect()
cursor = con.cursor()
#cursor.callproc('logowanie',(_username,))
args = (_username, _password)
query = "SELECT * from Users where login = ? and password = ?"
cursor.execute(query, args)
data = cursor.fetchall()
if len(data) > 0:
#if check_password_hash(str(data[0][3]),_password):
#if str(data[0][3]) == _password:
if str(data[0][1]) == _password:
session['user'] = data[0][0]
#return redirect('/userHomeENG', user=session['user'])
return render_template('userHomeENG.html', user=session['user'])
else:
return render_template('errorENG.html',error = 'Wrong email or password!')
else:
return render_template('errorENG.html',error = 'Wrong email or password!')
except Exception as e:
return render_template('errorENG.html', error = str(e))
finally:
if cursor:
cursor.close()
if con:
con.close()
'''
@app.route('/userHome')
def userHome():
    """Render the Polish home page for the logged-in user.

    ``test`` passed to the template is True when a ``tests`` table exists;
    ``logins`` is every row of the ``logins`` table. Without a user in the
    session the error page is rendered.
    """
    if not session.get('user'):
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')

    connection = sqlite3.connect('database.db', isolation_level = None)
    cur = connection.cursor()
    tests_table = cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='tests'").fetchall()
    logins = cur.execute("SELECT login from logins").fetchall()
    connection.close()

    nickname = session['user'].split('@')[0]
    calendar_url = "https://calendar.google.com/calendar/embed?src=" + session.get("user")
    test_exists = len(tests_table) > 0
    return render_template('userHome.html', user=nickname, calendar_src=calendar_url, test = test_exists, logins = logins)
@app.route('/userHomeENG')
def userHomeENG():
    """Render the English home page for the logged-in user, else the error page."""
    if not session.get('user'):
        return render_template('errorENG.html',error = 'Unauthorized access!')
    nickname = session['user'].split('@')[0]
    return render_template('userHomeENG.html', user=nickname, test = False)
@app.route('/logout')
def logout():
    """Forget the logged-in user and role, then return to the landing page."""
    for key in ('user', 'role'):
        session.pop(key, None)
    return redirect('/')
@app.route('/logoutENG')
def logoutENG():
    """Forget the logged-in user and role, then return to the English landing page."""
    session.pop('user', None)
    # Also drop the role, as logout() does, so it does not outlive the login.
    session.pop('role', None)
    return redirect('/indexENG')
@app.route('/signUpWithFace')
def signUpWithFace():
    """Record up to five face detections from the server-side webcam.

    For every frame in which the Haar cascade reports a face, the sum of the
    first detection's ``(x, y, w, h)`` values is appended to the module-level
    ``faces_list``. Capturing stops after five such frames or when ``q`` is
    pressed in the preview window.

    Returns:
        ``index.html`` on completion, or ``error.html`` when the camera
        cannot be opened or stops delivering frames.
    """
    cascPath = "haarcascade_frontalface_default.xml"
    faceCascade = cv2.CascadeClassifier(cascPath)
    log.basicConfig(filename='webcam.log',level=log.INFO)
    video_capture = cv2.VideoCapture(0)
    anterior = 0
    captured = 0
    try:
        if not video_capture.isOpened():
            # Previously execution fell through to cvtColor() with no frame.
            print('Unable to load camera.')
            return render_template("error.html", error="Nie można uruchomić kamery!")
        while True:
            # Capture frame-by-frame
            ret, frame = video_capture.read()
            if not ret or frame is None:
                return render_template("error.html", error="Nie można uruchomić kamery!")
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = faceCascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(30, 30)
            )
            # Draw a rectangle around the faces
            for (x, y, w, h) in faces:
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
            if anterior != len(faces):
                anterior = len(faces)
                log.info("faces: "+str(len(faces))+" at "+str(dt.datetime.now()))
            # Display the resulting frame
            cv2.imshow('Video', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            # len() works for both an empty tuple and an array of detections
            if len(faces) > 0:
                captured += 1
                faces_list.append(sum(faces[0]))
                if captured >= 5:
                    break
    finally:
        # Release the camera and preview window on every exit path.
        video_capture.release()
        cv2.destroyAllWindows()
    print (faces_list)
    return render_template('index.html', info="Zapisano twarz")
@app.route('/signUpWithFaceENG')
def signUpWithFaceENG():
    """Record up to five face detections from the server-side webcam (English UI).

    For every frame in which the Haar cascade reports a face, the sum of the
    first detection's ``(x, y, w, h)`` values is appended to the module-level
    ``faces_list``. Capturing stops after five such frames or when ``q`` is
    pressed in the preview window.

    Returns:
        ``indexENG.html`` on completion, or ``errorENG.html`` when the camera
        cannot be opened or stops delivering frames.
    """
    cascPath = "haarcascade_frontalface_default.xml"
    faceCascade = cv2.CascadeClassifier(cascPath)
    log.basicConfig(filename='webcam.log',level=log.INFO)
    video_capture = cv2.VideoCapture(0)
    anterior = 0
    captured = 0
    try:
        if not video_capture.isOpened():
            # Previously execution fell through to cvtColor() with no frame.
            print('Unable to load camera.')
            return render_template("errorENG.html", error="Unable to load camera.")
        while True:
            # Capture frame-by-frame
            ret, frame = video_capture.read()
            if not ret or frame is None:
                return render_template("errorENG.html", error="Unable to load camera.")
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = faceCascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(30, 30)
            )
            # Draw a rectangle around the faces
            for (x, y, w, h) in faces:
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
            if anterior != len(faces):
                anterior = len(faces)
                log.info("faces: "+str(len(faces))+" at "+str(dt.datetime.now()))
            # Display the resulting frame
            cv2.imshow('Video', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            # len() works for both an empty tuple and an array of detections
            if len(faces) > 0:
                captured += 1
                faces_list.append(sum(faces[0]))
                if captured >= 5:
                    break
    finally:
        # Release the camera and preview window on every exit path.
        video_capture.release()
        cv2.destroyAllWindows()
    print (faces_list)
    return render_template('indexENG.html', info="A face has been saved")
@app.route('/signInWithFace')
def signInWithFace():
    """Admit the session user when the webcam shows a previously recorded face.

    Each frame with a detection is reduced to the sum of the first detection's
    ``(x, y, w, h)`` values and looked up in ``faces_list``. Access is denied
    after 20 detections without a match, when ``q`` is pressed, when the
    camera is unavailable, or when no user is stored in the session.

    Returns:
        ``userHome.html`` on a match, otherwise ``error.html``.
    """
    if not session.get('user'):
        # The final render needs the session user; fail early instead of
        # raising AttributeError after the camera loop.
        return render_template("error.html", error="Brak dostępu!")

    cascPath = "haarcascade_frontalface_default.xml"
    faceCascade = cv2.CascadeClassifier(cascPath)
    log.basicConfig(filename='webcam.log',level=log.INFO)
    video_capture = cv2.VideoCapture(0)
    anterior = 0
    attempts = 0
    matched = False
    try:
        if not video_capture.isOpened():
            print('Unable to load camera.')
            return render_template("error.html", error="Brak dostępu!")
        while True:
            # Capture frame-by-frame
            ret, frame = video_capture.read()
            if not ret or frame is None:
                return render_template("error.html", error="Brak dostępu!")
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = faceCascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(30, 30)
            )
            # Draw a rectangle around the faces
            for (x, y, w, h) in faces:
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
            if anterior != len(faces):
                anterior = len(faces)
                log.info("faces: "+str(len(faces))+" at "+str(dt.datetime.now()))
            # Display the resulting frame
            cv2.imshow('Video', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            if len(faces) > 0:
                attempts += 1
                if sum(faces[0]) in faces_list:
                    matched = True
                    break
                if attempts == 20:
                    break
    finally:
        # Release the camera on every path, including the deny paths.
        video_capture.release()
        cv2.destroyAllWindows()

    if not matched:
        # Covers both 20 failed attempts and leaving the loop with `q`, which
        # used to continue straight to the home page.
        return render_template("error.html", error="Brak dostępu!")

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()
        test = False
        tests_table = cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='tests'").fetchall()
        if tests_table:
            # fetchone() is None for an empty table; len(None) used to raise
            test = cur.execute("SELECT title from tests").fetchone() is not None
        logins = cur.execute("SELECT login from logins").fetchall()
    finally:
        connection.close()
    return render_template('userHome.html', info="Zapisano twarz", user=session.get('user').split('@')[0], calendar_src="https://calendar.google.com/calendar/embed?src=" + session.get("user"), test = test, logins = logins)
@app.route('/signInWithFaceENG')
def signInWithFaceENG():
    """English UI: admit the session user when the webcam shows a recorded face.

    Each frame with a detection is reduced to the sum of the first detection's
    ``(x, y, w, h)`` values and looked up in ``faces_list``. Access is denied
    after 20 detections without a match, when ``q`` is pressed, when the
    camera is unavailable, or when no user is stored in the session.

    Returns:
        ``userHomeENG.html`` on a match, otherwise ``errorENG.html``.
    """
    if not session.get('user'):
        # The final render needs the session user; fail early instead of
        # raising AttributeError after the camera loop.
        return render_template("errorENG.html", error="Lack of access!")

    cascPath = "haarcascade_frontalface_default.xml"
    faceCascade = cv2.CascadeClassifier(cascPath)
    log.basicConfig(filename='webcam.log',level=log.INFO)
    video_capture = cv2.VideoCapture(0)
    anterior = 0
    attempts = 0
    matched = False
    try:
        if not video_capture.isOpened():
            print('Unable to load camera.')
            return render_template("errorENG.html", error="Lack of access!")
        while True:
            # Capture frame-by-frame
            ret, frame = video_capture.read()
            if not ret or frame is None:
                return render_template("errorENG.html", error="Lack of access!")
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = faceCascade.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(30, 30)
            )
            # Draw a rectangle around the faces
            for (x, y, w, h) in faces:
                cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
            if anterior != len(faces):
                anterior = len(faces)
                log.info("faces: "+str(len(faces))+" at "+str(dt.datetime.now()))
            # Display the resulting frame
            cv2.imshow('Video', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            if len(faces) > 0:
                attempts += 1
                if sum(faces[0]) in faces_list:
                    matched = True
                    break
                if attempts == 20:
                    break
    finally:
        # Release the camera on every path, including the deny paths.
        video_capture.release()
        cv2.destroyAllWindows()

    if not matched:
        # Covers both 20 failed attempts and leaving the loop with `q`, which
        # used to continue straight to the home page.
        return render_template("errorENG.html", error="Lack of access!")
    return render_template('userHomeENG.html', info="A face has been saved", user=session.get('user').split('@')[0], test = False)
@app.route('/addEvent', methods=['POST'])
def addEvent():
    """Create a Google Calendar event from the submitted form.

    Form fields read: ``inputSummary``, ``inputLocation``,
    ``inputDescription``, ``inputStart``, ``inputEnd`` and ``inputAttendees``
    (comma-separated; names without ``@gmail.com`` get it appended).

    Returns:
        ``userHome.html`` after the event is inserted; ``error.html`` when no
        user is logged in or the start/end values are unusable.
    """
    if not session.get('user'):
        # Previously the event was created first and the final render then
        # failed with KeyError for anonymous callers.
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')

    summary = request.form["inputSummary"]
    location = request.form["inputLocation"]
    description = request.form["inputDescription"]
    start = request.form["inputStart"]
    end = request.form["inputEnd"]
    attendees = request.form["inputAttendees"].split(',')

    try:
        datetime_start = parser.parse(start)
        datetime_end = parser.parse(end)
        bad_range = datetime_start > datetime_end
    except (ValueError, OverflowError, TypeError):
        # unparsable input, or one value with a UTC offset and one without
        bad_range = True
    if bad_range:
        return render_template('error.html',error = 'Nieprawidłowy termin')

    processed_attendees = []
    for attendee in attendees:
        attendee = attendee.strip()
        if not attendee:
            # an empty field or a trailing comma must not become "@gmail.com"
            continue
        if "@gmail.com" not in attendee:
            attendee += "@gmail.com"
        processed_attendees.append({'email': attendee})

    # No fixed "+01:00" suffix any more: the offset is wrong while DST is in
    # effect. A local wall-clock time plus `timeZone` lets the Calendar API
    # apply the correct Europe/Warsaw offset.
    event = {
        'summary': summary,
        'location': location,
        'description': description,
        'start': {
            'dateTime': datetime_start.replace(microsecond=0).isoformat(),
            'timeZone': 'Europe/Warsaw'
        },
        'end': {
            'dateTime': datetime_end.replace(microsecond=0).isoformat(),
            'timeZone': 'Europe/Warsaw'
        },
        'attendees': processed_attendees
    }

    store = file.Storage('token.json')
    creds = store.get()
    if not creds or creds.invalid:
        flow = client.flow_from_clientsecrets('credentials.json', "https://www.googleapis.com/auth/calendar.events")
        creds = tools.run_flow(flow, store)
    service = build('calendar', 'v3', http=creds.authorize(Http()))
    service.events().insert(calendarId='primary', body=event).execute()

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()
        test = False
        tests_table = cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='tests'").fetchall()
        if tests_table:
            # fetchone() is None for an empty table; len(None) used to raise
            test = cur.execute("SELECT title from tests").fetchone() is not None
        logins = cur.execute("SELECT login from logins").fetchall()
    finally:
        connection.close()
    return render_template('userHome.html', user=session['user'].split('@')[0], calendar_src="https://calendar.google.com/calendar/embed?src=" + session.get("user"), test = test, logins = logins)
@app.route('/streaming')
def streaming():
    """Serve the page that embeds the live video stream."""
    template_name = 'streaming.html'
    return render_template(template_name)
@app.route('/toAddEvent')
def toAddEvent():
    """Serve the add-event form to logged-in users, else the error page."""
    if not session.get('user'):
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')
    return render_template('addEvent.html')
def gen(camera):
    """Yield, forever, one ``--frame``-delimited JPEG part per camera frame.

    Args:
        camera: object whose ``get_frame()`` returns the frame as bytes.

    Yields:
        bytes: the part header, the frame, and the trailing line breaks.
    """
    part_header = b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n'
    part_trailer = b'\r\n\r\n'
    while True:
        jpeg_bytes = camera.get_frame()
        yield part_header + jpeg_bytes + part_trailer
@app.route('/stream')
def stream():
    """Announce a new stream by e-mail and start streaming camera frames.

    Every login stored in ``logins`` is mailed (as ``<login>@gmail.com``) a
    link to the streaming page. The response then streams frames produced by
    ``gen(VideoCamera())``.

    Returns:
        A ``multipart/x-mixed-replace`` streaming response.
    """
    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        logins = connection.execute("SELECT login from logins").fetchall()
    finally:
        # the connection used to stay open for the life of the process
        connection.close()

    emails = [login[0] + "@gmail.com" for login in logins]
    if emails:
        # Flask-Mail refuses to send a message without recipients.
        mail.send_message(
            'Nowy stream',
            sender='wirtualna.uczelnia.2018@gmail.com',
            recipients=emails,
            body="Został uruchomiony nowy stream pod adresem:\n" + "http://192.168.8.100:5000/streaming"
        )
    return Response(gen(VideoCamera()),
                    mimetype='multipart/x-mixed-replace; boundary=frame')
def make_tree(path=app.config['UPLOAD_FOLDER']):
    """Describe the directory tree rooted at *path* as nested dicts.

    Every directory becomes ``{'name': <path>, 'children': [...]}`` and every
    other entry ``{'name': <path>}``. A directory that cannot be listed is
    returned with an empty ``children`` list.
    """
    node = {'name': path, 'children': []}
    try:
        entries = os.listdir(path)
    except OSError:
        # unreadable or missing directory: report it as empty
        return node
    for entry in entries:
        full_path = os.path.join(path, entry)
        if os.path.isdir(full_path):
            child = make_tree(full_path)
        else:
            child = {'name': full_path}
        node['children'].append(child)
    return node
@app.route('/uploads/<filename>')
def download_file(filename):
    """Send *filename* from the upload folder as an attachment."""
    upload_dir = app.config['UPLOAD_FOLDER']
    return send_from_directory(upload_dir, filename, as_attachment=True)
@app.route('/toResources')
def toResources():
    """Show the upload folder's file tree to logged-in users, else the error page."""
    if not session.get('user'):
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')
    files = make_tree()
    print (files)
    return render_template('resources.html', items=files)
@app.route('/resources', methods=['POST'])
def resources():
    """Store an uploaded file in the upload folder and return to the home page.

    The file is read from the ``fileToUpload`` form part and saved under its
    ``secure_filename``-sanitised name.

    Returns:
        ``userHome.html`` after a successful upload; a redirect to the
        resources page when no usable file was sent; ``error.html`` when no
        user is logged in.
    """
    if not session.get('user'):
        # the final render reads session['user']; reject anonymous uploads
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')

    upload = request.files.get('fileToUpload')
    filename = secure_filename(upload.filename or '') if upload is not None else ''
    if not filename:
        # No file part, nothing selected, or a name that sanitises to empty:
        # saving would target the upload directory itself.
        return redirect(url_for('toResources'))
    upload.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        logins = connection.execute("SELECT login from logins").fetchall()
    finally:
        # the connection was never closed before
        connection.close()
    return render_template('userHome.html', user=session['user'].split('@')[0], calendar_src="https://calendar.google.com/calendar/embed?src=" + session.get("user"), test = False, logins = logins)
@app.route('/showResources')
def showResources():
    """Return the names of the files in the upload folder as a JSON object.

    The original body wrote into an undefined ``list_of_files`` and returned
    nothing, so the route could only fail. It now builds the mapping locally
    (file name -> file name, as the original loop did) and returns it.

    NOTE(review): the intended response format is not visible in this file;
    JSON was chosen as the least surprising option. Confirm with the template
    or client that consumes this route.

    Returns:
        A JSON string for logged-in users, ``error.html`` otherwise.
    """
    if not session.get('user'):
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')
    path = app.config['UPLOAD_FOLDER']
    try:
        names = os.listdir(path)
    except OSError:
        # missing or unreadable upload folder: report no files
        names = []
    list_of_files = {filename: filename for filename in names}
    return json.dumps(list_of_files)
'''if request.method == 'GET':
uploads = app.config['UPLOAD_FOLDER'] #os.path.join(current_app.root_path, app.config['UPLOAD_FOLDER'])
return render_template('resources.html', resources = os.getcwd() + os.listdir(uploads))
<!doctype html>
<title>Upload new File</title>
<h1>Upload new File</h1>
<form method=post enctype=multipart/form-data>
<input type=file name=file>
<input type=submit value=Upload>
</form>
''' '''
@app.route('/addResources', methods=['GET', 'POST'])
def addResources():
if request.method == 'POST':
# check if the post request has the file part
#if 'file' not in request.files:
# flash('No file part')
# return redirect(request.url)
file = request.files['fileToUpload']
file_metadata = {'name': file}
path = request.files['fileToUpload'].file.name
print (os.path.basename(file.filename))
media = MediaFileUpload(path)
file = drive_service.files().create(body=file_metadata, media_body=media, fields='id').execute()
results = drive_service.files().list(pageSize=10, fields="nextPageToken, files(id, name)").execute()
items = results.get('files', [])
return render_template('allResources.html', resources = items)
# file = request.files['fileToUpload']
# file = request.form["fileToUpload"]
# filename = secure_filename(file.filename)
# file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
# input_file = UPLOAD_FOLDER + filename
# uploads = []
# uploads.append(input_file)
# return render_template('addResources.html', resources = uploads)
@app.route('/allResources')
def allResources():
results = drive_service.files().list(pageSize=10, fields="nextPageToken, files(id, name)").execute()
items = results.get('files', [])
return render_template('allResources.html', resources = items)
@app.route('/toAllResources')
def toAllResources():
if session.get('user'):
uploads = app.config['UPLOAD_FOLDER'] #os.path.join(current_app.root_path, app.config['UPLOAD_FOLDER'])
path = os.getcwd()
return render_template('allResources.html', resources = os.path.join(path, uploads))
#return render_template('resources.html')
else:
return render_template('error.html',error = 'Nieautoryzowany dostęp!')
@app.route('/toAddResources')
def toAddResources():
if session.get('user'):
uploads = app.config['UPLOAD_FOLDER'] #os.path.join(current_app.root_path, app.config['UPLOAD_FOLDER'])
path = os.getcwd()
return render_template('addResources.html', resources = os.path.join(path, uploads))
#return render_template('resources.html')
else:
return render_template('error.html',error = 'Nieautoryzowany dostęp!')
@app.route('/downloadResource')
def downloadResource():
if session.get('user'):
file_id = request.files["files"]
return render_template('addResources.html', resources = os.path.join(path, uploads))
#return render_template('resources.html')
else:
return render_template('error.html',error = 'Nieautoryzowany dostęp!')
'''
@app.route('/create_test', methods = ["GET", "POST"])
def create_test():
    """Create a new test (admin only), replacing any previous one.

    Reads ``inputTestName`` and the comma-separated ``inputUsers`` from the
    form, e-mails the listed users, then drops and recreates the ``tests``,
    ``questions``, ``users`` and ``results`` tables and stores the test title
    and the invited nicknames.

    Returns:
        A redirect to ``show_test_owner`` for an admin, ``error.html`` for
        anyone else.
    """
    # .get(): a visitor who never logged in has no "role" key at all
    if session.get("role") != "admin":
        return render_template("error.html", error = "Brak dostępu!")

    test_name = request.form["inputTestName"]
    users = request.form["inputUsers"]

    # Normalise once. Blank entries are skipped and duplicates dropped:
    # `users.nickname` is a primary key, so a repeated name used to abort the
    # request after the old test had already been dropped.
    nicknames = []
    emails = []
    for entry in users.split(","):
        entry = entry.strip()
        nickname = entry.split("@")[0].strip()
        if not nickname or nickname in nicknames:
            continue
        nicknames.append(nickname)
        # keep a full address as typed; bare names are Gmail accounts
        emails.append(entry if "@" in entry else entry + "@gmail.com")

    if emails:
        # Flask-Mail refuses to send a message without recipients.
        mail.send_message(
            'Zostałeś przydzielony do nowego testu',
            sender='wirtualna.uczelnia.2018@gmail.com',
            recipients=emails,
            body="Pojawił się nowy test, zaloguj się i go rozwiąż:\n" + "http://192.168.8.100:5000/showSignIn"
        )

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()
        for table in ('tests', 'questions', 'users', 'results'):
            cur.execute('DROP TABLE IF EXISTS ' + table)
        cur.execute('CREATE TABLE tests (title TEXT PRIMARY KEY)')
        cur.execute('CREATE TABLE users (nickname TEXT PRIMARY KEY)')
        cur.execute('CREATE TABLE questions (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, answer_a TEXT, answer_b TEXT, answer_c TEXT, answer_d TEXT, correct_answer TEXT)')
        cur.execute('CREATE TABLE results (nickname TEXT PRIMARY KEY, points INTEGER)')
        cur.execute('INSERT INTO tests(title) values(?)', (test_name,))
        cur.executemany('INSERT INTO users values(?)', [(nickname,) for nickname in nicknames])
    finally:
        connection.close()
    return redirect("show_test_owner")
@app.route("/show_test_owner")
def show_test_owner():
    """Show the add-question form for the current test (admin only).

    Returns:
        ``add_question.html`` with the test title, or ``error.html`` when the
        caller is not an admin or no test exists.
    """
    # Same gate as create_test/add_question, which redirect here.
    if session.get("role") != "admin":
        return render_template("error.html", error = "Brak dostępu!")

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        test_name = connection.execute("SELECT title from tests").fetchone()
    except sqlite3.OperationalError:
        # raised when the `tests` table does not exist yet
        test_name = None
    finally:
        connection.close()

    if test_name is None:
        # previously `test_name[0]` raised TypeError here
        return render_template("error.html", error = "Brak testu!")
    return render_template("add_question.html", test_title = test_name[0])
@app.route('/add_question', methods = ["POST"])
def add_question():
    """Store one question of the current test (admin only).

    The ``submit`` form value selects what happens after the insert:
    "Dodaj kolejne pytanie" returns to the add-question form,
    "Zatwierdź test" finishes and renders the home page.

    Returns:
        A redirect to ``show_test_owner``, the rendered ``userHome.html``, or
        ``error.html`` for non-admins.
    """
    if session.get("role") != "admin":
        return render_template("error.html", error = "Brak dostępu!")

    action = request.form["submit"]
    if action not in ("Dodaj kolejne pytanie", "Zatwierdź test"):
        # An unknown button value used to fall through and return None.
        return redirect("show_test_owner")

    # Both actions store the question the same way; read it once.
    fields = ("inputTitle", "inputA", "inputB", "inputC", "inputD", "inputCorrect")
    question = tuple(request.form[field] for field in fields)

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()
        cur.execute("INSERT INTO questions(title, answer_a, answer_b, answer_c, answer_d, correct_answer) values(?, ?, ?, ?, ?, ?)", question)
        if action == "Dodaj kolejne pytanie":
            return redirect("show_test_owner")
        logins = cur.execute("SELECT login from logins").fetchall()
    finally:
        connection.close()
    return render_template("userHome.html", user=session['user'].split('@')[0], calendar_src="https://calendar.google.com/calendar/embed?src=" + session.get("user"), test = True, logins = logins)
@app.route("/show_test", methods = ["GET", "POST"])
def show_test():
    """Show the current test to a logged-in user who has not yet taken it.

    Returns:
        ``test.html`` with all questions and the test title, or
        ``error.html`` when nobody is logged in, the user already has a
        result, or no test is available.
    """
    user = session.get("user")
    if not user:
        # session["user"] used to raise KeyError for anonymous visitors
        return render_template('error.html',error = 'Nieautoryzowany dostęp!')
    nickname = user.split("@")[0]

    questions = None
    title = None
    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()
        already_taken = cur.execute("SELECT 1 from results WHERE nickname = ?", (nickname,)).fetchone() is not None
        if not already_taken:
            questions = cur.execute("SELECT * from questions").fetchall()
            title = cur.execute("SELECT title from tests").fetchone()
    except sqlite3.OperationalError:
        # raised when the test tables do not exist (no test created yet)
        title = None
    finally:
        # the deny path used to leave the connection open
        connection.close()

    if title is None:
        return render_template("error.html", error = "Nie możesz wypełnić tego testu!")
    return render_template("test.html", questions = questions, test = title[0])
def _test_points_word(points):
    """Return the Polish noun form of "point" that agrees with *points*."""
    if points == 1:
        return "punkt"
    # 2-4, 22-24, 32-34 ... take "punkty"; the teens 12-14 do not
    if points % 10 in (2, 3, 4) and points % 100 not in (12, 13, 14):
        return "punkty"
    return "punktów"


@app.route("/test", methods = ["POST"])
def test():
    """Grade a submitted test for a regular user and store the score.

    Answers are read from form fields ``inputAnswer0``, ``inputAnswer1``, ...
    and compared with ``questions.correct_answer`` in table order. The score
    is written to ``results`` and shown on the home page.

    Returns:
        ``userHome.html`` with the score, or ``error.html`` when the caller
        is not a logged-in regular user or has already submitted.
    """
    if session.get("role") != "user" or not session.get("user"):
        return render_template("error.html", error = "Brak dostępu!")
    nickname = session["user"].split("@")[0]

    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        correct_answers = connection.execute('SELECT correct_answer from questions').fetchall()
        points = 0
        for index, row in enumerate(correct_answers):
            # .get(): an unanswered question scores nothing instead of
            # aborting the whole submission with a 400
            answer = request.form.get("inputAnswer" + str(index))
            if answer is not None and answer == row[0]:
                points += 1
        try:
            connection.execute("INSERT INTO results values(?, ?)", (nickname, points))
        except sqlite3.IntegrityError:
            # nickname is the primary key: this user already has a result
            return render_template("error.html", error = "Nie możesz wypełnić tego testu!")
        logins = connection.execute("SELECT login from logins").fetchall()
    finally:
        connection.close()

    # A space now separates the number from the noun ("3 punkty").
    summary = nickname + ". Twój wynik wyniósł " + str(points) + " " + _test_points_word(points)
    return render_template("userHome.html", user=summary, calendar_src="https://calendar.google.com/calendar/embed?src=" + session.get("user"), test = False, logins = logins)
def _end_test_points_word(points):
    """Return the Polish noun form of "point" that agrees with *points*."""
    if points == 1:
        return "punkt"
    # 2-4, 22-24, 32-34 ... take "punkty"; the teens 12-14 do not
    if points % 10 in (2, 3, 4) and points % 100 not in (12, 13, 14):
        return "punkty"
    return "punktów"


@app.route("/end_test", methods = ["GET", "POST"])
def end_test():
    """Close the current test (admin only) and report the best score.

    Reads the highest-scoring row of ``results`` if that table exists, drops
    the ``tests``, ``questions``, ``users`` and ``results`` tables, and
    renders the home page, mentioning the best result when there is one.

    Returns:
        ``userHome.html`` for an admin, ``error.html`` otherwise.
    """
    # Check the role before opening the database; the deny path used to
    # leave a connection open.
    if session.get("role") != "admin":
        return render_template("error.html", error = "Brak dostępu!")

    best = None
    connection = sqlite3.connect('database.db', isolation_level = None)
    try:
        cur = connection.cursor()
        results_table = cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='results'").fetchall()
        if results_table:
            # The old "MAX(points) ... GROUP BY nickname" gave one row per
            # user and then reported whichever came first. Order by score.
            best = cur.execute('SELECT points, nickname from results ORDER BY points DESC LIMIT 1').fetchone()
        for table in ('tests', 'questions', 'users', 'results'):
            cur.execute('DROP TABLE IF EXISTS ' + table)
        logins = cur.execute("SELECT login from logins").fetchall()
    finally:
        connection.close()

    nickname = session['user'].split('@')[0]
    calendar_url = "https://calendar.google.com/calendar/embed?src=" + session.get("user")
    if best is None:
        # no results table or no submissions (this used to raise
        # UnboundLocalError when the table was missing)
        return render_template("userHome.html", user=nickname, calendar_src=calendar_url, test = False, logins = logins)

    best_points, best_nickname = best
    # A space now separates the number from the noun.
    summary = (nickname + "\nNajlepszy wynik z ostatniego testu wyniósł " + str(best_points)
               + " " + _end_test_points_word(best_points) + ", a uzyskał go: " + str(best_nickname))
    return render_template("userHome.html", user=summary, calendar_src=calendar_url, test = False, logins = logins)
if __name__ == "__main__":
    # Script entry point: run the app through Flask-SocketIO's runner, bound
    # to all network interfaces. No port is passed explicitly (the note below
    # records 5000).
    # NOTE(review): `app.debug = True` is set at the top of this file;
    # confirm this is never started like this outside development.
    #app.run(host='0.0.0.0')
    # Port: 5000
    #socketio.run(app, debug=True)
    socketio.run(app, host="0.0.0.0")#, ssl_context=("cert.pem", "key.pem"))