from flask import Flask, request, jsonify, session, redirect, url_for from flask_cors import CORS import google.oauth2.credentials import google_auth_oauthlib.flow import googleapiclient.discovery import os import json import traceback from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.naive_bayes import MultinomialNB import pandas as pd import re import joblib import imaplib import email from email.header import decode_header import base64 import csv app = Flask(__name__) CORS(app) app.secret_key = 'your_secret_key' os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' CLIENT_SECRETS_FILE = "client_secret.json" SCOPES = ['https://www.googleapis.com/auth/gmail.readonly','https://www.googleapis.com/auth/gmail.modify'] API_SERVICE_NAME = 'gmail' API_VERSION = 'v1' SAFE_EMAILS_FILE = 'safe_emails.json' SAFE_TRASH_FILE = 'safe_trash.csv' # Load safe emails from file def load_safe_emails(): if os.path.exists(SAFE_EMAILS_FILE): with open(SAFE_EMAILS_FILE, 'r') as file: return json.load(file) return [] # Save safe emails to file def save_safe_emails(safe_emails): with open(SAFE_EMAILS_FILE, 'w') as file: json.dump(safe_emails, file) def save_safe_trash(safe_trash): file_exists = os.path.isfile(SAFE_TRASH_FILE) with open(SAFE_TRASH_FILE, 'a', newline='', encoding='utf-8') as file: fieldnames = ['from', 'subject', 'body'] writer = csv.DictWriter(file, fieldnames=fieldnames) if not file_exists: writer.writeheader() writer.writerow(safe_trash) safe_emails = load_safe_emails() model = joblib.load('spam_classifier_model.pkl') vectorizer = joblib.load('vectorizer.pkl') def load_client_secrets(): with open(CLIENT_SECRETS_FILE) as f: return json.load(f) def save_credentials(credentials): session['credentials'] = credentials_to_dict(credentials) app.logger.info("Credentials saved in session.") def load_credentials(): if 'credentials' in session: return google.oauth2.credentials.Credentials( **session['credentials']) return None @app.route('/authorize') def authorize(): client_secrets = load_client_secrets() flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file( CLIENT_SECRETS_FILE, scopes=SCOPES) flow.redirect_uri = url_for('oauth2callback', _external=True) authorization_url, state = flow.authorization_url( access_type='offline', include_granted_scopes='true') session['state'] = state app.logger.info(f"Authorization URL: {authorization_url}, State: {state}") return jsonify({'url': authorization_url, 'state': state}) @app.route('/oauth2callback') def oauth2callback(): state = session.pop('state', None) if not state: app.logger.error('CSRF Warning: State mismatch') return jsonify({'error': 'CSRF Warning: State mismatch'}), 400 client_secrets = load_client_secrets() flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file( CLIENT_SECRETS_FILE, scopes=SCOPES, state=state) flow.redirect_uri = url_for('oauth2callback', _external=True) authorization_response = request.url app.logger.info(f"Authorization response: {authorization_response}") try: flow.fetch_token(authorization_response=authorization_response) credentials = flow.credentials save_credentials(credentials) app.logger.info(f"Token received and saved: {credentials.token}") return """ """ except KeyError as e: app.logger.error(f"KeyError: {str(e)}") app.logger.error("Client secrets: " + json.dumps(client_secrets, indent=2)) app.logger.error("Authorization response: " + authorization_response) return jsonify({'error': 'KeyError', 'message': str(e)}), 500 except Exception as e: app.logger.error(f"Exception: {str(e)}") app.logger.error("Traceback: " + traceback.format_exc()) return jsonify({'error': 'Exception', 'message': str(e)}), 500 @app.route('/validate-outlook-login', methods=['POST']) def validate_outlook_login(): data = request.json username = data['email'] password = data['password'] try: mail = imaplib.IMAP4_SSL("outlook.office365.com") mail.login(username, password) mail.logout() return jsonify({"success": True}), 200 except imaplib.IMAP4.error: return jsonify({"success": False, "error": "Invalid credentials"}), 401 except UnicodeEncodeError: return jsonify({"success": False, "error": "Encoding error"}), 400 @app.route('/fetch-emails', methods=['POST']) def fetch_emails(): data = request.json username = data['username'] password = data['password'] try: mail = imaplib.IMAP4_SSL("outlook.office365.com") mail.login(username, password) mail.select("inbox") except imaplib.IMAP4.error: return jsonify({"error": "Login failed. Check your email and password."}), 401 except UnicodeEncodeError: return jsonify({"error": "Encoding error. Ensure your email and password contain only valid characters."}), 400 status, messages = mail.search(None, "ALL") if status != "OK": return jsonify({"error": "Failed to retrieve emails."}), 500 status, messages = mail.search(None, "ALL") email_ids = messages[0].split() emails = [] for email_id in email_ids: res, msg = mail.fetch(email_id, "(RFC822)") for response_part in msg: if isinstance(response_part, tuple): msg = email.message_from_bytes(response_part[1]) subject, encoding = decode_header(msg["Subject"])[0] if isinstance(subject, bytes): subject = subject.decode(encoding if encoding else "utf-8") from_ = msg.get("From") name, email_address = email.utils.parseaddr(from_) body = "" if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain" and part.get("Content-Disposition") is None: body += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8") else: body = msg.get_payload(decode=True).decode(msg.get_content_charset() or "utf-8") email_vectorized = vectorizer.transform([body]) prediction = model.predict(email_vectorized) result = True if prediction == 1 or contains_suspicious_links(body) or contains_phishing_indicators(subject, email_address,body) else False emails.append({"id": email_id.decode(), "from": email_address, "subject": subject, "snippet": body, "suspicious": result}) return jsonify(emails) @app.route('/check_auth_status', methods=['GET']) def check_auth_status(): if 'credentials' in session: credentials = google.oauth2.credentials.Credentials(**session['credentials']) if credentials and credentials.valid: return jsonify({'logged_in': True}) return jsonify({'logged_in': False}) @app.route('/check_mail') def check_mail(): if 'credentials' not in session: return redirect('authorize') credentials = google.oauth2.credentials.Credentials( **session['credentials']) gmail = googleapiclient.discovery.build( 'gmail', 'v1', credentials=credentials) results = gmail.users().messages().list(userId='me', labelIds=['INBOX']).execute() messages = results.get('messages', []) emails = [] for message in messages: msg = gmail.users().messages().get(userId='me', id=message['id']).execute() snippet = msg.get('snippet', '') payload = msg.get('payload', {}) headers = payload.get('headers', []) subject = '' sender = '' result = False for header in headers: if header['name'] == 'Subject': subject = header['value'] if header['name'] == 'From': sender = header['value'] email_vectorized = vectorizer.transform([snippet]) prediction = model.predict(email_vectorized) if message['id'] not in safe_emails: result = True if prediction == 1 or contains_suspicious_links(snippet) or contains_phishing_indicators(subject, sender,snippet) else False emails.append({ 'subject': subject, 'from': sender, 'snippet': snippet, 'id': message['id'], 'suspicious': result }) return jsonify(emails) @app.route('/logout', methods=['POST']) def logout(): session.clear() app.logger.info("Session cleared. User logged out.") return jsonify({'message': 'Logged out'}), 200 @app.route('/mark_safe/', methods=['POST']) def mark_safe(email_id): global safe_emails safe_emails.append(email_id) save_safe_emails(safe_emails) app.logger.info(f'Email {email_id} marked as safe') return jsonify({"message": f"Email {email_id} marked as safe"}), 200 @app.route('/move_trash/', methods=['POST']) def move_trash(email_id): if 'credentials' not in session: return jsonify({'message': 'Not logged in'}), 401 credentials = google.oauth2.credentials.Credentials( **session['credentials']) gmail = googleapiclient.discovery.build( 'gmail', 'v1', credentials=credentials) try: message = gmail.users().messages().get(userId='me', id=email_id).execute() payload = message.get('payload', {}) headers = payload.get('headers', []) subject = '' sender = '' body = '' for header in headers: if header['name'] == 'Subject': subject = header['value'] if header['name'] == 'From': sender = header['value'] if 'parts' in payload: for part in payload['parts']: if part['mimeType'] == 'text/plain': body = part['body']['data'] body = base64.urlsafe_b64decode(body).decode('utf-8') safe_trash = { "from": sender, "subject": subject, "body": body } save_safe_trash(safe_trash) gmail.users().messages().modify( userId='me', id=email_id, body={'removeLabelIds': ['INBOX'], 'addLabelIds': ['TRASH']} ).execute() app.logger.info(f'Email {email_id} moved to trash') return jsonify({"message": f"Email {email_id} moved to trash"}), 200 except Exception as e: app.logger.error(f"Exception moving email to trash: {str(e)}") return jsonify({"error": str(e)}), 500 @app.route('/delete-email', methods=['POST']) def delete_email(): data = request.json email_id = data['email_id'] username = data['username'] password = data['password'] try: mail = imaplib.IMAP4_SSL("outlook.office365.com") mail.login(username, password) mail.select("inbox") status, message_data = mail.fetch(email_id, "(RFC822)") if status != "OK": return jsonify({"error": "Failed to fetch email"}), 500 msg = email.message_from_bytes(message_data[0][1]) subject, encoding = decode_header(msg["Subject"])[0] if isinstance(subject, bytes): subject = subject.decode(encoding if encoding else "utf-8") from_ = msg.get("From") name, email_address = email.utils.parseaddr(from_) body = "" if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain" and part.get("Content-Disposition") is None: body += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8") else: body = msg.get_payload(decode=True).decode(msg.get_content_charset() or "utf-8") safe_trash = { "from": from_, "subject": subject, "body": body } save_safe_trash(safe_trash) mail.store(email_id, '+FLAGS', '\\Deleted') mail.expunge() return jsonify({"message": f"Email {email_id} deleted"}) except imaplib.IMAP4.error: return jsonify({"error": "Failed to delete email"}), 500 def contains_suspicious_links(snippet): url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+') urls = url_pattern.findall(snippet) suspicious_domains = ['phishingsite.com', 'malicious.com'] for url in urls: for domain in suspicious_domains: if domain in url: return True return False def contains_phishing_indicators(subject, sender, body): phishing_subject_keywords = ['urgent', 'important', 'update', 'verification', 'account', 'password', 'verify'] phishing_senders = ['no-reply@phishingsite.com', 'support@malicious.com'] phishing_body_keywords = ['click', 'verify', 'confirm'] for keyword in phishing_subject_keywords: if keyword in subject.lower(): return True for phishing_sender in phishing_senders: if phishing_sender in sender.lower(): return True for keyword in phishing_body_keywords: if keyword in body.lower(): return True return False def credentials_to_dict(credentials): return { 'token': credentials.token, 'refresh_token': credentials.refresh_token, 'token_uri': credentials.token_uri, 'client_id': credentials.client_id, 'client_secret': credentials.client_secret, 'scopes': credentials.scopes } if __name__ == '__main__': app.run(port=5000, debug=True)