391 lines
14 KiB
Python
391 lines
14 KiB
Python
from flask import Flask, request, jsonify, session, redirect, url_for
|
|
from flask_cors import CORS
|
|
import google.oauth2.credentials
|
|
import google_auth_oauthlib.flow
|
|
import googleapiclient.discovery
|
|
import os
|
|
import json
|
|
import traceback
|
|
from sklearn.feature_extraction.text import TfidfVectorizer
|
|
from sklearn.naive_bayes import MultinomialNB
|
|
import pandas as pd
|
|
import re
|
|
import joblib
|
|
import imaplib
|
|
import email
|
|
from email.header import decode_header
|
|
import base64
|
|
import csv
|
|
|
|
app = Flask(__name__)
|
|
CORS(app)
|
|
app.secret_key = 'your_secret_key'
|
|
|
|
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
|
|
|
|
CLIENT_SECRETS_FILE = "client_secret.json"
|
|
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly','https://www.googleapis.com/auth/gmail.modify']
|
|
API_SERVICE_NAME = 'gmail'
|
|
API_VERSION = 'v1'
|
|
|
|
SAFE_EMAILS_FILE = 'safe_emails.json'
|
|
SAFE_TRASH_FILE = 'safe_trash.csv'
|
|
|
|
# Load safe emails from file
|
|
|
|
def load_safe_emails():
|
|
if os.path.exists(SAFE_EMAILS_FILE):
|
|
with open(SAFE_EMAILS_FILE, 'r') as file:
|
|
return json.load(file)
|
|
return []
|
|
|
|
# Save safe emails to file
|
|
|
|
def save_safe_emails(safe_emails):
|
|
with open(SAFE_EMAILS_FILE, 'w') as file:
|
|
json.dump(safe_emails, file)
|
|
|
|
def save_safe_trash(safe_trash):
|
|
file_exists = os.path.isfile(SAFE_TRASH_FILE)
|
|
|
|
with open(SAFE_TRASH_FILE, 'a', newline='', encoding='utf-8') as file:
|
|
fieldnames = ['from', 'subject', 'body']
|
|
writer = csv.DictWriter(file, fieldnames=fieldnames)
|
|
|
|
if not file_exists:
|
|
writer.writeheader()
|
|
|
|
writer.writerow(safe_trash)
|
|
|
|
safe_emails = load_safe_emails()
|
|
|
|
model = joblib.load('spam_classifier_model.pkl')
|
|
vectorizer = joblib.load('vectorizer.pkl')
|
|
|
|
def load_client_secrets():
|
|
with open(CLIENT_SECRETS_FILE) as f:
|
|
return json.load(f)
|
|
|
|
def save_credentials(credentials):
|
|
session['credentials'] = credentials_to_dict(credentials)
|
|
app.logger.info("Credentials saved in session.")
|
|
|
|
def load_credentials():
|
|
if 'credentials' in session:
|
|
return google.oauth2.credentials.Credentials(
|
|
**session['credentials'])
|
|
return None
|
|
|
|
@app.route('/authorize')
|
|
|
|
def authorize():
|
|
client_secrets = load_client_secrets()
|
|
flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
|
|
CLIENT_SECRETS_FILE, scopes=SCOPES)
|
|
flow.redirect_uri = url_for('oauth2callback', _external=True)
|
|
authorization_url, state = flow.authorization_url(
|
|
access_type='offline',
|
|
include_granted_scopes='true')
|
|
session['state'] = state
|
|
app.logger.info(f"Authorization URL: {authorization_url}, State: {state}")
|
|
return jsonify({'url': authorization_url, 'state': state})
|
|
|
|
@app.route('/oauth2callback')
|
|
def oauth2callback():
|
|
state = session.pop('state', None)
|
|
if not state:
|
|
app.logger.error('CSRF Warning: State mismatch')
|
|
return jsonify({'error': 'CSRF Warning: State mismatch'}), 400
|
|
|
|
client_secrets = load_client_secrets()
|
|
flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
|
|
CLIENT_SECRETS_FILE, scopes=SCOPES, state=state)
|
|
flow.redirect_uri = url_for('oauth2callback', _external=True)
|
|
|
|
authorization_response = request.url
|
|
app.logger.info(f"Authorization response: {authorization_response}")
|
|
try:
|
|
flow.fetch_token(authorization_response=authorization_response)
|
|
credentials = flow.credentials
|
|
save_credentials(credentials)
|
|
app.logger.info(f"Token received and saved: {credentials.token}")
|
|
return """
|
|
<script>
|
|
window.opener.postMessage('auth_complete', 'http://localhost:5000');
|
|
window.close();
|
|
</script>
|
|
"""
|
|
except KeyError as e:
|
|
app.logger.error(f"KeyError: {str(e)}")
|
|
app.logger.error("Client secrets: " + json.dumps(client_secrets, indent=2))
|
|
app.logger.error("Authorization response: " + authorization_response)
|
|
return jsonify({'error': 'KeyError', 'message': str(e)}), 500
|
|
except Exception as e:
|
|
app.logger.error(f"Exception: {str(e)}")
|
|
app.logger.error("Traceback: " + traceback.format_exc())
|
|
return jsonify({'error': 'Exception', 'message': str(e)}), 500
|
|
@app.route('/validate-outlook-login', methods=['POST'])
|
|
|
|
def validate_outlook_login():
|
|
data = request.json
|
|
username = data['email']
|
|
password = data['password']
|
|
try:
|
|
mail = imaplib.IMAP4_SSL("outlook.office365.com")
|
|
mail.login(username, password)
|
|
mail.logout()
|
|
return jsonify({"success": True}), 200
|
|
except imaplib.IMAP4.error:
|
|
return jsonify({"success": False, "error": "Invalid credentials"}), 401
|
|
except UnicodeEncodeError:
|
|
return jsonify({"success": False, "error": "Encoding error"}), 400
|
|
|
|
@app.route('/fetch-emails', methods=['POST'])
|
|
|
|
def fetch_emails():
|
|
data = request.json
|
|
username = data['username']
|
|
password = data['password']
|
|
try:
|
|
mail = imaplib.IMAP4_SSL("outlook.office365.com")
|
|
mail.login(username, password)
|
|
mail.select("inbox")
|
|
except imaplib.IMAP4.error:
|
|
return jsonify({"error": "Login failed. Check your email and password."}), 401
|
|
except UnicodeEncodeError:
|
|
return jsonify({"error": "Encoding error. Ensure your email and password contain only valid characters."}), 400
|
|
status, messages = mail.search(None, "ALL")
|
|
if status != "OK":
|
|
return jsonify({"error": "Failed to retrieve emails."}), 500
|
|
status, messages = mail.search(None, "ALL")
|
|
email_ids = messages[0].split()
|
|
emails = []
|
|
|
|
for email_id in email_ids:
|
|
res, msg = mail.fetch(email_id, "(RFC822)")
|
|
for response_part in msg:
|
|
if isinstance(response_part, tuple):
|
|
msg = email.message_from_bytes(response_part[1])
|
|
subject, encoding = decode_header(msg["Subject"])[0]
|
|
if isinstance(subject, bytes):
|
|
subject = subject.decode(encoding if encoding else "utf-8")
|
|
from_ = msg.get("From")
|
|
name, email_address = email.utils.parseaddr(from_)
|
|
body = ""
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
if part.get_content_type() == "text/plain" and part.get("Content-Disposition") is None:
|
|
body += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8")
|
|
else:
|
|
body = msg.get_payload(decode=True).decode(msg.get_content_charset() or "utf-8")
|
|
email_vectorized = vectorizer.transform([body])
|
|
prediction = model.predict(email_vectorized)
|
|
result = True if prediction == 1 or contains_suspicious_links(body) or contains_phishing_indicators(subject, email_address,body) else False
|
|
emails.append({"id": email_id.decode(),
|
|
"from": email_address,
|
|
"subject": subject,
|
|
"snippet": body,
|
|
"suspicious": result})
|
|
|
|
|
|
return jsonify(emails)
|
|
|
|
@app.route('/check_auth_status', methods=['GET'])
|
|
|
|
def check_auth_status():
|
|
if 'credentials' in session:
|
|
credentials = google.oauth2.credentials.Credentials(**session['credentials'])
|
|
if credentials and credentials.valid:
|
|
return jsonify({'logged_in': True})
|
|
return jsonify({'logged_in': False})
|
|
|
|
@app.route('/check_mail')
|
|
|
|
def check_mail():
|
|
if 'credentials' not in session:
|
|
return redirect('authorize')
|
|
|
|
credentials = google.oauth2.credentials.Credentials(
|
|
**session['credentials'])
|
|
|
|
gmail = googleapiclient.discovery.build(
|
|
'gmail', 'v1', credentials=credentials)
|
|
|
|
results = gmail.users().messages().list(userId='me', labelIds=['INBOX']).execute()
|
|
messages = results.get('messages', [])
|
|
|
|
|
|
|
|
emails = []
|
|
|
|
for message in messages:
|
|
msg = gmail.users().messages().get(userId='me', id=message['id']).execute()
|
|
snippet = msg.get('snippet', '')
|
|
payload = msg.get('payload', {})
|
|
headers = payload.get('headers', [])
|
|
subject = ''
|
|
sender = ''
|
|
result = False
|
|
for header in headers:
|
|
if header['name'] == 'Subject':
|
|
subject = header['value']
|
|
if header['name'] == 'From':
|
|
sender = header['value']
|
|
email_vectorized = vectorizer.transform([snippet])
|
|
prediction = model.predict(email_vectorized)
|
|
if message['id'] not in safe_emails:
|
|
result = True if prediction == 1 or contains_suspicious_links(snippet) or contains_phishing_indicators(subject, sender,snippet) else False
|
|
emails.append({
|
|
'subject': subject,
|
|
'from': sender,
|
|
'snippet': snippet,
|
|
'id': message['id'],
|
|
'suspicious': result
|
|
})
|
|
return jsonify(emails)
|
|
@app.route('/logout', methods=['POST'])
|
|
|
|
def logout():
|
|
session.clear()
|
|
app.logger.info("Session cleared. User logged out.")
|
|
return jsonify({'message': 'Logged out'}), 200
|
|
@app.route('/mark_safe/<email_id>', methods=['POST'])
|
|
|
|
def mark_safe(email_id):
|
|
global safe_emails
|
|
safe_emails.append(email_id)
|
|
save_safe_emails(safe_emails)
|
|
app.logger.info(f'Email {email_id} marked as safe')
|
|
return jsonify({"message": f"Email {email_id} marked as safe"}), 200
|
|
@app.route('/move_trash/<email_id>', methods=['POST'])
|
|
|
|
def move_trash(email_id):
|
|
if 'credentials' not in session:
|
|
return jsonify({'message': 'Not logged in'}), 401
|
|
credentials = google.oauth2.credentials.Credentials(
|
|
**session['credentials'])
|
|
gmail = googleapiclient.discovery.build(
|
|
'gmail', 'v1', credentials=credentials)
|
|
|
|
try:
|
|
message = gmail.users().messages().get(userId='me', id=email_id).execute()
|
|
payload = message.get('payload', {})
|
|
headers = payload.get('headers', [])
|
|
subject = ''
|
|
sender = ''
|
|
body = ''
|
|
|
|
for header in headers:
|
|
if header['name'] == 'Subject':
|
|
subject = header['value']
|
|
if header['name'] == 'From':
|
|
sender = header['value']
|
|
|
|
if 'parts' in payload:
|
|
for part in payload['parts']:
|
|
if part['mimeType'] == 'text/plain':
|
|
body = part['body']['data']
|
|
body = base64.urlsafe_b64decode(body).decode('utf-8')
|
|
|
|
safe_trash = {
|
|
"from": sender,
|
|
"subject": subject,
|
|
"body": body
|
|
}
|
|
save_safe_trash(safe_trash)
|
|
|
|
gmail.users().messages().modify(
|
|
userId='me',
|
|
id=email_id,
|
|
body={'removeLabelIds': ['INBOX'], 'addLabelIds': ['TRASH']}
|
|
).execute()
|
|
app.logger.info(f'Email {email_id} moved to trash')
|
|
return jsonify({"message": f"Email {email_id} moved to trash"}), 200
|
|
except Exception as e:
|
|
app.logger.error(f"Exception moving email to trash: {str(e)}")
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route('/delete-email', methods=['POST'])
|
|
|
|
def delete_email():
|
|
data = request.json
|
|
email_id = data['email_id']
|
|
username = data['username']
|
|
password = data['password']
|
|
|
|
try:
|
|
mail = imaplib.IMAP4_SSL("outlook.office365.com")
|
|
mail.login(username, password)
|
|
mail.select("inbox")
|
|
status, message_data = mail.fetch(email_id, "(RFC822)")
|
|
|
|
if status != "OK":
|
|
return jsonify({"error": "Failed to fetch email"}), 500
|
|
|
|
msg = email.message_from_bytes(message_data[0][1])
|
|
subject, encoding = decode_header(msg["Subject"])[0]
|
|
if isinstance(subject, bytes):
|
|
subject = subject.decode(encoding if encoding else "utf-8")
|
|
from_ = msg.get("From")
|
|
name, email_address = email.utils.parseaddr(from_)
|
|
body = ""
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
if part.get_content_type() == "text/plain" and part.get("Content-Disposition") is None:
|
|
body += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8")
|
|
else:
|
|
body = msg.get_payload(decode=True).decode(msg.get_content_charset() or "utf-8")
|
|
safe_trash = {
|
|
"from": from_,
|
|
"subject": subject,
|
|
"body": body
|
|
}
|
|
save_safe_trash(safe_trash)
|
|
mail.store(email_id, '+FLAGS', '\\Deleted')
|
|
mail.expunge()
|
|
return jsonify({"message": f"Email {email_id} deleted"})
|
|
except imaplib.IMAP4.error:
|
|
return jsonify({"error": "Failed to delete email"}), 500
|
|
|
|
def contains_suspicious_links(snippet):
|
|
url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
|
|
urls = url_pattern.findall(snippet)
|
|
suspicious_domains = ['phishingsite.com', 'malicious.com']
|
|
|
|
for url in urls:
|
|
for domain in suspicious_domains:
|
|
if domain in url:
|
|
return True
|
|
return False
|
|
|
|
def contains_phishing_indicators(subject, sender, body):
|
|
phishing_subject_keywords = ['urgent', 'important', 'update', 'verification', 'account', 'password', 'verify']
|
|
phishing_senders = ['no-reply@phishingsite.com', 'support@malicious.com']
|
|
phishing_body_keywords = ['click', 'verify', 'confirm']
|
|
|
|
for keyword in phishing_subject_keywords:
|
|
if keyword in subject.lower():
|
|
return True
|
|
|
|
for phishing_sender in phishing_senders:
|
|
if phishing_sender in sender.lower():
|
|
return True
|
|
|
|
for keyword in phishing_body_keywords:
|
|
if keyword in body.lower():
|
|
return True
|
|
|
|
return False
|
|
|
|
def credentials_to_dict(credentials):
|
|
return {
|
|
'token': credentials.token,
|
|
'refresh_token': credentials.refresh_token,
|
|
'token_uri': credentials.token_uri,
|
|
'client_id': credentials.client_id,
|
|
'client_secret': credentials.client_secret,
|
|
'scopes': credentials.scopes
|
|
}
|
|
|
|
if __name__ == '__main__':
|
|
app.run(port=5000, debug=True) |