144 lines
5.5 KiB
Python
144 lines
5.5 KiB
Python
|
import pandas as pd
|
||
|
import numpy as np
|
||
|
import csv
|
||
|
import os
|
||
|
import re
|
||
|
import random
|
||
|
from collections import Counter, defaultdict
|
||
|
import nltk
|
||
|
import math
|
||
|
from tqdm import tqdm
|
||
|
|
||
|
# Input files of the three data splits; each is read further down with
# pd.read_csv(..., sep='\t'), i.e. as a tab-separated table.
directory = "train/in.tsv.xz"
directory_dev_0 = "dev-0/in.tsv.xz"
directory_test_A = "test-A/in.tsv.xz"
|
||
|
|
||
|
class Model():
    """Count-based n-gram model that scores the middle word of a window.

    Every n-gram is split into ``n // 2`` words on the left, one middle
    word, and ``n // 2`` words on the right.  The probability of a middle
    word is the count of the full n-gram divided by the count of its
    (left, right) context.
    """

    def __init__(self, vocab_size=30_000, UNK_token='<UNK>', n=3):
        """Store the configuration.

        Args:
            vocab_size: Stored on the instance; not used by any method of
                this class.
            UNK_token: Stored on the instance; not used by any method of
                this class.
            n: Window size. Must be odd and greater than 1 so that the
                window has a single middle position.

        Raises:
            ValueError: If ``n`` is even or not greater than 1.
        """
        if n <= 1 or n % 2 == 0:
            # Raising a bare string is itself a TypeError in Python 3, so a
            # real exception type is used to report the bad argument.
            raise ValueError("change N value !!!")
        self.n = n
        self.vocab_size = vocab_size
        self.UNK_token = UNK_token

    def train(self, corpus: list) -> None:
        """Collect n-gram, word and context statistics from ``corpus``.

        Sets ``n_grams``, ``counter`` (n-gram counts), ``words_counter``
        (token counts), ``all_quantities`` (counts of left + right context
        tuples) and ``all_grams`` (context -> set of observed middle words).

        Args:
            corpus: Sequence of tokens.
        """
        half = self.n // 2
        self.n_grams = list(nltk.ngrams(corpus, n=self.n))
        self.counter = Counter(self.n_grams)
        self.words_counter = Counter(corpus)
        self.all_quantities = Counter()
        self.all_grams = defaultdict(set)

        for gram in tqdm(self.n_grams):
            previous_words = tuple(gram[:half])
            next_words = tuple(gram[half + 1:])
            self.all_quantities[previous_words + next_words] += 1
            self.all_grams[(previous_words, next_words)].add(gram[half])

    def get_conditional_prob_for_word(self, left_text: list, right_text: list, word: str) -> float:
        """Return count(left, word, right) / count(left, right).

        Only the last ``n // 2`` items of ``left_text`` and the first
        ``n // 2`` items of ``right_text`` are used.

        Returns:
            The relative frequency, or 0.0 when the context was never seen.
        """
        half = self.n // 2
        previous_words = tuple(left_text[-half:])
        next_words = tuple(right_text[:half])
        context_count = self.all_quantities[previous_words + next_words]
        if context_count <= 0:
            return 0.0
        return self.counter[previous_words + (word,) + next_words] / context_count

    def get_prob_for_text(self, text: list) -> float:
        """Return the product of middle-word probabilities over all windows of ``text``."""
        half = self.n // 2
        prob = 1
        for gram in nltk.ngrams(text, self.n):
            prob *= self.get_conditional_prob_for_word(gram[:half], gram[half + 1:], gram[half])
        return prob

    def most_probable_words(self, left_text: list, right_text: list) -> list:
        """Return up to 20 ``(word, probability)`` pairs for the given context.

        Pairs are ordered by descending probability; equal probabilities are
        ordered by the word's string form so the result is deterministic.
        An unseen context yields an empty list.
        """
        half = self.n // 2
        previous_words = tuple(left_text[-half:])
        next_words = tuple(right_text[:half])
        # .get() instead of indexing: indexing a defaultdict would insert an
        # empty set for every unseen context and grow the model while
        # predicting.
        candidates = self.all_grams.get((previous_words, next_words), ())
        scored = [
            (word, self.get_conditional_prob_for_word(previous_words, next_words, word))
            for word in candidates
        ]
        scored.sort(key=lambda pair: (-pair[1], str(pair[0])))
        return scored[:20]

    def generate_text(self, text_beggining: list, text_ending: list, greedy: bool) -> list:
        """Return the candidates from ``most_probable_words`` for the gap.

        ``greedy`` is accepted but not used.
        """
        return self.most_probable_words(text_beggining, text_ending)
