import csv from lib2to3.pytree import Base from logging import raiseExceptions import pandas as pd import regex as re import nltk import sys from nltk import trigrams, word_tokenize from collections import Counter, defaultdict # nltk.download("punkt") # train set train_data = pd.read_csv( "train/in.tsv.xz", sep="\t", error_bad_lines=False, warn_bad_lines=False, header=None, quoting=csv.QUOTE_NONE, nrows=100_000 ) # training labels train_labels = pd.read_csv( "train/expected.tsv", sep="\t", error_bad_lines=False, warn_bad_lines=False, header=None, quoting=csv.QUOTE_NONE, nrows=100_000 ) # dev set dev_data = pd.read_csv( "dev-0/in.tsv.xz", sep="\t", error_bad_lines=False, warn_bad_lines=False, header=None, quoting=csv.QUOTE_NONE, ) # test set test_data = pd.read_csv( "test-A/in.tsv.xz", sep="\t", error_bad_lines=False, warn_bad_lines=False, header=None, quoting=csv.QUOTE_NONE, ) class Model(): def __init__(self, vocab_size, alpha): self.alpha = alpha self.model = defaultdict(lambda: defaultdict(lambda: 0)) self.vocab = set() self.vocab_size = vocab_size def train(self, corpus: list): for _, row in corpus[:self.vocab_size].iterrows(): text = prepare_text(str(row["final"])) words = word_tokenize(text) for w1, w2, w3 in trigrams(words, pad_right=True, pad_left=True): if all([w1, w2, w3]): self.vocab.add(w1) self.vocab.add(w2) self.vocab.add(w3) self.model[(w2, w3)][w1] += 1 self.model[(w1, w2)][w3] += 1 for w_pair in self.model: ngram_count = float(sum(self.model[w_pair].values())) denominator = ngram_count + self.alpha * len(self.vocab) for w3 in self.model[w_pair]: nominator = self.model[w_pair][w3] + self.alpha self.model[w_pair][w3] = nominator / denominator def predict(self, word1, word2): raw_prediction = dict(self.model[word1, word2]) prediction = dict(Counter(raw_prediction).most_common(6)) total_prob = 0.0 str_prediction = "" for word, prob in prediction.items(): total_prob += prob str_prediction += f"{word}:{prob} " remaining_prob = 1 - total_prob str_prediction += f":{remaining_prob}" return str_prediction def prepare_text(text): text = text.lower().replace("-\\n", "").replace("\\n", " ") text = re.sub(r"\p{P}", "", text) return text # def write_output(): # with open('dev-0/out.tsv', 'w') as file: # for _, row in dev_data.iterrows(): # left_text, right_text = prepare_text(str(row[6])), prepare_text(str(row[7])) # left_words, right_words = word_tokenize(left_text), word_tokenize(right_text) # if len(left_words) < 2 or len(right_words) < 2: # prediction = ':1.0' # else: # prediction = model.predict(left_words[len(left_words) - 1], right_words[0]) # file.write(prediction + '\n') # with open('test-A/out.tsv', 'w') as file: # for _, row in test_data.iterrows(): # left_text, right_text = prepare_text(str(row[6])), prepare_text(str(row[7])) # left_words, right_words = word_tokenize(left_text), word_tokenize(right_text) # if len(left_words) < 2 or len(right_words) < 2: # prediction = ':1.0' # else: # prediction = model.predict(left_words[len(left_words) - 1], right_words[0]) # file.write(prediction + '\n') def write_output(): with open("dev-0/out.tsv", "w") as file: for _, row in dev_data.iterrows(): text = prepare_text(str(row[7])) words = word_tokenize(text) if len(words) < 3: prediction = "the:0.2 be:0.2 to:0.2 of:0.1 and:0.1 a:0.1 :0.1" else: prediction = model.predict(words[0], words[1]) file.write(prediction + "\n") with open("test-A/out.tsv", "w") as file: for _, row in test_data.iterrows(): text = prepare_text(str(row[7])) words = word_tokenize(text) if len(words) < 3: prediction = "the:0.2 be:0.2 to:0.2 of:0.1 and:0.1 a:0.1 :0.1" else: prediction = model.predict(words[0], words[1]) file.write(prediction + "\n") if __name__ == "__main__": # Preapare train data print("Preparing data...") train_data = train_data[[6, 7]] train_data = pd.concat([train_data, train_labels], axis=1) train_data["final"] = train_data[6] + train_data[0] + train_data[7] # declare model print("Preparing model...") model = Model(100_000, 0.0001) # train model print("Model training...") model.train(train_data) # write outputs print("Writing outputs...") write_output()