challenging-america-word-ga.../trigram.ipynb
2024-05-14 17:10:07 +02:00

12 KiB

from torch.utils.data import IterableDataset, DataLoader
from torchtext.vocab import build_vocab_from_iterator

import regex as re
import sys
import itertools
from itertools import islice

from torch import nn
import torch

from tqdm.notebook import tqdm

embed_size = 100
vocab_size = 25_000
num_epochs = 1
device = 'cuda'
batch_size = 2048
train_file_path = 'train/train.txt'

with open(train_file_path, 'r', encoding='utf-8') as file:
    total = len(file.readlines())
S:\WENV\Lib\site-packages\torchtext\vocab\__init__.py:4: UserWarning: 
/!\ IMPORTANT WARNING ABOUT TORCHTEXT STATUS /!\ 
Torchtext is deprecated and the last released version will be 0.18 (this one). You can silence this warning by calling the following at the beginnign of your scripts: `import torchtext; torchtext.disable_torchtext_deprecation_warning()`
  warnings.warn(torchtext._TORCHTEXT_DEPRECATION_MSG)
S:\WENV\Lib\site-packages\torchtext\utils.py:4: UserWarning: 
/!\ IMPORTANT WARNING ABOUT TORCHTEXT STATUS /!\ 
Torchtext is deprecated and the last released version will be 0.18 (this one). You can silence this warning by calling the following at the beginnign of your scripts: `import torchtext; torchtext.disable_torchtext_deprecation_warning()`
  warnings.warn(torchtext._TORCHTEXT_DEPRECATION_MSG)
# Function to extract words from a line of text
def get_words_from_line(line):
    line = line.rstrip()
    yield '<s>'
    for m in re.finditer(r'[\p{L}0-9\*]+|\p{P}+', line):
        yield m.group(0).lower()
    yield '</s>'

# Generator to read lines from a file
def get_word_lines_from_file(file_name):
    limit = total * 2
    with open(file_name, 'r', encoding='utf8') as fh:
        for line in tqdm(fh, total=total):
            limit -= 1
            if not limit:
                break
            yield get_words_from_line(line)

# Function to create trigrams from a sequence
def look_ahead_iterator(gen):
    prev1, prev2 = None, None
    for item in gen:
        if prev1 is not None and prev2 is not None:
            yield (prev2, prev1, item)
        prev2 = prev1
        prev1 = item

# Dataset class for trigrams
class Trigrams(IterableDataset):
    def __init__(self, text_file, vocabulary_size):
        self.vocab = build_vocab_from_iterator(
            get_word_lines_from_file(text_file),
            max_tokens=vocabulary_size,
            specials=['<unk>']
        )
        self.vocab.set_default_index(self.vocab['<unk>'])
        self.vocabulary_size = vocabulary_size
        self.text_file = text_file

    def __iter__(self):
        return look_ahead_iterator(
            (self.vocab[t] for t in itertools.chain.from_iterable(get_word_lines_from_file(self.text_file)))
        )

# Instantiate the dataset
train_dataset = Trigrams(train_file_path, vocab_size)
  0%|          | 0/432022 [00:00<?, ?it/s]
# Neural network model for trigram language modeling
class SimpleTrigramNeuralLanguageModel(nn.Module):
    def __init__(self, vocabulary_size, embedding_size):
        super(SimpleTrigramNeuralLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocabulary_size, embedding_size)
        self.linear1 = nn.Linear(embedding_size * 2, embedding_size)
        self.linear2 = nn.Linear(embedding_size, vocabulary_size)
        self.softmax = nn.Softmax(dim=1)
        self.embedding_size = embedding_size

    def forward(self, x):
        embeds = self.embedding(x).view(-1, self.embedding_size * 2)
        out = torch.relu(self.linear1(embeds))
        out = self.linear2(out)
        return self.softmax(out)

# Instantiate the model
model = SimpleTrigramNeuralLanguageModel(vocab_size, embed_size)
model = SimpleTrigramNeuralLanguageModel(vocab_size, embed_size).to(device)
data = DataLoader(train_dataset, batch_size=batch_size)
optimizer = torch.optim.Adam(model.parameters())
criterion = torch.nn.NLLLoss()

