#!/usr/bin/env python
# coding: utf-8

# In[2]:


import csv
import itertools

import pandas as pd
import regex as re
import torch
from nltk import word_tokenize
from torch import nn
from torch.utils.data import DataLoader, IterableDataset
from torchtext.vocab import build_vocab_from_iterator
from tqdm import tqdm

VOCAB_SIZE = 20000
EMBED_SIZE = 100
CONTEXT_SIZE = 2
# hidden units
H = 100


def clean(text):
    # Lower-case, glue words split across lines ("-\n"), turn literal "\n"
    # markers into spaces and strip punctuation.
    text = str(text).lower().replace("-\\n", "").replace("\\n", " ")
    return re.sub(r"\p{P}", "", text)


def get_words_from_line(line):
    line = clean(line)
    line = line.rstrip()
    yield '<s>'
    for m in re.finditer(r'[\p{L}0-9\*]+|\p{P}+', line):
        yield m.group(0).lower()
    yield '</s>'


def get_word_lines_from_file(file_name):
    with open(file_name, 'r') as fh:
        for line in fh:
            yield get_words_from_line(line)


vocab = build_vocab_from_iterator(
    get_word_lines_from_file('train-300k.txt'),
    max_tokens=VOCAB_SIZE,
    specials=['<unk>'])
vocab.set_default_index(vocab['<unk>'])

# Shuffle the training lines (notebook shell command).
get_ipython().system('shuf < train-300k.txt > train-300k.shuf.txt')


class SimpleTrigramNeuralLanguageModel(nn.Module):
    def __init__(self, vocabulary_size, embedding_size, context_size, h):
        super(SimpleTrigramNeuralLanguageModel, self).__init__()
        self.context_size = context_size
        self.embedding_size = embedding_size
        self.embeddings = nn.Embedding(vocabulary_size, embedding_size)
        self.linear1 = nn.Linear(context_size * embedding_size, h)
        self.linear2 = nn.Linear(h, vocabulary_size, bias=False)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        # x: (batch, context_size) word indices -> concatenated context embeddings.
        embeds = self.embeddings(x).view((-1, self.context_size * self.embedding_size))
        out = torch.tanh(self.linear1(embeds))
        out = self.linear2(out)
        probs = self.softmax(out)
        return probs


def look_ahead_iterator(gen):
    # Slide a window over the token stream and yield consecutive trigrams.
    prev_1 = None
    prev_2 = None
    for item in gen:
        if prev_1 is not None and prev_2 is not None:
            yield (prev_1, prev_2, item)
        prev_1, prev_2 = prev_2, item


class Trigrams(IterableDataset):
    def __init__(self, text_file, vocabulary_size):
        self.vocab = build_vocab_from_iterator(
            get_word_lines_from_file(text_file),
            max_tokens=vocabulary_size,
            specials=['<unk>'])
        self.vocab.set_default_index(self.vocab['<unk>'])
        self.vocabulary_size = vocabulary_size
        self.text_file = text_file

    def __iter__(self):
        return look_ahead_iterator(
            (self.vocab[t] for t in itertools.chain.from_iterable(
                get_word_lines_from_file(self.text_file))))


# def decrease_train_set_size(lines_amount):
#     lines = []
#     with open('train.txt', 'r') as fh:
#         for line in fh:
#             lines.append(line)
#             lines_amount -= 1
#             if lines_amount == 0:
#                 break
#     with open('train-300k.txt', 'w') as fh:
#         for line in lines:
#             fh.write(line)
#             fh.write('\n')
# decrease_train_set_size(300000)

train_dataset = Trigrams('train-300k.shuf.txt', VOCAB_SIZE)

device = 'cpu'
model = SimpleTrigramNeuralLanguageModel(VOCAB_SIZE, EMBED_SIZE, CONTEXT_SIZE, H).to(device)
data = DataLoader(train_dataset, batch_size=5000)
optimizer = torch.optim.Adam(model.parameters())
criterion = torch.nn.NLLLoss()

model.train()
step = 0
for x1, x2, y in data:
    # Stack along dim=1 so each row holds the two context word ids of one example.
    x = torch.stack((x1, x2), dim=1)
    x = x.to(device)
    y = y.to(device)
    optimizer.zero_grad()
    ypredicted = model(x)
    # The model returns probabilities, so take the log before NLLLoss.
    loss = criterion(torch.log(ypredicted), y)
    if step % 100 == 0:
        print(step, loss)
    step += 1
    loss.backward()
    optimizer.step()

torch.save(model.state_dict(), 'model1.bin')
print(model)


def predict(words):
    vocab = train_dataset.vocab
    ixs = torch.tensor(vocab.forward(words)).to(device)
    predictions = model(ixs)
    top_predictions = torch.topk(predictions[0], 5)
    top_indices = top_predictions.indices.tolist()
    top_probs = top_predictions.values.tolist()
    top_words = vocab.lookup_tokens(top_indices)
    result_list = list(zip(top_words, top_probs))

    total_prob = 0.0
    str_prediction = ""
    for word, prob in result_list:
        total_prob += prob
        str_prediction += f"{word}:{prob} "

    if not total_prob:
        return "the:0.2 be:0.2 to:0.2 of:0.1 and:0.1 a:0.1 :0.1"

    # The trailing ":<prob>" entry carries the remaining probability mass.
    if 1 - total_prob >= 0.01:
        str_prediction += f":{1 - total_prob}"
    else:
        str_prediction += ":0.01"

    return str_prediction


def predict_data(read_path, save_path):
    data = pd.read_csv(
        read_path,
        sep="\t",
        on_bad_lines="skip",
        header=None,
        quoting=csv.QUOTE_NONE
    )
    with open(save_path, "w", encoding="utf-8") as file:
        for _, row in tqdm(data.iterrows()):
            # Column 6 holds the left context; predict from its last two words.
            words = word_tokenize(clean(row[6]))
            if len(words) < 3:
                prediction = "the:0.2 be:0.2 to:0.2 of:0.1 and:0.1 a:0.1 :0.1"
            else:
                prediction = predict(words[-2:])
            file.write(prediction + "\n")


print("Predicting...")
print("Dev set")
predict_data("dev-0/in.tsv.xz", "dev-0/out.tsv")
print("Test set")
predict_data("test-A/in.tsv.xz", "test-A/out.tsv")
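

# In[ ]:


# Optional sketch: reload the weights saved above as 'model1.bin', e.g. in a
# later session, instead of retraining. This assumes the same hyperparameters
# (VOCAB_SIZE, EMBED_SIZE, CONTEXT_SIZE, H) and the class definition above;
# the name inference_model is only illustrative.
inference_model = SimpleTrigramNeuralLanguageModel(
    VOCAB_SIZE, EMBED_SIZE, CONTEXT_SIZE, H).to(device)
inference_model.load_state_dict(torch.load('model1.bin', map_location=device))
inference_model.eval()  # switch off training-mode behaviour for inference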