|
||
|
|
||
|
# Stream the training split in chunks of 10000 rows.
dataframeList = pd.read_csv(directory, sep='\t', header=None, names=['FileId', 'Year', 'LeftContext', 'RightContext'], quoting=csv.QUOTE_NONE, chunksize=10000)

# NOTE(review): this reads the same file as dataframeList, only with a single
# column name, so 'Word' is taken from the input table rather than from a
# separate file of expected words. Presumably a dedicated expected-words file
# was intended -- confirm the path before changing it.
expectedList = pd.read_csv(directory, sep='\t', header=None, names=['Word'], quoting=csv.QUOTE_NONE, chunksize=10000)

chunk_texts = []
for dataframe, expected in zip(dataframeList, expectedList):
    dataframe = dataframe.replace(r'\\r|\\n|\n|\\t', ' ', regex=True)

    # Missing cells are read as NaN (a float); treat them as empty text so
    # the string join below does not fail.
    left_text = dataframe['LeftContext'].fillna("").to_list()
    right_text = dataframe['RightContext'].fillna("").to_list()
    word = expected['Word'].fillna("").to_list()

    lines = [" ".join(parts) for parts in zip(left_text, word, right_text)]
    chunk_texts.append(" ".join(lines))

# Join the chunks with a separator: plain concatenation fused the last token
# of one chunk with the first token of the next one.
DATASET = " ".join(chunk_texts)

# str.split() splits on runs of whitespace and, unlike re.split(r"\s+", ...),
# yields no empty tokens at the start or end.
FINAL_DATASET = DATASET.split()
print(FINAL_DATASET[:100])

model_3gram = Model(n=3)
model_3gram.train(FINAL_DATASET)

model = model_3gram
|
||
|
|
||
|
def convert_predictions(line):
    """Format ``(word, score)`` pairs as ``"word:p word:p ... :rest"``.

    Each score is divided by the sum of all scores and truncated (not
    rounded) to two decimals; a share of exactly 1.0 is lowered to 0.99.
    The final ``:rest`` entry, which has no word, is ``1 - sum(shares)``
    rounded to two decimals, or 0.01 when the shares already add up to 1.

    Args:
        line: Iterable of ``(word, score)`` pairs; it is traversed twice,
            so it must be re-iterable (e.g. a list).

    Returns:
        The formatted string. An empty ``line`` gives ``":1"``.
    """
    sum_predictions = np.sum([pred for _, pred in line])

    entries = []
    all_pred = 0
    for word, pred in line:
        if sum_predictions > 0:
            new_pred = math.floor(pred / sum_predictions * 100) / 100
        else:
            # All scores are zero: dividing would produce NaN and crash in
            # math.floor, so give the word no mass instead.
            new_pred = 0.0
        if new_pred == 1.0:
            new_pred = 0.99
        all_pred += new_pred
        entries.append(f"{word}:{new_pred}")

    if round(all_pred, 2) < 1:
        rest = round(1 - all_pred, 2)
    else:
        rest = 0.01
    entries.append(f":{rest}")
    return " ".join(entries)
|
||
|
|
||
|
# PREDICTION FOR DEV-0 AND TEST-A
# Both splits go through exactly the same steps, so they share one loop.
for input_path, output_path in (
    (directory_dev_0, "dev-0/out.tsv"),
    (directory_test_A, "test-A/out.tsv"),
):
    dataframe = pd.read_csv(input_path, sep='\t', header=None, names=['FileId', 'Year', 'LeftContext', 'RightContext'], quoting=csv.QUOTE_NONE)
    dataframe = dataframe.replace(r'\\r|\\n|\n|\\t', ' ', regex=True)

    # str.split() instead of re.split(r"\s+", ...): the regex version yields
    # '' as the edge token when a context starts or ends with whitespace, and
    # that '' would then be used as the neighbouring word of the gap.
    # Missing contexts (NaN) become empty token lists.
    left_text = dataframe['LeftContext'].fillna("").apply(lambda text: text.split()).to_list()
    right_text = dataframe['RightContext'].fillna("").apply(lambda text: text.split()).to_list()

    lines = [
        model.generate_text(left, right, False)
        for left, right in tqdm(zip(left_text, right_text))
    ]
    print(lines[:100])

    # Build the output before opening the file so that a failure here does
    # not leave a truncated output file behind.
    result = "\n".join(map(convert_predictions, tqdm(lines)))
    with open(output_path, "w", encoding="UTF-8") as file:
        file.write(result)
|