challenging-america-word-ga.../neural-trigram.ipynb
2022-05-08 19:33:06 +02:00

7.0 KiB
Raw Permalink Blame History

from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import DataLoader
import torch
from torch import nn
import pandas as pd
import nltk
import regex as re
import csv
import itertools
from nltk import word_tokenize
from os.path import exists


def clean(text):
    """Normalize raw text before tokenization.

    The value is coerced to ``str``, stripped and lower-cased. Then a fixed
    set of punctuation characters (``< > . " ” , * : / -``) and every pair of
    consecutive backslashes is removed, a handful of English contractions
    are expanded, and any apostrophes that remain are dropped.

    Args:
        text: any value; it is converted with ``str()`` first.

    Returns:
        The normalized string.
    """
    text = str(text).strip().lower()
    # Raw string + character class: removes the same characters as the old
    # alternation, without invalid escape sequences or an empty alternative.
    text = re.sub(r'[<>."”,*:/-]|\\\\', "", text)
    # Order matters: contractions must be expanded before the remaining
    # apostrophes are stripped below.
    replacements = (
        ('\\\\\\\\n', " "),
        ("'t", " not"),
        ("'s", " is"),
        ("'ll", " will"),
        ("'m", " am"),
        ("'ve", " have"),
    )
    for old, new in replacements:
        text = text.replace(old, new)
    return text.replace("'", "")

def get_words_from_line(line, specials = True):
    """Lazily split one line into lower-cased tokens.

    A token is either a run of letters, digits and ``*`` characters, or a
    run of punctuation characters. Trailing whitespace of the line is
    ignored. When ``specials`` is true the token stream is wrapped in a
    leading ``<s>`` and a trailing ``</s>`` marker.
    """
    stripped = line.rstrip()
    opening, closing = (['<s>'], ['</s>']) if specials else ([], [])
    yield from opening
    yield from (
        match.group(0).lower()
        for match in re.finditer(r'[\p{L}0-9\*]+|\p{P}+', stripped)
    )
    yield from closing

def get_word_lines_from_data(d):
    """Yield one token generator (with sentence markers) per line of ``d``."""
    yield from map(get_words_from_line, d)


class SimpleBigramNeuralLanguageModel(torch.nn.Module):
    """Bigram language model: embedding -> linear -> softmax.

    Maps token indices to a probability distribution over the vocabulary
    (the last dimension of the output sums to 1).

    Args:
        vocabulary_size: number of distinct token indices.
        embedding_size: dimensionality of the token embeddings.
    """

    def __init__(self, vocabulary_size, embedding_size):
        super().__init__()
        # The layer order inside the Sequential is unchanged, so parameter
        # names in saved state dicts (model.0.*, model.1.*) stay the same.
        self.model = nn.Sequential(
            nn.Embedding(vocabulary_size, embedding_size),
            nn.Linear(embedding_size, vocabulary_size),
            # Explicit dim: always normalise over the vocabulary axis
            # instead of relying on the deprecated implicit choice.
            nn.Softmax(dim=-1),
        )

    def forward(self, x):
        """Return vocabulary probabilities for each token index in ``x``."""
        return self.model(x)

def look_ahead_iterator(gen):
    """Yield consecutive ``(previous, current)`` pairs from an iterable.

    An input of fewer than two items yields nothing. Items may be any
    value, including ``None``: the first item is taken explicitly rather
    than using ``None`` as a "no previous item yet" marker.
    """
    items = iter(gen)
    try:
        previous = next(items)
    except StopIteration:
        return
    for current in items:
        yield (previous, current)
        previous = current
        
class Bigrams(torch.utils.data.IterableDataset):
    """Iterable dataset of consecutive token-index pairs.

    A vocabulary limited to ``vocabulary_size`` entries is built from the
    tokenized lines of ``data``; tokens outside it map to ``<unk>``.
    Iteration chains all lines into a single token stream, so pairs also
    span line boundaries.
    """

    def __init__(self, data, vocabulary_size):
        self.vocab = build_vocab_from_iterator(
            get_word_lines_from_data(data),
            max_tokens=vocabulary_size,
            specials=['<unk>'],
        )
        # Unknown tokens resolve to the index of '<unk>'.
        unknown_index = self.vocab['<unk>']
        self.vocab.set_default_index(unknown_index)
        self.vocabulary_size = vocabulary_size
        self.data = data

    def __iter__(self):
        token_stream = itertools.chain.from_iterable(
            get_word_lines_from_data(self.data)
        )
        index_stream = (self.vocab[token] for token in token_stream)
        return look_ahead_iterator(index_stream)


# Load the training data.
in_file = 'train/in.tsv.xz'
out_file = 'train/expected.tsv'

