challenging-america-word-ga.../run.py
2024-05-22 05:20:42 +02:00

191 lines
5.0 KiB
Python

#!/usr/bin/env python
# coding: utf-8
# In[1]:
from torch.utils.data import IterableDataset, DataLoader
from torchtext.vocab import build_vocab_from_iterator
import regex as re
import itertools
from itertools import islice
from torch import nn
import torch
from tqdm.notebook import tqdm
# Run configuration shared by the rest of the script.
embed_size = 300                      # width of each token embedding
vocab_size = 30_000                   # max_tokens handed to the vocabulary builder
num_epochs = 1                        # passes over the training file
# Use the GPU when one is present; otherwise fall back to the CPU so that the
# later `.to(device)` calls do not fail on CUDA-less machines.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
batch_size = 8192                     # 5-grams per optimisation step
train_file_path = 'train/train.txt'   # one training text per line
# In[2]:
# Function to extract words from a line of text
def get_words_from_line(line):
    """Yield the tokens of one text line, wrapped in boundary markers.

    The stream starts with '<s>' and ends with '</s>'. In between, every
    match of the pattern is yielded lowercased; a match is either a run of
    letters, ASCII digits and asterisks, or a run of punctuation characters.
    Anything else (whitespace, symbols) is skipped.

    Note: `re` in this file is the third-party `regex` package, which is
    what provides the `\\p{L}` / `\\p{P}` Unicode property classes.
    """
    stripped = line.rstrip()
    yield '<s>'
    yield from (
        match.group(0).lower()
        for match in re.finditer(r'[\p{L}0-9\*]+|\p{P}+', stripped)
    )
    yield '</s>'
# Generator to read lines from a file
def get_word_lines_from_file(file_name):
    """Yield one lazy token generator per line of the UTF-8 file `file_name`.

    Each yielded item is the generator returned by `get_words_from_line` for
    that line. The file stays open only while this generator is being
    consumed.
    """
    with open(file_name, mode='r', encoding='utf8') as handle:
        # map() is lazy, so lines are read and tokenised one at a time.
        yield from map(get_words_from_line, handle)
# Function to create 5-grams from a sequence
def look_ahead_iterator(gen):
    """Slide a 5-item window over `gen` and yield gap-prediction tuples.

    For consecutive items (w0, w1, w2, w3, w4) the yielded tuple is
    (w0, w1, w3, w4, w2): the two left neighbours, the two right neighbours,
    and finally the centre item. Nothing is yielded until four earlier items
    are available, and a window is skipped while any of those four earlier
    items is None.
    """
    history = ()  # up to four most recent items, oldest first
    for current in gen:
        ready = len(history) == 4 and all(seen is not None for seen in history)
        if ready:
            left_far, left_near, centre, right_near = history
            yield (left_far, left_near, right_near, current, centre)
        history = (history + (current,))[-4:]
# Dataset class for 5-grams
class FiveGrams(IterableDataset):
    """Iterable dataset of 5-gram index tuples read from a text file.

    Construction makes one pass over `text_file` to build a vocabulary of at
    most `vocabulary_size` entries (including the '<unk>' special), and makes
    '<unk>' the default index so unknown tokens resolve to it. Every
    iteration re-reads the file, so the dataset can be iterated repeatedly.
    """

    def __init__(self, text_file, vocabulary_size):
        token_lines = get_word_lines_from_file(text_file)
        self.vocab = build_vocab_from_iterator(
            token_lines,
            max_tokens=vocabulary_size,
            specials=['<unk>'],
        )
        unk_index = self.vocab['<unk>']
        self.vocab.set_default_index(unk_index)
        self.vocabulary_size = vocabulary_size
        self.text_file = text_file

    def __iter__(self):
        # All lines are chained into one token stream, so windows may span
        # the '</s>' / '<s>' boundary between consecutive lines.
        token_stream = itertools.chain.from_iterable(
            get_word_lines_from_file(self.text_file)
        )
        index_stream = (self.vocab[token] for token in token_stream)
        return look_ahead_iterator(index_stream)
# Instantiate the dataset
train_dataset = FiveGrams(train_file_path, vocab_size)
# In[3]:
# Sanity check: print the first two 5-grams decoded back into tokens.
for x in islice(train_dataset, 2):
    print(train_dataset.vocab.lookup_tokens(x))
# In[4]:
class SimpleFiveGramNeuralLanguageModel(nn.Module):
    """Predict a word from the embeddings of four surrounding context words.

    The four context embeddings are concatenated, projected down to
    `embedding_size`, projected up to `vocabulary_size`, and normalised with
    a softmax, so `forward` returns probabilities rather than logits.

    NOTE(review): there is no activation between `linear1` and `linear2`, so
    the two layers compose into a single affine map.
    """

    def __init__(self, vocabulary_size, embedding_size):
        super().__init__()
        # Layers are created in this order on purpose: it fixes the order in
        # which random initial weights are drawn.
        self.embedding = nn.Embedding(vocabulary_size, embedding_size)
        self.linear1 = nn.Linear(4 * embedding_size, embedding_size)
        self.linear2 = nn.Linear(embedding_size, vocabulary_size)
        self.softmax = nn.Softmax(dim=1)
        self.embedding_size = embedding_size

    def forward(self, x):
        """Return a (batch, vocabulary_size) tensor of probabilities.

        `x` holds token indices; `linear1` expects 4 * embedding_size input
        features, i.e. four indices per row.
        """
        batch = x.size(0)
        context = self.embedding(x).view(batch, -1)
        hidden = self.linear1(context)
        scores = self.linear2(hidden)
        return self.softmax(scores)
