challenging-america-word-ga.../trigram.ipynb
2024-05-22 03:25:24 +02:00

10 KiB

from torch.utils.data import IterableDataset, DataLoader
from torchtext.vocab import build_vocab_from_iterator

import regex as re
import sys
import itertools
from itertools import islice

from torch import nn
import torch

from tqdm.notebook import tqdm

# Hyper-parameters and paths shared by the rest of the notebook.
embed_size = 300
vocab_size = 30_000
num_epochs = 1
# Use the GPU when one is present; otherwise fall back to the CPU so the
# later `.to(device)` calls do not fail on CUDA-less machines.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
batch_size = 8192
train_file_path = 'train/train.txt'

# Count the training lines by streaming the file: only the count is needed,
# so there is no reason to hold every line in memory at once.
with open(train_file_path, 'r', encoding='utf-8') as file:
    total = sum(1 for _ in file)
S:\WENV_TORCHTEXT\Lib\site-packages\torchtext\vocab\__init__.py:4: UserWarning: 
/!\ IMPORTANT WARNING ABOUT TORCHTEXT STATUS /!\ 
Torchtext is deprecated and the last released version will be 0.18 (this one). You can silence this warning by calling the following at the beginnign of your scripts: `import torchtext; torchtext.disable_torchtext_deprecation_warning()`
  warnings.warn(torchtext._TORCHTEXT_DEPRECATION_MSG)
S:\WENV_TORCHTEXT\Lib\site-packages\torchtext\utils.py:4: UserWarning: 
/!\ IMPORTANT WARNING ABOUT TORCHTEXT STATUS /!\ 
Torchtext is deprecated and the last released version will be 0.18 (this one). You can silence this warning by calling the following at the beginnign of your scripts: `import torchtext; torchtext.disable_torchtext_deprecation_warning()`
  warnings.warn(torchtext._TORCHTEXT_DEPRECATION_MSG)
# Tokenise one raw text line into lower-cased tokens wrapped in sentence markers
def get_words_from_line(line):
    """Yield '<s>', every lower-cased token found in *line*, then '</s>'.

    A token is either a run of letters, ASCII digits and '*' characters, or a
    run of punctuation characters.  Characters matching neither alternative
    (whitespace, for instance) are skipped.
    """
    stripped = line.rstrip()
    yield '<s>'
    token_pattern = r'[\p{L}0-9\*]+|\p{P}+'
    yield from (
        match.group(0).lower()
        for match in re.finditer(token_pattern, stripped)
    )
    yield '</s>'

# Generator producing one token stream per line of a text file
def get_word_lines_from_file(file_name):
    """Yield a token generator (from get_words_from_line) per line of *file_name*.

    Lines are counted from 1; iteration stops as soon as the count reaches
    ``total * 2`` (that line is not yielded).  ``total`` is the module-level
    line count computed earlier in the file.
    """
    limit = total * 2
    with open(file_name, 'r', encoding='utf8') as fh:
        for line_number, line in enumerate(fh, start=1):
            if line_number == limit:
                break
            yield get_words_from_line(line)

# Sliding window of width three over an arbitrary iterable
def look_ahead_iterator(gen):
    """Yield consecutive triples (a, b, c) taken from *gen* in order.

    Nothing is yielded until two earlier items have been seen.  ``None`` is
    used as the "not seen yet" marker, so a triple is also withheld whenever
    either of its two leading items is ``None``.
    """
    older = newer = None
    for current in gen:
        if not (older is None or newer is None):
            yield (older, newer, current)
        # Slide the window one position forward.
        older, newer = newer, current

