Implementing phishing detection for Gmail and Outlook messages. Creating methods to check for phishing through links, keywords, and integrating a trained model for email verification. Adding validation checks for data from Outlook.
This commit is contained in:
parent
7d649bdcfd
commit
32c8389ca5
@ -26,8 +26,25 @@ SCOPES = ['https://www.googleapis.com/auth/gmail.readonly','https://www.googleap
|
||||
API_SERVICE_NAME = 'gmail'
|
||||
API_VERSION = 'v1'
|
||||
|
||||
SAFE_EMAILS_FILE = 'safe_emails.json'
|
||||
|
||||
# Load safe emails from file
|
||||
def load_safe_emails():
|
||||
if os.path.exists(SAFE_EMAILS_FILE):
|
||||
with open(SAFE_EMAILS_FILE, 'r') as file:
|
||||
return json.load(file)
|
||||
return []
|
||||
|
||||
# Save safe emails to file
|
||||
def save_safe_emails(safe_emails):
|
||||
with open(SAFE_EMAILS_FILE, 'w') as file:
|
||||
json.dump(safe_emails, file)
|
||||
|
||||
safe_emails = load_safe_emails()
|
||||
|
||||
model = joblib.load('spam_classifier_model.pkl')
|
||||
vectorizer = joblib.load('vectorizer.pkl')
|
||||
|
||||
def load_client_secrets():
|
||||
with open(CLIENT_SECRETS_FILE) as f:
|
||||
return json.load(f)
|
||||
@ -91,6 +108,22 @@ def oauth2callback():
|
||||
app.logger.error("Traceback: " + traceback.format_exc())
|
||||
return jsonify({'error': 'Exception', 'message': str(e)}), 500
|
||||
|
||||
@app.route('/validate-outlook-login', methods=['POST'])
|
||||
def validate_outlook_login():
|
||||
data = request.json
|
||||
username = data['email']
|
||||
password = data['password']
|
||||
|
||||
try:
|
||||
mail = imaplib.IMAP4_SSL("outlook.office365.com")
|
||||
mail.login(username, password)
|
||||
mail.logout()
|
||||
return jsonify({"success": True}), 200
|
||||
except imaplib.IMAP4.error:
|
||||
return jsonify({"success": False, "error": "Invalid credentials"}), 401
|
||||
except UnicodeEncodeError:
|
||||
return jsonify({"success": False, "error": "Encoding error"}), 400
|
||||
|
||||
@app.route('/fetch-emails', methods=['POST'])
|
||||
def fetch_emails():
|
||||
data = request.json
|
||||
@ -103,6 +136,12 @@ def fetch_emails():
|
||||
mail.select("inbox")
|
||||
except imaplib.IMAP4.error:
|
||||
return jsonify({"error": "Login failed. Check your email and password."}), 401
|
||||
except UnicodeEncodeError:
|
||||
return jsonify({"error": "Encoding error. Ensure your email and password contain only valid characters."}), 400
|
||||
|
||||
status, messages = mail.search(None, "ALL")
|
||||
if status != "OK":
|
||||
return jsonify({"error": "Failed to retrieve emails."}), 500
|
||||
|
||||
status, messages = mail.search(None, "ALL")
|
||||
email_ids = messages[0].split()
|
||||
@ -126,39 +165,112 @@ def fetch_emails():
|
||||
body += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8")
|
||||
else:
|
||||
body = msg.get_payload(decode=True).decode(msg.get_content_charset() or "utf-8")
|
||||
email_vectorized = vectorizer.transform([body])
|
||||
prediction = model.predict(email_vectorized)
|
||||
result = True if prediction == 1 or contains_suspicious_links(body) or contains_phishing_indicators(subject, email_address,body) else False
|
||||
|
||||
emails.append({"id": email_id.decode(),
|
||||
"from": email_address,
|
||||
"subject": subject,
|
||||
"snippet": body,
|
||||
"suspicious": result})
|
||||
|
||||
emails.append({"id": email_id.decode(), "from": from_, "name": name, "email_address": email_address,
|
||||
"subject": subject, "body": body})
|
||||
|
||||
return jsonify(emails)
|
||||
|
||||
@app.route('/check_auth_status', methods=['GET'])
|
||||
def check_auth_status():
|
||||
if 'credentials' in session:
|
||||
credentials = google.oauth2.credentials.Credentials(**session['credentials'])
|
||||
if credentials and credentials.valid:
|
||||
return jsonify({'logged_in': True})
|
||||
return jsonify({'logged_in': False})
|
||||
|
||||
@app.route('/classify-email', methods=['POST'])
|
||||
def classify_email():
|
||||
data = request.json
|
||||
email_body = data['body']
|
||||
email_vectorized = vectorizer.transform([email_body])
|
||||
@app.route('/check_mail')
|
||||
def check_mail():
|
||||
if 'credentials' not in session:
|
||||
return redirect('authorize')
|
||||
|
||||
credentials = google.oauth2.credentials.Credentials(
|
||||
**session['credentials'])
|
||||
|
||||
gmail = googleapiclient.discovery.build(
|
||||
'gmail', 'v1', credentials=credentials)
|
||||
|
||||
results = gmail.users().messages().list(userId='me', labelIds=['INBOX']).execute()
|
||||
messages = results.get('messages', [])
|
||||
|
||||
emails = []
|
||||
for message in messages:
|
||||
msg = gmail.users().messages().get(userId='me', id=message['id']).execute()
|
||||
snippet = msg.get('snippet', '')
|
||||
payload = msg.get('payload', {})
|
||||
headers = payload.get('headers', [])
|
||||
subject = ''
|
||||
sender = ''
|
||||
result = False
|
||||
for header in headers:
|
||||
if header['name'] == 'Subject':
|
||||
subject = header['value']
|
||||
if header['name'] == 'From':
|
||||
sender = header['value']
|
||||
email_vectorized = vectorizer.transform([snippet])
|
||||
prediction = model.predict(email_vectorized)
|
||||
result = "Suspicious" if prediction == 1 else "Not suspicious"
|
||||
return jsonify({"result": result})
|
||||
if message['id'] not in safe_emails:
|
||||
result = True if prediction == 1 or contains_suspicious_links(snippet) or contains_phishing_indicators(subject, sender,snippet) else False
|
||||
|
||||
emails.append({
|
||||
'subject': subject,
|
||||
'from': sender,
|
||||
'snippet': snippet,
|
||||
'id': message['id'],
|
||||
'suspicious': result
|
||||
})
|
||||
return jsonify(emails)
|
||||
|
||||
@app.route('/mark-safe', methods=['POST'])
|
||||
def mark_safe():
|
||||
data = request.json
|
||||
email_id = data['email_id']
|
||||
# Logic to mark email as safe
|
||||
return jsonify({"message": f"Email {email_id} marked as safe"})
|
||||
@app.route('/logout', methods=['POST'])
|
||||
def logout():
|
||||
session.clear()
|
||||
app.logger.info("Session cleared. User logged out.")
|
||||
return jsonify({'message': 'Logged out'}), 200
|
||||
|
||||
@app.route('/mark_safe/<email_id>', methods=['POST'])
|
||||
def mark_safe(email_id):
|
||||
global safe_emails
|
||||
safe_emails.append(email_id)
|
||||
save_safe_emails(safe_emails)
|
||||
app.logger.info(f'Email {email_id} marked as safe')
|
||||
return jsonify({"message": f"Email {email_id} marked as safe"}), 200
|
||||
|
||||
@app.route('/move_trash/<email_id>', methods=['POST'])
|
||||
def move_trash(email_id):
|
||||
if 'credentials' not in session:
|
||||
return jsonify({'message': 'Not logged in'}), 401
|
||||
|
||||
credentials = google.oauth2.credentials.Credentials(
|
||||
**session['credentials'])
|
||||
gmail = googleapiclient.discovery.build(
|
||||
'gmail', 'v1', credentials=credentials)
|
||||
|
||||
try:
|
||||
gmail.users().messages().modify(
|
||||
userId='me',
|
||||
id=email_id,
|
||||
body={'removeLabelIds': ['INBOX'], 'addLabelIds': ['TRASH']}
|
||||
).execute()
|
||||
app.logger.info(f'Email {email_id} moved to trash')
|
||||
return jsonify({"message": f"Email {email_id} moved to trash"}), 200
|
||||
except Exception as e:
|
||||
app.logger.error(f"Exception moving email to trash: {str(e)}")
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
@app.route('/delete-email', methods=['POST'])
|
||||
def delete_email():
|
||||
data = request.json
|
||||
email_id = data['email_id']
|
||||
|
||||
# Connect to the mail server and delete the email
|
||||
username = data['username']
|
||||
password = data['password']
|
||||
|
||||
try:
|
||||
mail = imaplib.IMAP4_SSL("outlook.office365.com")
|
||||
mail.login(username, password)
|
||||
@ -169,6 +281,36 @@ def delete_email():
|
||||
except imaplib.IMAP4.error:
|
||||
return jsonify({"error": "Failed to delete email"}), 500
|
||||
|
||||
|
||||
def contains_suspicious_links(snippet):
|
||||
url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
|
||||
urls = url_pattern.findall(snippet)
|
||||
suspicious_domains = ['phishingsite.com', 'malicious.com']
|
||||
for url in urls:
|
||||
for domain in suspicious_domains:
|
||||
if domain in url:
|
||||
return True
|
||||
return False
|
||||
|
||||
def contains_phishing_indicators(subject, sender, body):
|
||||
phishing_subject_keywords = ['urgent', 'important', 'update', 'verification', 'account', 'password', 'verify']
|
||||
phishing_senders = ['no-reply@phishingsite.com', 'support@malicious.com']
|
||||
phishing_body_keywords = ['click', 'verify', 'confirm']
|
||||
|
||||
for keyword in phishing_subject_keywords:
|
||||
if keyword in subject.lower():
|
||||
return True
|
||||
|
||||
for phishing_sender in phishing_senders:
|
||||
if phishing_sender in sender.lower():
|
||||
return True
|
||||
|
||||
for keyword in phishing_body_keywords:
|
||||
if keyword in body.lower():
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def credentials_to_dict(credentials):
|
||||
return {
|
||||
'token': credentials.token,
|
||||
@ -179,6 +321,5 @@ def credentials_to_dict(credentials):
|
||||
'scopes': credentials.scopes
|
||||
}
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(debug=True)
|
||||
app.run(port=5000, debug=True)
|
Loading…
Reference in New Issue
Block a user