X_train = pd.read_csv(in_file, sep='\t',  header=None, quoting=csv.QUOTE_NONE, nrows=200000, on_bad_lines="skip", encoding="UTF-8")
Y_train = pd.read_csv(out_file, sep='\t', header=None, quoting=csv.QUOTE_NONE, nrows=200000, on_bad_lines="skip", encoding="UTF-8")

# Rebuild one text per row as: input column 6, expected column 0, input column 7
# (presumably left context, gap word, right context -- confirm against the data).
X_train = X_train[[6, 7]]
X_train = pd.concat([X_train, Y_train], axis=1)
# Join with spaces so the expected word is not fused with the neighbouring
# tokens of columns 6 and 7.
# NOTE: this changes the token stream and therefore the vocabulary; a cached
# 'nn_model2.bin' trained on the old concatenation should be retrained.
X_train = X_train[6] + " " + X_train[0] + " " + X_train[7]
X_train = X_train.apply(clean)
# Model hyper-parameters.
vocab_size = 30000
embed_size = 150

# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Bigram dataset (also owns the vocabulary) and the model placed on `device`.
Dataset = Bigrams(X_train, vocab_size)
model = SimpleBigramNeuralLanguageModel(vocab_size, embed_size).to(device)

# Train the model and save it, unless a saved checkpoint already exists,
# in which case its weights are loaded instead.
if not exists('nn_model2.bin'):
    data = DataLoader(Dataset, batch_size=8000)
    optimizer = torch.optim.Adam(model.parameters())
    criterion = torch.nn.NLLLoss()

    model.train()
    step = 0
    for i in range(2):
        print(f" Epoka {i}--------------------------------------------------------")
        for x, y in data:
            x = x.to(device)
            y = y.to(device)
            optimizer.zero_grad()
            ypredicted = model(x)
            # The model outputs probabilities; NLLLoss expects log-probabilities.
            loss = criterion(torch.log(ypredicted), y)
            # Report the loss every 100 optimisation steps.
            if step % 100 == 0:
                print(step, loss)
            step += 1
            loss.backward()
            optimizer.step()

    torch.save(model.state_dict(), 'nn_model2.bin')
else:
    # map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
    model.load_state_dict(torch.load('nn_model2.bin', map_location=device))


# Vocabulary used to map between tokens and indices at prediction time.
vocab = Dataset.vocab


# nltk.download('punkt')
def predict_word(ws):
    """Return the model's 8 most probable outputs for a context token.

    Args:
        ws: list of token strings; they are mapped to indices with the
            module-level ``vocab`` and only the model output for the first
            one (``ws[0]``) is used.

    Returns:
        A string of ``word:probability`` entries, each followed by one
        space (so the result ends with a trailing space).
    """
    ixs = torch.tensor(vocab.forward(ws)).to(device)
    # Inference only: no need to record the autograd graph.
    with torch.no_grad():
        out = model(ixs)
    top = torch.topk(out[0], 8)
    top_words = vocab.lookup_tokens(top.indices.tolist())
    top_probs = top.values.tolist()
    # NOTE(review): no wildcard entry (":p") for the remaining probability
    # mass is appended here -- confirm the format the evaluator expects.
    return "".join(f"{word}:{prob} " for word, prob in zip(top_words, top_probs))


def word_gap_prediction(file):
    """Write one prediction line per input row to ``{file}/out.tsv``.

    Column 6 of ``{file}/in.tsv.xz`` is cleaned and tokenized; the last
    token of each row is fed to ``predict_word``. Rows that produce no
    token at all get a fixed fallback distribution.

    Args:
        file: directory containing ``in.tsv.xz``; ``out.tsv`` is written
            (overwritten) in the same directory.
    """
    X_test = pd.read_csv(f'{file}/in.tsv.xz', sep='\t', header=None, quoting=csv.QUOTE_NONE,  on_bad_lines='skip', encoding="UTF-8")[6]
    X_test = X_test.apply(clean)
    with open(f'{file}/out.tsv', "w", encoding="UTF-8") as f:
        for row in X_test:
            # Exhaust the token generator, keeping only its final token.
            # The extra clean() pass over the already-cleaned row is kept so
            # the tokens match what earlier runs produced.
            last_token = None
            for last_token in get_words_from_line(clean(str(row)), False):
                pass
            if last_token is None:
                # No token in the context: previously this passed [None] to
                # the vocabulary because the emptiness check ran on a
                # one-element list and could never be true.
                pred_str = "a:0.2 the:0.2 to:0.2 of:0.1 and:0.1 of:0.1 :0.1"
            else:
                pred_str = predict_word([last_token])
            f.write(pred_str.strip() + "\n")
# Produce out.tsv for the dev-0 and test-A directories.
for split_dir in ("dev-0/", "test-A/"):
    word_gap_prediction(split_dir)