# Streaming dataset of vocabulary-index trigrams read from a text file
class Trigrams(IterableDataset):
    """Iterable dataset yielding index triples over the token stream of a file.

    The vocabulary is built once, at construction time, from the same file
    and is capped at *vocabulary_size* entries including the '<unk>' special.
    Each iteration re-reads the file, so nothing is kept in memory between
    passes.  All lines are chained into a single token stream, which means
    triples can span line boundaries.
    """

    def __init__(self, text_file, vocabulary_size):
        vocab = build_vocab_from_iterator(
            get_word_lines_from_file(text_file),
            max_tokens=vocabulary_size,
            specials=['<unk>'],
        )
        # Tokens missing from the vocabulary resolve to the '<unk>' index.
        vocab.set_default_index(vocab['<unk>'])
        self.vocab = vocab
        self.vocabulary_size = vocabulary_size
        self.text_file = text_file

    def __iter__(self):
        token_stream = itertools.chain.from_iterable(
            get_word_lines_from_file(self.text_file)
        )
        index_stream = (self.vocab[token] for token in token_stream)
        return look_ahead_iterator(index_stream)

# Build the training dataset; constructing it also builds the vocabulary
train_dataset = Trigrams(text_file=train_file_path, vocabulary_size=vocab_size)
class SimpleTrigramNeuralLanguageModel(nn.Module):
    """Feed-forward model mapping two context word indices to a word distribution.

    The two context embeddings are concatenated, passed through two stacked
    linear layers and normalised with a softmax over the vocabulary.

    NOTE(review): there is no non-linearity between ``linear1`` and
    ``linear2``.
    """

    def __init__(self, vocabulary_size, embedding_size):
        super().__init__()
        self.embedding = nn.Embedding(vocabulary_size, embedding_size)
        # Input width is doubled because both context embeddings are concatenated.
        self.linear1 = nn.Linear(2 * embedding_size, embedding_size)
        self.linear2 = nn.Linear(embedding_size, vocabulary_size)
        self.softmax = nn.Softmax(dim=1)
        self.embedding_size = embedding_size

    def forward(self, x):
        """Return per-row probabilities over the vocabulary.

        *x* holds two word indices per row; the output has one row per input
        row and one column per vocabulary entry.
        """
        batch = x.size(0)
        # Flatten each row's embeddings into one vector.
        flat_context = self.embedding(x).view(batch, -1)
        hidden = self.linear1(flat_context)
        logits = self.linear2(hidden)
        return self.softmax(logits)

# Train the trigram model for `num_epochs` passes over the streaming dataset.
model = SimpleTrigramNeuralLanguageModel(vocab_size, embed_size).to(device)
data = DataLoader(train_dataset, batch_size=batch_size)
optimizer = torch.optim.Adam(model.parameters())
# The model's forward pass already ends in a softmax, so the loss receives
# log-probabilities.  NLLLoss consumes those directly; CrossEntropyLoss
# expects raw logits and would apply log-softmax a second time.
criterion = torch.nn.NLLLoss()
model.train()
step = 0
for _ in range(num_epochs):
    for x1, x2, y in tqdm(data, desc="Train loop"):
        y = y.to(device)
        # Pair the two context indices into one (batch, 2) input tensor.
        x = torch.cat((x1.unsqueeze(1), x2.unsqueeze(1)), dim=1).to(device)
        optimizer.zero_grad()
        ypredicted = model(x)

        loss = criterion(torch.log(ypredicted), y)
        if step % 5000 == 0:
            # Report the scalar value rather than the tensor repr.
            print(step, loss.item())
        step += 1
        loss.backward()
        optimizer.step()
    # Restart the progress counter for the next epoch.
    step = 0
