164 lines
5.1 KiB
Python
164 lines
5.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import itertools
|
|
import lzma
|
|
import numpy as np
|
|
import regex as re
|
|
import torch
|
|
from torch import nn
|
|
from torch.utils.data import IterableDataset, DataLoader
|
|
from torchtext.vocab import build_vocab_from_iterator
|
|
|
|
vocab_size = 30000
|
|
embed_size = 300
|
|
hidden_size = 150
|
|
batch_size = 1000
|
|
device = 'cuda'
|
|
train_path = 'train/in.tsv.xz'
|
|
model_path = 'model.bin'
|
|
|
|
# Commented out IPython magic to ensure Python compatibility.
|
|
from google.colab import drive
|
|
drive.mount('modelowanie_jezyka', force_remount=True)
|
|
# %cd /content/modelowanie_jezyka/MyDrive/modelowanie_jezyka
|
|
|
|
def process_line(line):
|
|
separated = line.split('\t')
|
|
left = separated[6].replace(r'\n', ' ').strip()
|
|
right = separated[7].replace(r'\n', ' ').strip()
|
|
line = left + ' ' + right
|
|
return line
|
|
|
|
|
|
def get_line(line):
|
|
line = process_line(line)
|
|
for word in line.split():
|
|
yield word
|
|
|
|
def get_word_lines_from_file(file_name):
|
|
i = 0
|
|
with lzma.open(file_name, mode='rt', encoding='utf-8') as fid:
|
|
for line in fid:
|
|
if i > 100_000:
|
|
break
|
|
i += 1
|
|
yield get_line(line)
|
|
|
|
def double_look_ahead_iterator(gen):
|
|
prev_prev = None
|
|
prev = None
|
|
for item in gen:
|
|
if prev_prev is not None:
|
|
yield np.asarray((prev_prev, prev, item))
|
|
prev_prev = prev
|
|
prev = item
|
|
|
|
def prediction(words, model, top):
|
|
words_tensor = [train_dataset.vocab.forward([word]) for word in words]
|
|
word_t = torch.tensor(words_tensor).view(-1).to(device)
|
|
out = model(word_t)
|
|
top = torch.topk(out[0], top)
|
|
top_indices = top.indices.tolist()
|
|
top_probs = top.values.tolist()
|
|
top_words = vocab.lookup_tokens(top_indices)
|
|
zipped = list(zip(top_words, top_probs))
|
|
for index, element in enumerate(zipped):
|
|
unk = None
|
|
if '<unk>' in element:
|
|
unk = zipped.pop(index)
|
|
zipped.append(('', unk[1]))
|
|
break
|
|
if unk is None:
|
|
zipped[-1] = ('', zipped[-1][1])
|
|
return ' '.join([f'{x[0]}:{x[1]}' for x in zipped])
|
|
|
|
def create_outputs(folder_name, model, top):
|
|
print(f'Creating outputs in {folder_name}')
|
|
with lzma.open(f'{folder_name}/in.tsv.xz', mode='rt', encoding='utf-8') as fid:
|
|
with open(f'{folder_name}/out-top={top}.tsv', 'w', encoding='utf-8', newline='\n') as f:
|
|
for line in fid:
|
|
separated = line.split('\t')
|
|
prefix = separated[6].replace(r'\n', ' ').split()[-2:]
|
|
output_line = prediction(prefix, model, top)
|
|
f.write(output_line + '\n')
|
|
|
|
def train_model(lr):
|
|
model = TrigramNeuralLanguageModel(vocab_size, embed_size, hidden_size).to(device)
|
|
data = DataLoader(train_dataset, batch_size=batch_size)
|
|
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
|
|
criterion = torch.nn.NLLLoss()
|
|
|
|
model.train()
|
|
step = 0
|
|
for batch in data:
|
|
x = batch[:, :2]
|
|
y = batch[:, 2]
|
|
x = x.to(device)
|
|
y = y.to(device)
|
|
optimizer.zero_grad()
|
|
predicted = model(x)
|
|
loss = criterion(torch.log(predicted), y)
|
|
if step % 100 == 0:
|
|
print(step, loss)
|
|
step += 1
|
|
loss.backward()
|
|
|
|
torch.nn.utils.clip_grad_norm_(model.parameters(), 10)
|
|
|
|
|
|
optimizer.step()
|
|
|
|
torch.save(model.state_dict(), model_path)
|
|
|
|
def with_hyperparams():
|
|
train_model(lr=0.001)
|
|
model = TrigramNeuralLanguageModel(vocab_size, embed_size, hidden_size).to(device)
|
|
model.load_state_dict(torch.load(model_path))
|
|
model.eval()
|
|
for top in [100, 200, 400, 600, 800]:
|
|
create_outputs('dev-0', model, top)
|
|
create_outputs('test-A', model, top)
|
|
|
|
"""### Classes"""
|
|
|
|
class Trigrams(IterableDataset):
|
|
def __init__(self, text_file, vocabulary_size):
|
|
self.vocab = build_vocab_from_iterator(
|
|
get_word_lines_from_file(text_file),
|
|
max_tokens=vocabulary_size,
|
|
specials=['<unk>'])
|
|
self.vocab.set_default_index(self.vocab['<unk>'])
|
|
self.vocabulary_size = vocabulary_size
|
|
self.text_file = text_file
|
|
|
|
def __iter__(self):
|
|
return double_look_ahead_iterator(
|
|
(self.vocab[t] for t in itertools.chain.from_iterable(get_word_lines_from_file(self.text_file))))
|
|
|
|
class TrigramNeuralLanguageModel(nn.Module):
|
|
def __init__(self, vocabulary_size, embedding_size, hidden_size):
|
|
super(TrigramNeuralLanguageModel, self).__init__()
|
|
self.embedding_size = embedding_size
|
|
self.embedding = nn.Embedding(vocabulary_size, embedding_size)
|
|
self.linear = nn.Linear(2 * embedding_size, hidden_size)
|
|
self.relu = nn.ReLU()
|
|
self.linear2 = nn.Linear(hidden_size, vocabulary_size)
|
|
self.softmax = nn.Softmax()
|
|
|
|
def forward(self, x):
|
|
x = self.embedding(x).view((-1, 2 * self.embedding_size))
|
|
x = self.linear(x)
|
|
x = self.relu(x)
|
|
x = self.linear2(x)
|
|
return self.softmax(x)
|
|
|
|
vocab = build_vocab_from_iterator(
|
|
get_word_lines_from_file(train_path),
|
|
max_tokens=vocab_size,
|
|
specials=['<unk>']
|
|
)
|
|
|
|
vocab.set_default_index(vocab['<unk>'])
|
|
train_dataset = Trigrams(train_path, vocab_size)
|
|
with_hyperparams()
|