# %% [markdown] # # Trigram neural network model for gap fill task # %% [markdown] # ## Import required packages # %% from tqdm import tqdm import re import nltk import os import csv import pandas as pd import torch import torch.nn as nn import torch.optim as optim import sys import numpy as np from torch.utils.data import DataLoader, TensorDataset from bidict import bidict import torchtext.vocab as vocab import math from collections import Counter # %% [markdown] # ## Load train data corpus # %% dataset_dir = os.path.join('..', 'train', 'in.tsv.xz') expected_dir = os.path.join('..', 'train', 'expected.tsv') df = pd.read_csv(dataset_dir, sep='\t', header=None, names=['FileId', 'Year', 'LeftContext', 'RightContext'], quoting=csv.QUOTE_NONE, dtype=str, chunksize=1000) expected_df = pd.read_csv(expected_dir, sep='\t', header=None, names=['Word'], quoting=csv.QUOTE_NONE, dtype=str, chunksize=1000) corpus = [] for j, (df, expected_df) in tqdm(enumerate(zip(df, expected_df)), total=433): df = df.replace(r'\\r+|\\n+|\\t+', ' ', regex=True) for left_context, word, right_context in zip(df['LeftContext'].to_list(), expected_df['Word'].to_list(), df['LeftContext'].to_list()): corpus.extend(re.split(r"\s+", left_context.strip()) + [str(word).strip()] + re.split(r"\s+", right_context.strip())) # %% [markdown] # ## Create dictionaries for mapping words to indices # %% word_to_ix = bidict({}) counts = Counter(corpus) for word, _ in tqdm(counts.most_common(1_500_000)): if word not in word_to_ix: word_to_ix[word] = len(word_to_ix) + 1 # %% [markdown] # ## Tokenize entire corpus # %% device = torch.device("cuda" if torch.cuda.is_available() else "cpu") def tokenize(w): if w in word_to_ix: return word_to_ix[w] else: return 0 tokenized_corpus = [] for word in tqdm(corpus): tokenized_corpus.append(tokenize(word)) # %% [markdown] # ## Create n-grams # %% tokenized_training_corpus = [] ngrams = list(nltk.ngrams(tokenized_corpus, n=7)) np.random.shuffle(ngrams) ngrams = ngrams[:100_000] ngrams_tensor = torch.tensor(ngrams, dtype=torch.long, device=device) indices = torch.any(ngrams_tensor == 0, dim=1) ngrams_tensor = ngrams_tensor[~indices] # %% target_tensor = ngrams_tensor[:, 3].reshape(-1, 1).to(device) input_tensor = torch.cat((ngrams_tensor[:, :3], ngrams_tensor[:, 4:]), dim=1).to(device) # %% batched_input_tensor = torch.split(input_tensor, 512) batched_target_tensor = torch.split(target_tensor, 512) # %% [markdown] # ## Define the trigram neural network model # %% class TrigramNN(nn.Module): def __init__(self, vocab_size, embedding_dim, hidden_dim, output_size): super(TrigramNN, self).__init__() self.embedding = nn.Embedding(vocab_size, 50) self.linear1 = nn.Linear(50 * 6, output_size) def forward(self, inputs): out = self.embedding(inputs) out = out.view(inputs.size(0), -1) out = self.linear1(out) out = torch.softmax(out, dim=1) return out # %% [markdown] # ## Define training parameters # %% batch_size = 512 vocab_size = len(word_to_ix) + 1 embedding_dim = 10 hidden_dim = 64 output_size = vocab_size learning_rate = 0.005 epochs = 1 # %% [markdown] # ## Initialize the model, loss function, and optimizer # %% model = TrigramNN(vocab_size, embedding_dim, hidden_dim, output_size) criterion = nn.CrossEntropyLoss() optimizer = optim.SGD(model.parameters(), lr=learning_rate) # %% [markdown] # ## Training loop # %% model.to(device) batches = list(zip(batched_input_tensor, batched_target_tensor)) for epoch in range(epochs): total_loss = 0 for batch_inputs, batch_targets in tqdm(batches): model.zero_grad() output = model(batch_inputs) loss = criterion(output, batch_targets.view(-1)) total_loss += loss.item() loss.backward() optimizer.step() print(f"Epoch {epoch+1}, Loss: {total_loss/len(batches)}") # %% [markdown] # ## Write function to convert index to word # %% def idx_to_word(idx): idx = int(idx) if idx not in word_to_ix.inverse: return '' return word_to_ix.inverse[idx] # %% [markdown] # ## test the model # %% def predict(left_context, right_context): with torch.no_grad(): context = left_context + right_context test_context_idxs = torch.tensor([[tokenize(x) for x in context]], device=device) output = model(test_context_idxs) top_predicted_scores, top_predicted_indices = torch.topk(output, 5) predictions = list(zip(top_predicted_scores[0], top_predicted_indices[0])) predictions = [(round(float(score), 2), idx_to_word(idx)) for score, idx in predictions] total_score = np.sum([score for score, _ in predictions]) predictions = ' '.join([f"{word}:{round(score/total_score, 2)}" for score, word in predictions]) + ' :0.01' return predictions # %% test_context = ["came", "fiom", "the", "place", "to", "this"] print(predict(test_context[:3], test_context[3:])) # %% [markdown] # # Generate result for dev dataset # %% dataset_dir = os.path.join('..', 'dev-0', 'in.tsv.xz') output_dir = os.path.join('..', 'dev-0', 'out.tsv') df = pd.read_csv(dataset_dir, sep='\t', header=None, names=['FileId', 'Year', 'LeftContext', 'RightContext'], quoting=csv.QUOTE_NONE) df = df.replace(r'\\r+|\\n+|\\t+', ' ', regex=True) # %% final = "" for i, (_, row) in tqdm(enumerate(df.iterrows()), total=len(df)): left_context = re.split(r"\s+", row['LeftContext'].strip())[-3:] right_context = re.split(r"\s+", row['RightContext'].strip())[:3] final += predict(left_context, right_context) + '\n' with open(output_dir, 'w', encoding="UTF-8") as f: f.write(final)