model.eval()
Train loop: 0it [00:00, ?it/s]
0 tensor(10.3631, device='cuda:0', grad_fn=<NllLossBackward0>)
5000 tensor(5.7081, device='cuda:0', grad_fn=<NllLossBackward0>)
10000 tensor(5.5925, device='cuda:0', grad_fn=<NllLossBackward0>)
15000 tensor(5.5097, device='cuda:0', grad_fn=<NllLossBackward0>)
SimpleTrigramNeuralLanguageModel(
  (embedding): Embedding(30000, 300)
  (linear1): Linear(in_features=600, out_features=300, bias=True)
  (linear2): Linear(in_features=300, out_features=30000, bias=True)
  (softmax): Softmax(dim=1)
)
def get_gap_candidates(words, n=10, vocab=train_dataset.vocab):
    """Return the *n* most probable next words for a two-word context.

    Args:
        words: context tokens, oldest first.  Only the last two are used; a
            shorter context is left-padded with the sentence markers.
        n: number of candidates to return (capped at the model's output size).
        vocab: vocabulary used to map tokens to indices and back.

    Returns:
        A list of ``(token, probability)`` pairs, most probable first.
    """
    # The model consumes exactly two indices per row.  The training stream
    # chains lines together, so the natural context before the first word of
    # a line is ('</s>', '<s>'); reuse it as padding.
    context = (['</s>', '<s>'] + list(words))[-2:]
    ixs = torch.tensor(vocab(context)).unsqueeze(0).to(device)

    # Pure inference: skip autograd bookkeeping.
    with torch.no_grad():
        out = model(ixs)
    # topk raises if asked for more entries than exist.
    top = torch.topk(out[0], min(n, out.size(1)))
    top_indices = top.indices.tolist()
    top_probs = top.values.tolist()
    top_words = vocab.lookup_tokens(top_indices)
    return list(zip(top_words, top_probs))
def clean(text):
    """Normalise raw text into space-separated words without punctuation.

    Steps, in order: drop hyphenated escaped line breaks, turn escaped line
    breaks/tabs and real newlines into spaces, remove ',' or '-' sitting
    between two word characters, collapse whitespace, strip all remaining
    punctuation and trim the ends.
    """
    # Escaped line breaks/tabs are literal backslash sequences in the text.
    # Accept any number of backslashes: an exact two-backslash match would let
    # a singly-escaped marker through, and the punctuation pass below would
    # then strip only its backslash, gluing a stray 'n'/'t' to the next word.
    # NOTE(review): confirm which spelling the input files actually use.
    text = re.sub(r'-\\+n', '', text)
    text = re.sub(r'\\+[nt]', ' ', text)
    text = re.sub(r'\n', ' ', text)
    text = re.sub(r'(?<=\w)[,-](?=\w)', '', text)
    text = re.sub(r'\s+', ' ', text)
    text = re.sub(r'\p{P}', '', text)
    text = text.strip()
    return text
    
def predictor(prefix):
    """Format next-word predictions for the text that precedes a gap.

    The last two cleaned words of *prefix* form the model context.  The result
    is a space-separated list of ``word:probability`` entries followed by
    ``:remainder``, where the remainder is the probability mass not assigned
    to the listed words ('<unk>' is never listed, so its mass ends up there).
    """
    # Training tokens are lower-cased (see get_words_from_line); an un-lowered
    # context word would miss the vocabulary and fall back to '<unk>'.
    context = clean(prefix).lower().split()
    # The training stream chains lines, so the context preceding the first
    # word of a line is ('</s>', '<s>'); use it to pad short prefixes.
    context = (['</s>', '<s>'] + context)[-2:]
    candidates = get_gap_candidates(context)

    probs_sum = 0
    parts = []
    for word, prob in candidates:
        if word == "<unk>":
            continue
        probs_sum += prob
        parts.append(f"{word}:{prob}")
    # Float rounding can push the listed mass marginally above 1; never emit
    # a negative remainder.
    parts.append(f":{max(0, 1 - probs_sum)}")

    return ' '.join(parts)
def generate_result(input_path, output_path='out.tsv'):
    """Write one prediction line to *output_path* per row of *input_path*.

    Each input row is split on tabs and the field at index 6 (the text the
    code treats as the prefix before the gap) is passed to ``predictor``.

    Raises:
        IndexError: if a row has fewer than seven tab-separated fields.
    """
    # Read everything first so the input is closed before the output is
    # opened.  Only field 6 is needed; the following field is never read, so
    # rows lacking it are still processed.
    with open(input_path, encoding='utf-8') as f:
        prefixes = [line.split('\t')[6] for line in f]

    with open(output_path, 'w', encoding='utf-8') as output_file:
        output_file.writelines(predictor(prefix) + '\n' for prefix in prefixes)
# Produce predictions for the dev-0 split
generate_result(input_path='dev-0/in.tsv', output_path='dev-0/out.tsv')