model.train()
step = 0
for _ in range(num_epochs):
    for x1,x2,y in tqdm(data, total=total):
       x = torch.cat((x1,x2), dim=0).to(device)
       y = y.to(device)
       optimizer.zero_grad()
       ypredicted = model(x)
       loss = criterion(torch.log(ypredicted), y)
       if step % 5000 == 0:
          print(step, loss)
       step += 1
       loss.backward()
       optimizer.step()

model.eval()
  0%|          | 0/432022 [00:00<?, ?it/s]
  0%|          | 0/432022 [00:00<?, ?it/s]
0 tensor(10.1654, device='cuda:0', grad_fn=<NllLossBackward0>)
5000 tensor(6.5147, device='cuda:0', grad_fn=<NllLossBackward0>)
10000 tensor(6.6747, device='cuda:0', grad_fn=<NllLossBackward0>)
15000 tensor(6.9061, device='cuda:0', grad_fn=<NllLossBackward0>)
20000 tensor(6.8899, device='cuda:0', grad_fn=<NllLossBackward0>)
25000 tensor(6.8373, device='cuda:0', grad_fn=<NllLossBackward0>)
30000 tensor(6.8942, device='cuda:0', grad_fn=<NllLossBackward0>)
35000 tensor(6.9564, device='cuda:0', grad_fn=<NllLossBackward0>)
40000 tensor(6.9709, device='cuda:0', grad_fn=<NllLossBackward0>)
45000 tensor(6.9592, device='cuda:0', grad_fn=<NllLossBackward0>)
50000 tensor(6.8195, device='cuda:0', grad_fn=<NllLossBackward0>)
55000 tensor(6.7074, device='cuda:0', grad_fn=<NllLossBackward0>)
60000 tensor(6.8755, device='cuda:0', grad_fn=<NllLossBackward0>)
65000 tensor(6.9605, device='cuda:0', grad_fn=<NllLossBackward0>)
SimpleTrigramNeuralLanguageModel(
  (embedding): Embedding(25000, 100)
  (linear1): Linear(in_features=200, out_features=100, bias=True)
  (linear2): Linear(in_features=100, out_features=25000, bias=True)
  (softmax): Softmax(dim=1)
)
def get_gap_candidates(words, n=10, vocab=train_dataset.vocab):
    ixs = vocab.forward(words)
    ixs = torch.tensor(ixs)
    ixs = torch.cat(tuple([ixs]), dim=0).to(device)

    out = model(ixs)
    top = torch.topk(out[0], n)
    top_indices = top.indices.tolist()
    top_probs = top.values.tolist()
    top_words = vocab.lookup_tokens(top_indices)
    return list(zip(top_words, top_probs))
def clean(text):
    text = text.replace('-\\\\n', '').replace('\\\\n', ' ').replace('\\\\t', ' ')
    text = re.sub(r'\n', ' ', text)
    text = re.sub(r'(?<=\w)[,-](?=\w)', '', text)
    text = re.sub(r'\s+', ' ', text)
    text = re.sub(r'\p{P}', '', text)
    text = text.strip()
    return text
    
def predictor(prefix):
    words = clean(prefix)
    candidates = get_gap_candidates(words.strip().split(' ')[-2:])

    probs_sum = 0
    output = ''
    for word,prob in candidates:
        if word == "<unk>":
            continue
        probs_sum += prob
        output += f"{word}:{prob} "
    output += f":{1-probs_sum}"

    return output
predictor("I really bug")
'the:0.07267232984304428 of:0.043321046978235245 and:0.032147664576768875 to:0.02692588046193123 a:0.020654045045375824 in:0.020213929936289787 that:0.010836434550583363 is:0.00959325022995472 it:0.008407277055084705 :0.755228141322732'
def generate_result(input_path, output_path='out.tsv'):
    with open(input_path, encoding='utf-8') as f:
        lines = f.readlines()

    with open(output_path, 'w', encoding='utf-8') as output_file:
        for line in lines:
            result = predictor(line)
            output_file.write(result + '\n')
generate_result('dev-0/in.tsv', output_path='dev-0/out.tsv')