203 lines
7.5 KiB
Python
203 lines
7.5 KiB
Python
from flask import Flask, request, jsonify, session
|
|
from flask_cors import CORS
|
|
import imaplib
|
|
import email
|
|
from email.header import decode_header
|
|
from sklearn.feature_extraction.text import TfidfVectorizer
|
|
from sklearn.naive_bayes import MultinomialNB
|
|
import traceback
|
|
import json
|
|
import os
|
|
|
|
app = Flask(__name__)
|
|
CORS(app)
|
|
app.secret_key = 'your_secret_key'
|
|
|
|
SAFE_EMAILS_FILE = 'safe_emails.json'
|
|
|
|
# Load safe emails from file
|
|
def load_safe_emails():
|
|
if os.path.exists(SAFE_EMAILS_FILE):
|
|
with open(SAFE_EMAILS_FILE, 'r') as file:
|
|
return json.load(file)
|
|
return []
|
|
|
|
# Save safe emails to file
|
|
def save_safe_emails(safe_emails):
|
|
with open(SAFE_EMAILS_FILE, 'w') as file:
|
|
json.dump(safe_emails, file)
|
|
|
|
safe_emails = load_safe_emails()
|
|
|
|
# Dane treningowe
|
|
training_data = [
|
|
("Urgent account verification", "support@example.com", 1),
|
|
("Meeting agenda", "boss@example.com", 0),
|
|
("Password reset request", "no-reply@example.com", 1),
|
|
("Team lunch schedule", "hr@example.com", 0),
|
|
("Suspicious login attempt", "security@example.com", 1),
|
|
("Project update", "colleague@example.com", 0),
|
|
("Verify your email address", "verification@example.com", 1),
|
|
("Weekly report", "manager@example.com", 0),
|
|
("Your account has been suspended", "no-reply@example.com", 1),
|
|
("Company policy update", "admin@example.com", 0),
|
|
("Immediate action required", "alert@example.com", 1),
|
|
("Holiday party invitation", "events@example.com", 0),
|
|
("Important security update", "security@example.com", 1),
|
|
("Monthly performance review", "boss@example.com", 0),
|
|
("Claim your prize now", "lottery@example.com", 1),
|
|
("Training session details", "training@example.com", 0),
|
|
("Unauthorized access detected", "alert@example.com", 1),
|
|
("Office relocation notice", "admin@example.com", 0),
|
|
("Confirm your subscription", "newsletter@example.com", 1),
|
|
("Sales team meeting", "sales@example.com", 0),
|
|
("Your payment is overdue", "billing@example.com", 1),
|
|
("Client feedback", "client@example.com", 0),
|
|
("Update your account details", "update@example.com", 1),
|
|
("Social event invitation", "social@example.com", 0),
|
|
("Action required: Update password", "security@example.com", 1),
|
|
("New project assignment", "manager@example.com", 0),
|
|
("Notice of data breach", "security@example.com", 1),
|
|
("Weekly newsletter", "newsletter@example.com", 0),
|
|
("Re: Your recent purchase", "support@example.com", 1),
|
|
("Performance appraisal meeting", "hr@example.com", 0),
|
|
("Important account notice", "no-reply@example.com", 1),
|
|
("Quarterly earnings report", "finance@example.com", 0),
|
|
("Urgent: Verify your identity", "security@example.com", 1),
|
|
("Birthday celebration", "events@example.com", 0),
|
|
]cd
|
|
|
|
subjects = [x[0] for x in training_data]
|
|
senders = [x[1] for x in training_data]
|
|
labels = [x[2] for x in training_data]
|
|
|
|
# Połączenie tytułów i nadawców
|
|
combined_features = [s + ' ' + senders[i] for i, s in enumerate(subjects)]
|
|
vectorizer = TfidfVectorizer()
|
|
X = vectorizer.fit_transform(combined_features)
|
|
y = labels
|
|
|
|
model = MultinomialNB()
|
|
model.fit(X, y)
|
|
|
|
@app.route('/login', methods=['POST'])
|
|
def login():
|
|
data = request.get_json()
|
|
username = data.get('username')
|
|
password = data.get('password')
|
|
|
|
try:
|
|
mail = imaplib.IMAP4_SSL('imap.wp.pl')
|
|
mail.login(username, password)
|
|
session['username'] = username
|
|
session['password'] = password
|
|
return jsonify({'message': 'Login successful'}), 200
|
|
except imaplib.IMAP4.error as e:
|
|
print(f'Login failed: {e}')
|
|
return jsonify({'message': 'Login failed'}), 401
|
|
except Exception as e:
|
|
print('Error during login:', e)
|
|
traceback.print_exc()
|
|
return jsonify({'message': 'Internal server error'}), 500
|
|
|
|
@app.route('/check_mail', methods=['GET'])
|
|
def check_mail():
|
|
if 'username' not in session or 'password' not in session:
|
|
return jsonify({'message': 'Not logged in'}), 401
|
|
|
|
username = session['username']
|
|
password = session['password']
|
|
|
|
try:
|
|
mail = imaplib.IMAP4_SSL('imap.wp.pl')
|
|
mail.login(username, password)
|
|
mail.select('INBOX')
|
|
result, data = mail.search(None, 'ALL')
|
|
email_ids = data[0].split()[-10:] # Pobierz ostatnie 10 e-maili
|
|
emails = []
|
|
|
|
for e_id in email_ids:
|
|
result, email_data = mail.fetch(e_id, '(RFC822)')
|
|
raw_email = email_data[0][1]
|
|
msg = email.message_from_bytes(raw_email)
|
|
subject = decode_header_value(msg['subject'])
|
|
sender = decode_header_value(msg['from'])
|
|
is_phishing = detect_phishing(subject, sender, e_id.decode())
|
|
emails.append({'subject': subject, 'from': sender, 'is_phishing': is_phishing, 'id': e_id.decode()})
|
|
|
|
return jsonify(emails), 200
|
|
except Exception as e:
|
|
print('Error during email check:', e)
|
|
traceback.print_exc()
|
|
return jsonify({'message': 'Internal server error'}), 500
|
|
|
|
@app.route('/logout', methods=['POST'])
|
|
def logout():
|
|
try:
|
|
session.pop('username', None)
|
|
session.pop('password', None)
|
|
return jsonify({'message': 'Logged out'}), 200
|
|
except Exception as e:
|
|
print('Error during logout:', e)
|
|
traceback.print_exc()
|
|
return jsonify({'message': 'Internal server error'}), 500
|
|
|
|
@app.route('/mark_safe/<email_id>', methods=['POST'])
|
|
def mark_safe(email_id):
|
|
global safe_emails
|
|
safe_emails.append(email_id)
|
|
save_safe_emails(safe_emails)
|
|
print(f'Email {email_id} marked as safe')
|
|
return jsonify({"message": f"Email {email_id} marked as safe"}), 200
|
|
|
|
@app.route('/move_trash/<email_id>', methods=['POST'])
|
|
def move_trash(email_id):
|
|
if 'username' not in session or 'password' not in session:
|
|
return jsonify({'message': 'Not logged in'}), 401
|
|
|
|
username = session['username']
|
|
password = session['password']
|
|
|
|
try:
|
|
mail = imaplib.IMAP4_SSL('imap.wp.pl')
|
|
mail.login(username, password)
|
|
mail.select('INBOX')
|
|
print(f'Trying to move email ID {email_id} to Trash') # Logging email ID
|
|
mail.store(email_id, '+FLAGS', '\\Deleted')
|
|
mail.expunge()
|
|
print(f'Email {email_id} deleted') # Logging deletion
|
|
return jsonify({"message": f"Email {email_id} deleted"}), 200
|
|
except Exception as e:
|
|
print(f'Error during moving email to trash: {e}')
|
|
traceback.print_exc()
|
|
return jsonify({'message': 'Internal server error'}), 500
|
|
|
|
def decode_header_value(value):
|
|
parts = decode_header(value)
|
|
header_parts = []
|
|
for part, encoding in parts:
|
|
if isinstance(part, bytes):
|
|
try:
|
|
if encoding:
|
|
header_parts.append(part.decode(encoding))
|
|
else:
|
|
header_parts.append(part.decode('utf-8'))
|
|
except (LookupError, UnicodeDecodeError):
|
|
header_parts.append(part.decode('utf-8', errors='ignore'))
|
|
else:
|
|
header_parts.append(part)
|
|
return ''.join(header_parts)
|
|
|
|
def detect_phishing(subject, sender, email_id):
|
|
if email_id in safe_emails:
|
|
return False # If email is marked as safe, it's not phishing
|
|
|
|
phishing_keywords = ['urgent', 'verify', 'account', 'suspend', 'login']
|
|
phishing_senders = ['support@example.com', 'no-reply@example.com']
|
|
if any(keyword in subject.lower() for keyword in phishing_keywords) or sender.lower() in phishing_senders:
|
|
return True
|
|
return False
|
|
|
|
if __name__ == '__main__':
|
|
app.run(port=5000)
|