199 lines
4.9 KiB
Python
199 lines
4.9 KiB
Python
#!/usr/bin/env python
|
|
# coding: utf-8
|
|
|
|
# In[1]:
|
|
|
|
|
|
from torch.utils.data import IterableDataset, DataLoader
|
|
from torchtext.vocab import build_vocab_from_iterator
|
|
|
|
import regex as re
|
|
import sys
|
|
import itertools
|
|
from itertools import islice
|
|
|
|
from torch import nn
|
|
import torch
|
|
|
|
from tqdm.notebook import tqdm
|
|
|
|
embed_size = 300
|
|
vocab_size = 30_000
|
|
num_epochs = 1
|
|
device = 'cuda'
|
|
batch_size = 8192
|
|
train_file_path = 'train/train.txt'
|
|
|
|
with open(train_file_path, 'r', encoding='utf-8') as file:
|
|
total = len(file.readlines())
|
|
|
|
|
|
# In[2]:
|
|
|
|
|
|
# Function to extract words from a line of text
|
|
def get_words_from_line(line):
|
|
line = line.rstrip()
|
|
yield '<s>'
|
|
for m in re.finditer(r'[\p{L}0-9\*]+|\p{P}+', line):
|
|
yield m.group(0).lower()
|
|
yield '</s>'
|
|
|
|
# Generator to read lines from a file
|
|
def get_word_lines_from_file(file_name):
|
|
limit = total * 2
|
|
with open(file_name, 'r', encoding='utf8') as fh:
|
|
for line in fh:
|
|
limit -= 1
|
|
if not limit:
|
|
break
|
|
yield get_words_from_line(line)
|
|
|
|
# Function to create trigrams from a sequence
|
|
def look_ahead_iterator(gen):
|
|
prev1, prev2 = None, None
|
|
for item in gen:
|
|
if prev1 is not None and prev2 is not None:
|
|
yield (prev2, prev1, item)
|
|
prev2 = prev1
|
|
prev1 = item
|
|
|
|
# Dataset class for trigrams
|
|
class Trigrams(IterableDataset):
|
|
def __init__(self, text_file, vocabulary_size):
|
|
self.vocab = build_vocab_from_iterator(
|
|
get_word_lines_from_file(text_file),
|
|
max_tokens=vocabulary_size,
|
|
specials=['<unk>']
|
|
)
|
|
self.vocab.set_default_index(self.vocab['<unk>'])
|
|
self.vocabulary_size = vocabulary_size
|
|
self.text_file = text_file
|
|
|
|
def __iter__(self):
|
|
return look_ahead_iterator(
|
|
(self.vocab[t] for t in itertools.chain.from_iterable(get_word_lines_from_file(self.text_file)))
|
|
)
|
|
|
|
# Instantiate the dataset
|
|
train_dataset = Trigrams(train_file_path, vocab_size)
|
|
|
|
|
|
# In[3]:
|
|
|
|
|
|
class SimpleTrigramNeuralLanguageModel(nn.Module):
|
|
def __init__(self, vocabulary_size, embedding_size):
|
|
super(SimpleTrigramNeuralLanguageModel, self).__init__()
|
|
self.embedding = nn.Embedding(vocabulary_size, embedding_size)
|
|
self.linear1 = nn.Linear(embedding_size * 2, embedding_size)
|
|
self.linear2 = nn.Linear(embedding_size, vocabulary_size)
|
|
self.softmax = nn.Softmax(dim=1)
|
|
self.embedding_size = embedding_size
|
|
|
|
def forward(self, x):
|
|
embeds = self.embedding(x).view(x.size(0), -1)
|
|
out = self.linear1(embeds)
|
|
out = self.linear2(out)
|
|
return self.softmax(out)
|
|
|
|
model = SimpleTrigramNeuralLanguageModel(vocab_size, embed_size).to(device)
|
|
|
|
|
|
# In[4]:
|
|
|
|
|
|
data = DataLoader(train_dataset, batch_size=batch_size)
|
|
optimizer = torch.optim.Adam(model.parameters())
|
|
criterion = torch.nn.CrossEntropyLoss()
|
|
|
|
|
|
# In[5]:
|
|
|
|
|
|
model.train()
|
|
step = 0
|
|
for _ in range(num_epochs):
|
|
for x1,x2,y in tqdm(data, desc="Train loop"):
|
|
y = y.to(device)
|
|
x = torch.cat((x1.unsqueeze(1),x2.unsqueeze(1)), dim=1).to(device)
|
|
optimizer.zero_grad()
|
|
ypredicted = model(x)
|
|
|
|
loss = criterion(torch.log(ypredicted), y)
|
|
if step % 5000 == 0:
|
|
print(step, loss)
|
|
step += 1
|
|
loss.backward()
|
|
optimizer.step()
|
|
step = 0
|
|
model.eval()
|
|
|
|
|
|
# In[6]:
|
|
|
|
|
|
def get_gap_candidates(words, n=10, vocab=train_dataset.vocab):
|
|
ixs = vocab(words)
|
|
ixs = torch.tensor(ixs).unsqueeze(0).to(device)
|
|
|
|
out = model(ixs)
|
|
top = torch.topk(out[0], n)
|
|
top_indices = top.indices.tolist()
|
|
top_probs = top.values.tolist()
|
|
top_words = vocab.lookup_tokens(top_indices)
|
|
return list(zip(top_words, top_probs))
|
|
|
|
|
|
# In[7]:
|
|
|
|
|
|
def clean(text):
|
|
text = text.replace('-\\n', '').replace('\\n', ' ').replace('\\t', ' ')
|
|
text = re.sub(r'\n', ' ', text)
|
|
text = re.sub(r'(?<=\w)[,-](?=\w)', '', text)
|
|
text = re.sub(r'\s+', ' ', text)
|
|
text = re.sub(r'\p{P}', '', text)
|
|
text = text.strip()
|
|
return text
|
|
|
|
def predictor(prefix):
|
|
words = clean(prefix)
|
|
candidates = get_gap_candidates(words.strip().split(' ')[-2:])
|
|
|
|
probs_sum = 0
|
|
output = ''
|
|
for word,prob in candidates:
|
|
if word == "<unk>":
|
|
continue
|
|
probs_sum += prob
|
|
output += f"{word}:{prob} "
|
|
output += f":{1-probs_sum}"
|
|
|
|
return output
|
|
|
|
|
|
# In[8]:
|
|
|
|
|
|
def generate_result(input_path, output_path='out.tsv'):
|
|
lines = []
|
|
with open(input_path, encoding='utf-8') as f:
|
|
for line in f:
|
|
columns = line.split('\t')
|
|
prefix = columns[6]
|
|
suffix = columns[7]
|
|
lines.append(prefix)
|
|
|
|
with open(output_path, 'w', encoding='utf-8') as output_file:
|
|
for line in lines:
|
|
result = predictor(line)
|
|
output_file.write(result + '\n')
|
|
|
|
|
|
# In[9]:
|
|
|
|
|
|
generate_result('dev-0/in.tsv', output_path='dev-0/out.tsv')
|
|
|