model = SimpleFiveGramNeuralLanguageModel(vocab_size, embed_size).to(device)
# In[5]:
# Train the model on (left2, left1, right1, right2) -> centre word.
data = DataLoader(train_dataset, batch_size=batch_size)
optimizer = torch.optim.Adam(model.parameters())
# The model already returns softmax probabilities, so the loss is the
# negative log-likelihood of their logarithm. CrossEntropyLoss would apply
# log-softmax a second time on top of the log.
criterion = torch.nn.NLLLoss()
model.train()
step = 0
for _ in range(num_epochs):
    for x1, x2, x3, x4, y in tqdm(data, desc="Train loop"):
        y = y.to(device)
        # Stack the four context columns into one (batch, 4) index tensor.
        x = torch.stack((x1, x2, x3, x4), dim=1).to(device)
        optimizer.zero_grad()
        ypredicted = model(x)
        # Clamp before the log: a probability that underflowed to 0 would
        # otherwise give log(0) = -inf and NaN gradients.
        floor = torch.finfo(ypredicted.dtype).tiny
        loss = criterion(torch.log(ypredicted.clamp_min(floor)), y)
        if step % 5000 == 0:
            # .item() prints the plain number instead of the tensor repr.
            print(step, loss.item())
        step += 1
        loss.backward()
        optimizer.step()
    # Restart the logging counter for the next epoch.
    step = 0
model.eval()
# In[8]:
def get_gap_candidates(words, n=20, vocab=train_dataset.vocab):
    """Return the `n` most probable gap fillers for a context.

    Args:
        words: context tokens; the model's first linear layer expects four
            of them (two left of the gap, two right).
        n: number of candidates to return; capped at the model's output size.
        vocab: vocabulary used to map tokens to indices and back.

    Returns:
        A list of (token, probability) pairs, most probable first.
    """
    ixs = torch.tensor(vocab(words)).unsqueeze(0).to(device)
    # Inference only: skip autograd bookkeeping for every query.
    with torch.no_grad():
        out = model(ixs)
    # topk raises if asked for more entries than exist.
    top = torch.topk(out[0], min(n, out.size(-1)))
    top_words = vocab.lookup_tokens(top.indices.tolist())
    return list(zip(top_words, top.values.tolist()))
def clean(text):
    """Normalise a raw context string into space-separated words.

    Steps, in order:
      1. Drop a hyphen followed by the literal two characters backslash-n
         (presumably a word hyphenated across an escaped line break), then
         turn remaining literal backslash-n / backslash-t sequences into
         spaces.
      2. Turn real newline characters into spaces.
      3. Delete a comma or hyphen sitting directly between two word
         characters ("1,000" -> "1000", "well-known" -> "wellknown").
      4. Delete all remaining punctuation.
      5. Collapse whitespace runs to one space and strip the ends.

    Whitespace is collapsed after punctuation is removed. Doing it first
    left double spaces wherever punctuation stood between spaces
    ("a , b" -> "a  b"), which produced empty tokens for callers that split
    on a single space.
    """
    text = text.replace('-\\n', '').replace('\\n', ' ').replace('\\t', ' ')
    text = re.sub(r'\n', ' ', text)
    text = re.sub(r'(?<=\w)[,-](?=\w)', '', text)
    text = re.sub(r'\p{P}', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
def predictor(prefix, suffix):
    """Format the gap prediction for one (prefix, suffix) context pair.

    The last two words of the cleaned prefix and the first two words of the
    cleaned suffix form the model context.

    Returns:
        A string of space-separated "word:probability" entries followed by
        ":remaining", where `remaining` is the probability mass not assigned
        to a listed word. '<unk>' candidates are not listed, so their mass
        ends up in `remaining`.
    """
    # The vocabulary is built from lowercased tokens only, so context words
    # must be lowercased too or every capitalised word becomes '<unk>'.
    # split() without an argument never yields empty-string tokens.
    prefix_words = clean(prefix).lower().split()
    suffix_words = clean(suffix).lower().split()

    # The model needs exactly four context tokens. When a side is short, pad
    # it with the line-boundary markers in the order they appear in the
    # training stream ('</s>' ends one line, '<s>' starts the next).
    boundary = ['</s>', '<s>']
    left = (boundary + prefix_words)[-2:]
    right = (suffix_words + boundary)[:2]

    probs_sum = 0
    entries = []
    for word, prob in get_gap_candidates(left + right):
        if word == "<unk>":
            continue
        probs_sum += prob
        entries.append(f"{word}:{prob}")
    # Guard against float rounding pushing the remainder slightly below zero.
    entries.append(f":{max(1 - probs_sum, 0.0)}")
    return ' '.join(entries)
# In[9]:
def generate_result(input_path, output_path='out.tsv'):
    """Write one prediction line per row of a tab-separated input file.

    Columns 6 and 7 (0-based) of each input row are passed to `predictor` as
    prefix and suffix. All rows are read before any prediction is made; a
    row with fewer than eight columns raises IndexError.
    """
    with open(input_path, encoding='utf-8') as source:
        contexts = [
            (fields[6], fields[7])
            for fields in (row.split('\t') for row in source)
        ]
    with open(output_path, 'w', encoding='utf-8') as sink:
        sink.writelines(
            predictor(prefix, suffix) + '\n'
            for prefix, suffix in tqdm(contexts)
        )


generate_result('dev-0/in.tsv', output_path='dev-0/out.tsv')