import pandas as pd import csv import regex as re from nltk import bigrams, word_tokenize from collections import Counter, defaultdict def clean(text): text = str(text).lower().replace("-\\n", "").replace("\\n", " ") return re.sub(r"\p{P}", "", text) class Collection: def __init__(self, path: str) -> None: self._path = path def read(self): self.data = pd.read_csv( self._path, sep="\t", error_bad_lines=False, header=None, quoting=csv.QUOTE_NONE ) class Model: def __init__(self, alpha: float = 0.01) -> None: self.alpha = alpha self.model = defaultdict(lambda: defaultdict(lambda: 0)) self.vocab = set() def train(self, data): for _, row in data.iterrows(): words = word_tokenize(clean(row["final"])) for w1, w2 in bigrams(words, pad_left=True, pad_right=True): if w1 and w2: self.model[w1][w2] += 1 self.vocab.add(w1) self.vocab.add(w2) for w1 in self.model: total_count = float(sum(self.model[w1].values())) denominator = total_count + self.alpha * len(self.vocab) for w2 in self.model[w1]: nominator = self.model[w1][w2] + self.alpha self.model[w1][w2] = nominator / denominator def _predict(self, word): predictions = dict(self.model[word]) most_common = dict(Counter(predictions).most_common(6)) total_prob = 0.0 str_prediction = "" for word, prob in most_common.items(): total_prob += prob str_prediction += f"{word}:{prob} " if not total_prob: return "the:0.2 be:0.2 to:0.2 of:0.1 and:0.1 a:0.1 :0.1" if 1 - total_prob >= 0.01: str_prediction += f":{1-total_prob}" else: str_prediction += f":0.01" return str_prediction def _save(self, save_path: str, data): with open(save_path, "w") as file: for _, row in data.iterrows(): words = word_tokenize(clean(row[6])) if len(words) < 3: prediction = "the:0.2 be:0.2 to:0.2 of:0.1 and:0.1 a:0.1 :0.1" else: prediction = self._predict(words[-1]) file.write(prediction + "\n") def predict(self, read_path: str, save_path: str): collection = Collection(read_path) collection.read() self._save(save_path, collection.data) if __name__ == '__main__': data = Collection("train/in.tsv.xz") data.read() train_labels = Collection("train/expected.tsv") train_labels.read() train_data = data.data[[6, 7]] train_data = pd.concat([train_data, train_labels.data], axis=1) train_data["final"] = train_data[6] + train_data[0] + train_data[7] model = Model() model.train(train_data) model.predict("dev-0/in.tsv.xz", "dev-0/out.tsv") model.predict("test-A/in.tsv.xz", "test-A/out.tsv")