implement NaiveBayes Class

This commit is contained in:
AdamOsiowy123 2022-05-17 22:08:42 +02:00
parent f30d8b8712
commit 45eb1ff6f2
3 changed files with 83 additions and 18068 deletions

File diff suppressed because one or more lines are too long

View File

@@ -1,200 +1,66 @@
from nltk.corpus import wordnet
from nltk import pos_tag
from string import punctuation
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from wordcloud import WordCloud, STOPWORDS
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction.text import CountVectorizer
from nltk.stem import PorterStemmer, WordNetLemmatizer
from nltk.corpus import stopwords # *To Remove the stop words
import numpy as np
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
import pandas as pd
import os
from collections import Counter
from prepare_data import preprocess_dataset, save_dataset
from ast import literal_eval
import nltk
# TODO: create word maps for the fraudulent == 0 and fraudulent == 1 subsets
nltk.download("stopwords")
ps = PorterStemmer() # *To perform stemming
# *For tokenizing the words and putting them into the word list
def return_word_list(stop_words, sentence):
    word_list = []
    for word in sentence.lower().split():  # split into words rather than iterating characters
        if word not in stop_words and word.isalpha():
            word_list.append(ps.stem(word))
    return word_list
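# The helper below appears to score a token list against each category i as
#   P(i) * product over words w of P(w | i)
# using the word probabilities held in @probab / @prob_df; main() later picks
# the category with the highest score.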
# *For finding the conditional probability
def return_category_probability_dictionary(
        dict_category_wise_probability: dict, word_list, probab: pd.DataFrame,
        prob_df: pd.DataFrame, pro: float):
    help_dict = {}
    for i, _ in probab.iterrows():
        for word in word_list:
            if word in prob_df.index.tolist():
                pro = pro * probab.loc[i, word]
        help_dict[i] = pro * dict_category_wise_probability[i]
        pro = 1
    return help_dict
from prepare_data import read_data
class NaiveBayes:
    def __init__(self, data, labels, features):
        self.data = data
    def __init__(self, train_x, train_y, labels):
        self.train_x = train_x
        self.train_y = train_y
        self.labels = labels
        self.features = features
        self.counts = {}
        self.prior_prob = {}
        self.word_counts = {}
    def count_words(self):
        for label in self.labels:
            indexes = self.train_y.index[self.train_y == label].tolist()
            data = self.train_x[self.train_x.index.isin(indexes)]
            vocabulary = []
            for tokens in data:
                vocabulary += tokens
            # store (total token count, distinct token count, per-word Counter) for each label
            self.word_counts.update({label: (len(vocabulary), len(set(vocabulary)), Counter(vocabulary))})
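    # fit() appears to estimate the empirical class prior P(l) = counts[l] / len(train_y)
    # for each label and then gathers the per-label word statistics via count_words().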
    def fit(self):
        pass  # TODO
        self.counts = {l: self.train_y[self.train_y == l].shape[0] for l in self.labels}
        self.prior_prob = {l: float(self.counts[l]) / float(self.train_y.shape[0]) for l in self.labels}
        self.count_words()
    def transform(self):
        pass  # TODO
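    # The scoring below appears to follow the multinomial Naive Bayes decision rule
    # in log space:
    #   score(c) = log P(c) + sum over words w of log P(w | c)
    # with add-one (Laplace) smoothing,
    #   P(w | c) = (count(w, c) + 1) / (N_c + |V_c|)
    # where N_c is the total token count for class c (word_counts[c][0]) and
    # |V_c| the number of distinct tokens for class c (word_counts[c][1]).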
    def get_posteriori(self, text):
        values = {label: 0 for label in self.labels}
        for label in self.labels:
            for word in text:
                values[label] += np.log((float(self.word_counts[label][2].get(word, 0) + 1)) / (
                    self.word_counts[label][0] + self.word_counts[label][1]))
            values[label] += np.log(self.prior_prob[label])  # add the log prior rather than multiplying by it
        return values.values()
    def predict(self):
        pass  # TODO
    def evaluate(self, test_data):
        pass  # TODO
def read_data(data_path: str, prepare_data: bool = False) -> tuple:
    """Read data from given path - if @prepare_data is True, data is also preprocessed and cleaned"""
    if prepare_data:
        data = preprocess_dataset(data_path)
    else:
        data = pd.read_csv(data_path, nrows=1000)  # !Delete the nrows option
    return data["tokens"], data["fraudulent"]
def to_dictionary(stop_words: set, category: pd.Series) -> dict:
    """Create and return a dictionary containing (word: occurrence_count) pairs for words not being stop words"""
    vocab = []  # a list, not a set, so the Counter sees every occurrence
    sentences = category
    for i in sentences:
        for word in i:
            word_lower = word.lower()
            if word_lower not in stop_words and word_lower.isalpha():
                vocab.append(ps.stem(word_lower))
    word_dic = Counter(vocab)
    return word_dic
def build_master_dict(data: pd.DataFrame, classes: list,
                      stop_words: set) -> dict:
    """Create the master dictionary containing each word's frequency"""
    master_dict = {}
    for category in classes:
        category_temp = data[data["fraudulent"] == category]
        temp_dict = to_dictionary(stop_words, category_temp["tokens"])
        master_dict[category] = temp_dict
    return master_dict
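# Note: the "category wise probabilities" computed below are the share of word
# occurrences falling into each category (word mass), not the share of postings
# per category.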
def build_category_probs_dicts(
        word_frequency_df: pd.DataFrame,
        categories_to_iterate: list) -> tuple:
    """Create the dictionary holding category-wise sums and word-wise probabilities"""
    category_sum = []
    for category in categories_to_iterate:
        # *Prepared category sum for zip
        category_sum.append(word_frequency_df[category].sum())
    # *Dictionary with category based sums
    dict_category_sum = dict(zip(categories_to_iterate, category_sum))
    cat_wise_probs_dict = dict_category_sum.copy()
    total_sentences_values = cat_wise_probs_dict.values()
    total = sum(total_sentences_values)
    for key, value in cat_wise_probs_dict.items():
        cat_wise_probs_dict[key] = value / total
    return cat_wise_probs_dict, dict_category_sum
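# Add-one (Laplace) smoothing as applied below:
#   P(word | category) = (count + 1) / (category_sum + number_of_rows_in_table)
# Illustrative numbers only: a word counted 3 times in a category whose counts
# sum to 96, with a 50-row frequency table, gets (3 + 1) / (96 + 50) ≈ 0.0274.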
def build_word_probs(word_freqs, categories_to_iterate, dict_category_sum):
    """Calculate word probabilities with add-one (Laplace) smoothing"""
    prob_df = word_freqs
    for category in categories_to_iterate:
        for index, row in prob_df.iterrows():
            row[category] = (row[category] + 1) / (
                dict_category_sum[category] + len(prob_df[category])
            )  # *Smoothing
            prob_df.at[index, category] = row[category]
    return prob_df
def predict(self, test_x):
    predicted = []
    for row in test_x:
        predicted.append(np.argmax(self.get_posteriori(row)))
    return predicted
def main():
    # *Reading and splitting data
    x, y = read_data(os.path.join(os.path.abspath("./data"), "clean-data.csv"))
    x_train, x_test, y_train, y_test = train_test_split(x,
                                                        y,
                                                        test_size=0.2,
                                                        random_state=123,
                                                        stratify=y)
    train_data = pd.concat([x_train, y_train], axis=1)
    print("\tTrain data:\n", train_data)
    test_data = pd.concat([x_test, y_test], axis=1)
    data = read_data(os.path.join(os.path.abspath("./data"), "clean-data.csv"))
    data['tokens'] = data['tokens'].apply(literal_eval)
    x = data['tokens']
    y = data['fraudulent']
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=123, stratify=y)
    bayes = NaiveBayes(x_train, y_train, [0, 1])
    bayes.fit()
    predicted = bayes.predict(x_test)
    classes = [0, 1]
    # *Building the master dictionary that contains the word frequency
    stop_words = set(stopwords.words('english'))
    master_dict = build_master_dict(train_data, classes, stop_words)
    print("Master dictionary with word freqs", master_dict)
    # *Converting the dictionary to data frame for ease of use
    word_frequency_df = pd.DataFrame(master_dict).fillna(0)
    print("Dictionary converted to DataFrame\n", word_frequency_df.head())
    # *Building the dictionary that holds category wise sums and word wise probabilities
    categories_to_iterate = list(
        word_frequency_df)  # *Prepared category for zip
    dict_category_wise_probability, dict_category_sum = build_category_probs_dicts(
        word_frequency_df, categories_to_iterate)
    print(
        f"The dictionary that holds the category wise sum is {dict_category_sum}"
    )
    print(
        f"The dictionary that holds the category wise probabilities is {dict_category_wise_probability}"
    )
    # *Building word probability with the application of smoothing
    prob_df = build_word_probs(word_frequency_df, categories_to_iterate,
                               dict_category_sum)
    print(prob_df)
    probab = prob_df.transpose()
    pro = 1
    match = 0
    total = 0
    counter = 0
    for _, row in test_data.iterrows():
        if counter > 200:
            break
        ind = row["fraudulent"]
        text = row["tokens"]
        word_list = return_word_list(stop_words, text)
        # *Get the dictionary that contains the final probability P(word|category)
        help_dict = return_category_probability_dictionary(
            dict_category_wise_probability, word_list, probab, prob_df, pro)
        if ind == max(help_dict, key=help_dict.get):
            match = match + 1
        total = total + 1
        counter += 1
    print(f"The model predicted {match} correctly of {total}")
    print(f"The model accuracy then is {int((match / total) * 100)}%")
    print(accuracy_score(y_test, predicted))
if __name__ == "__main__":
    main()

View File

@@ -3,11 +3,28 @@ import numpy as np
import pandas as pd
from kaggle import api
from sklearn.utils import shuffle
import nltk
from nltk.tokenize import RegexpTokenizer
from nltk.stem.snowball import SnowballStemmer
from nltk.corpus import stopwords
nltk.download("punkt")
nltk.download("stopwords")
stemmer = SnowballStemmer(language="english")
tokenizer = RegexpTokenizer(r'\w+')
stop_words = set(stopwords.words('english'))
def read_data(data_path: str, prepare_data: bool = False):
    """Read data from given path - if @prepare_data is True, data is also preprocessed and cleaned"""
    if prepare_data:
        data = preprocess_dataset(data_path)
    else:
        data = pd.read_csv(data_path)
    return data
def download_data(data_path, dataset_name):
@@ -24,13 +41,24 @@ def download_data(data_path, dataset_name):
)
def tokenize_and_stem_text(text):
    tokenized_text = tokenizer.tokenize(text)
    tokens = [token.lower() for token in tokenized_text if token.lower() not in stop_words and len(token) > 3]
    return [stemmer.stem(token) for token in tokens]
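# Example (illustrative; exact output depends on the Snowball stemmer and the
# stop word list):
#   tokenize_and_stem_text("Managing remote teams and testing")
#   -> ['manag', 'remot', 'team', 'test']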
def preprocess_dataset(data_path):
    data = pd.read_csv(data_path).replace(np.nan, "", regex=True)
    data["description"] = data["description"].str.replace(
        r"(\W+)|(url_\w+)|(\s+)", " ", regex=True)
    # data["description"] = data["description"].str.replace(r"url_\w+", " ", regex=True)
    # data["description"] = data["description"].str.replace(r"\s+", " ", regex=True)
    # Balance the classes: undersample the non-fraudulent postings to the size of the fraudulent set
    data_not_fraudulent = data[data['fraudulent'] == 0]
    data_fraudulent = data[data['fraudulent'] == 1]
    sample = data_not_fraudulent.sample(data_fraudulent.shape[0], replace=False)
    data = pd.concat([sample.reset_index(), data_fraudulent.reset_index()], axis=0)
    data = shuffle(data)
    data["description"] = data["description"].str.replace(r"\W+", " ", regex=True)
    data["description"] = data["description"].str.replace(r"url_\w+", " ", regex=True)
    data["description"] = data["description"].str.replace(r"\s+", " ", regex=True)
data["text"] = data[[
"title",
@@ -39,10 +67,12 @@ def preprocess_dataset(data_path):
"description",
"requirements",
"benefits",
]].apply(lambda x: " ".join(x).lower(), axis=1)
]].apply(lambda x: " ".join(x), axis=1)
# data["text"] = data[[
# "description"
# ]].apply(lambda x: " ".join(x), axis=1)
tokenizer = RegexpTokenizer(r"\w+")
data["tokens"] = data["text"].apply(tokenizer.tokenize)
data["tokens"] = data["text"].apply(lambda text: tokenize_and_stem_text(text))
return data.drop(
[