from nltk.corpus import wordnet
from nltk import pos_tag
from string import punctuation
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from wordcloud import WordCloud, STOPWORDS
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction.text import CountVectorizer
from nltk.stem import PorterStemmer, WordNetLemmatizer
from nltk.corpus import stopwords  # *To remove the stop words
from sklearn.model_selection import train_test_split
import pandas as pd
import os
from collections import Counter
from prepare_data import preprocess_dataset, save_dataset
import nltk

nltk.download("stopwords")

ps = PorterStemmer()  # *To perform stemming


# *For tokenizing a sentence and collecting its stemmed, non-stop words into a list
def return_word_list(stop_words: set, sentence: str) -> list:
    word_list = []
    for word in sentence.lower().split():
        if word not in stop_words and word.isalpha():
            word_list.append(ps.stem(word))
    return word_list


# *For finding the posterior probability of each category given a word list
def return_category_probability_dictionary(
        dict_category_wise_probability: dict, word_list: list,
        probab: pd.DataFrame, prob_df: pd.DataFrame, pro: float) -> dict:
    help_dict = {}
    for i, _ in probab.iterrows():
        for word in word_list:
            if word in prob_df.index:
                pro = pro * probab.loc[i, word]
        help_dict[i] = pro * dict_category_wise_probability[i]
        pro = 1
    return help_dict


class NaiveBayes:
    def __init__(self, data, labels, features):
        self.data = data
        self.labels = labels
        self.features = features

    def fit(self):
        pass  # TODO

    def transform(self):
        pass  # TODO

    def predict(self):
        pass  # TODO

    def evaluate(self, test_data):
        pass  # TODO


def read_data(data_path: str, prepare_data: bool = False) -> tuple:
    """Read data from the given path - if ``prepare_data`` is True, the data is
    also preprocessed and cleaned"""
    if prepare_data:
        data = preprocess_dataset(data_path)
    else:
        data = pd.read_csv(data_path, nrows=1000)  # !Delete the nrows option
    return data["tokens"], data["fraudulent"]


def to_dictionary(stop_words: set, sentences: pd.Series) -> Counter:
    """Create and return a dictionary containing (word: occurrence_count) pairs
    for words that are not stop words"""
    word_counts = Counter()
    for sentence in sentences:
        for word in sentence.split():
            word_lower = word.lower()
            if word_lower not in stop_words and word_lower.isalpha():
                word_counts[ps.stem(word_lower)] += 1
    return word_counts


def build_master_dict(data: pd.DataFrame, classes: list, stop_words: set) -> dict:
    """Create the master dictionary containing each word's frequency per category"""
    master_dict = {}
    for category in classes:
        category_temp = data[data["fraudulent"] == category]
        master_dict[category] = to_dictionary(stop_words, category_temp["tokens"])
    return master_dict


def build_category_probs_dicts(
        word_frequency_df: pd.DataFrame,
        categories_to_iterate: list) -> tuple:
    """Create the dictionaries holding the category-wise word-count sums and the
    category-wise (prior) probabilities"""
    category_sum = []
    for category in categories_to_iterate:
        # *Prepared category sum for zip
        category_sum.append(word_frequency_df[category].sum())
    # *Dictionary with category based sums
    dict_category_sum = dict(zip(categories_to_iterate, category_sum))
    cat_wise_probs_dict = dict_category_sum.copy()
    total = sum(cat_wise_probs_dict.values())
    for key, value in cat_wise_probs_dict.items():
        cat_wise_probs_dict[key] = value / total
    return cat_wise_probs_dict, dict_category_sum


def build_word_probs(word_freqs: pd.DataFrame, categories_to_iterate: list,
                     dict_category_sum: dict) -> pd.DataFrame:
    """Calculate the word probabilities P(word|category) with Laplace (add-one)
    smoothing applied"""
    prob_df = word_freqs
    vocabulary_size = len(prob_df)
    for category in categories_to_iterate:
        for index, row in prob_df.iterrows():
            # *Smoothing: (count + 1) / (words in category + vocabulary size)
            prob_df.at[index, category] = (row[category] + 1) / (
                dict_category_sum[category] + vocabulary_size)
    return prob_df


def main():
    # *Reading and splitting data
    x, y = read_data(os.path.join(os.path.abspath("./data"), "clean-data.csv"))
    x_train, x_test, y_train, y_test = train_test_split(
        x, y, test_size=0.2, random_state=123, stratify=y)
    train_data = pd.concat([x_train, y_train], axis=1)
    print("\tTrain data:\n", train_data)
    test_data = pd.concat([x_test, y_test], axis=1)
    classes = [0, 1]

    # *Building the master dictionary that contains the word frequencies
    stop_words = set(stopwords.words("english"))
    master_dict = build_master_dict(train_data, classes, stop_words)
    print("Master dictionary with word freqs", master_dict)

    # *Converting the dictionary to a data frame for ease of use
    word_frequency_df = pd.DataFrame(master_dict).fillna(0)
    print("Dictionary converted to DataFrame\n", word_frequency_df.head())

    # *Building the dictionaries that hold category-wise sums and word-wise probabilities
    categories_to_iterate = list(word_frequency_df)  # *Prepared categories for zip
    dict_category_wise_probability, dict_category_sum = build_category_probs_dicts(
        word_frequency_df, categories_to_iterate)
    print(f"The dictionary that holds the category wise sum is {dict_category_sum}")
    print(
        f"The dictionary that holds the category wise probabilities is "
        f"{dict_category_wise_probability}")

    # *Building the word probabilities with the application of smoothing
    prob_df = build_word_probs(word_frequency_df, categories_to_iterate,
                               dict_category_sum)
    print(prob_df)

    probab = prob_df.transpose()
    pro = 1
    match = 0
    total = 0
    counter = 0
    for _, row in test_data.iterrows():
        if counter > 200:  # *Evaluate only the first ~200 test rows
            break
        ind = row["fraudulent"]
        text = row["tokens"]
        word_list = return_word_list(stop_words, text)
        # *Get the dictionary that contains the final probability P(category|words)
        help_dict = return_category_probability_dictionary(
            dict_category_wise_probability, word_list, probab, prob_df, pro)
        if ind == max(help_dict, key=help_dict.get):
            match = match + 1
        total = total + 1
        counter += 1

    print(f"The model predicted {match} correctly of {total}")
    print(f"The model accuracy then is {int((match / total) * 100)}%")


if __name__ == "__main__":
    main()
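

# A minimal sketch of how the NaiveBayes class stubbed out above could wrap the
# procedural steps from main(). The class name NaiveBayesSketch, its attribute
# names, and the exact method signatures are assumptions for illustration only;
# the maths simply reuses the module-level helper functions unchanged.
class NaiveBayesSketch:
    def __init__(self, stop_words: set, classes: list):
        self.stop_words = stop_words
        self.classes = classes
        self.prob_df = None           # P(word|category) table, filled by fit()
        self.category_priors = None   # P(category) dictionary, filled by fit()

    def fit(self, train_data: pd.DataFrame) -> None:
        """Estimate priors and smoothed word likelihoods from the training frame."""
        master_dict = build_master_dict(train_data, self.classes, self.stop_words)
        word_frequency_df = pd.DataFrame(master_dict).fillna(0)
        categories = list(word_frequency_df)
        self.category_priors, category_sums = build_category_probs_dicts(
            word_frequency_df, categories)
        self.prob_df = build_word_probs(word_frequency_df, categories, category_sums)

    def predict(self, sentence: str) -> int:
        """Return the category with the highest posterior for one sentence."""
        word_list = return_word_list(self.stop_words, sentence)
        scores = return_category_probability_dictionary(
            self.category_priors, word_list, self.prob_df.transpose(),
            self.prob_df, 1)
        return max(scores, key=scores.get)

    def evaluate(self, test_data: pd.DataFrame) -> float:
        """Return accuracy over a frame with 'tokens' and 'fraudulent' columns."""
        match = 0
        for _, row in test_data.iterrows():
            if self.predict(row["tokens"]) == row["fraudulent"]:
                match += 1
        return match / len(test_data)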