# %% [markdown]
# # Trigram neural network model for gap fill task

# %% [markdown]
# ## Import required packages

# %%
from tqdm import tqdm
import re
import nltk
import os
import csv
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import sys
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from bidict import bidict
import math
from sklearn.utils import shuffle
from collections import Counter
import random

# %%
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
os.environ['TORCH_USE_CUDA_DSA'] = '1'

# %% [markdown]
# ## Global configuration variables

# %%
vocab_size = 60_000
batch_size = 64
embedding_dim = 64
hidden_dim = 1024
learning_rate = 0.001
epochs = 20
output_size = vocab_size

# %%
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")
print(device)

# %% [markdown]
# ## Load train data corpus

# %%
dataset_dir = os.path.join('..', 'train', 'in.tsv.xz')
expected_dir = os.path.join('..', 'train', 'expected.tsv')

# Both files are streamed in 1000-row chunks and iterated in lockstep.
# keep_default_na=False stops pandas from turning real words such as
# "NA", "null" or "None" into NaN (which str() would later store as 'nan').
train_reader = pd.read_csv(
    dataset_dir,
    sep='\t',
    header=None,
    names=['FileId', 'Year', 'LeftContext', 'RightContext'],
    quoting=csv.QUOTE_NONE,
    dtype=str,
    keep_default_na=False,
    chunksize=1000,
)
expected_reader = pd.read_csv(
    expected_dir,
    sep='\t',
    header=None,
    names=['Word'],
    quoting=csv.QUOTE_NONE,
    dtype=str,
    keep_default_na=False,
    chunksize=1000,
)

input_corpus = []
target_corpus = []

# Number of context tokens taken from each side of the gap.
left_tokens = 1
right_tokens = 1

# Compiled once; it is applied twice per training row.
whitespace = re.compile(r"\s+")

# NOTE(review): total=433 is only the progress-bar length; presumably the
# number of 1000-row chunks in the training set -- confirm against the data.
for df, expected_df in tqdm(zip(train_reader, expected_reader), total=433):
    # fillna('') guards rows with structurally missing fields so that
    # .strip() below always receives a string.
    df = df.fillna('').replace(r'\\r+|\\n+|\\t+', ' ', regex=True)
    for left_context, word, right_context in zip(df['LeftContext'],
                                                 expected_df['Word'],
                                                 df['RightContext']):
        target_corpus.append([str(word).strip()])
        input_corpus.append(
            whitespace.split(left_context.strip())[-left_tokens:]
            + whitespace.split(right_context.strip())[:right_tokens]
        )

# %% [markdown]
# ## Create dictionaries for mapping words to indices

# %%
def flatten(matrix):
    """Concatenate the rows of ``matrix`` (a list of lists) into one flat list."""
    flat_list = []
    for row in matrix:
        flat_list.extend(row)
    return flat_list

# %%
word_to_ix = bidict({})
words_corpus = flatten(input_corpus) + flatten(target_corpus)
counts = Counter(words_corpus)
# Id 0 is left free for out-of-vocabulary words, so assigned ids start at 1.
for word, _ in tqdm(counts.most_common(vocab_size - 1)):
    if word in word_to_ix:
        continue
    word_to_ix[word] = len(word_to_ix) + 1

# %% [markdown]
# ## Tokenize entire corpus

# %%
def tokenize(w):
    """Return the id of ``w`` in ``word_to_ix``, or 0 when ``w`` is not in it."""
    return word_to_ix[w] if w in word_to_ix else 0


tokenized_input_corpus = [
    [tokenize(word) for word in words] for words in tqdm(input_corpus)
]
tokenized_target_corpus = [
    [tokenize(word) for word in words] for words in tqdm(target_corpus)
]

# %%
tokenized_input_corpus, tokenized_target_corpus = shuffle(
    tokenized_input_corpus,
    tokenized_target_corpus,
)

# %% [markdown]
# ## Create dataset

# %%
# Positions whose target id is non-zero, i.e. the target word is in the vocabulary.
indices = np.array(tokenized_target_corpus).flatten().nonzero()
tokenized_input_corpus = np.take(tokenized_input_corpus, indices, axis=0)
tokenized_target_corpus = np.take(tokenized_target_corpus, indices, axis=0)

# %%
# ``indices`` is a tuple, so np.take returns arrays with an extra leading axis;
# the flatten calls below collapse it again.
input_corpus_tensor = torch.tensor(
    tokenized_input_corpus, dtype=torch.long, device=device
).flatten(end_dim=-2)
target_corpus_tensor = torch.tensor(
    tokenized_target_corpus, dtype=torch.long, device=device
).flatten().reshape(-1, 1)

# %%
print(input_corpus_tensor.size())
print(target_corpus_tensor.size())

# %%
random_index = random.randint(0, len(input_corpus_tensor) - 1)

# Input/target pair found at the sampled position
random_input_element = input_corpus_tensor[random_index]
random_target_element = target_corpus_tensor[random_index]

# Decode both back to words as a sanity check; id 0 is shown as ''.
for sample in (random_input_element, random_target_element):
    print([word_to_ix.inverse[int(idx)] if int(idx) > 0 else '' for idx in sample])

# %%
dataset = TensorDataset(input_corpus_tensor[:10_000], target_corpus_tensor[:10_000])

# %%
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# %% [markdown]
# ## Define the trigram neural network model

# %%
class TrigramNN(nn.Module):
    """Feed-forward network that scores vocabulary words for a gap.

    The ``left_tokens + right_tokens`` context ids are embedded, concatenated
    into one vector, passed through a hidden layer and projected to
    ``output_size`` unnormalised scores (logits).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # Input width = one embedding per context token.
        self.linear1 = nn.Linear(embedding_dim * (left_tokens + right_tokens), hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, output_size)

    def forward(self, inputs):
        """Return logits of shape (batch, output_size) for a batch of context ids."""
        out = self.embedding(inputs)
        # Concatenate the per-token embeddings of each example.
        out = out.view(inputs.size(0), -1)
        # NOTE(review): softmax is used as the hidden activation here; a
        # ReLU/tanh is the usual choice. Left unchanged because replacing it
        # alters the model -- confirm whether this is intentional.
        out = torch.softmax(self.linear1(out), dim=1)
        out = self.linear2(out)
        return out

# %% [markdown]
# ## Initialize the model, loss function, and optimizer

# %%
model = TrigramNN(vocab_size, embedding_dim, hidden_dim, output_size)
# CrossEntropyLoss consumes raw logits, so the model does not normalise its output.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

# %% [markdown]
# ## Training loop

# %%
model.to(device)

for epoch in range(epochs):
    total_loss = 0
    for batch_inputs, batch_targets in tqdm(dataloader):
        batch_inputs, batch_targets = batch_inputs.to(device), batch_targets.to(device)
        optimizer.zero_grad()
        output = model(batch_inputs)
        # Targets are stored as a column vector; the loss expects a flat vector.
        loss = criterion(output, batch_targets.view(-1))
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
    # Mean batch loss over the epoch.
    print(f"Epoch {epoch+1}, Loss: {total_loss/len(dataloader)}")

# %% [markdown]
# ## Write function to convert index to word

# %%
def idx_to_word(idx):
    """Return the word stored under id ``idx``, or '' if the id is not in the vocabulary."""
    return word_to_ix.inverse.get(int(idx), '')

# %% [markdown]
# ## Test the model

# %%
def predict(left_context, right_context):
    """Predict the gap word between ``left_context`` and ``right_context``.

    Args:
        left_context: list of word strings preceding the gap.
        right_context: list of word strings following the gap.

    Returns:
        A string ``"word:prob word:prob ... :rest"`` holding up to five
        candidate words with their probabilities, followed by the probability
        mass not assigned to any listed word.
    """
    context = left_context + right_context
    with torch.no_grad():
        test_context_idxs = torch.tensor([[tokenize(x) for x in context]], device=device)
        logits = model(test_context_idxs)
        # The model returns logits; normalise them so the scores are
        # probabilities and "1 - sum" is a meaningful remainder.
        probabilities = torch.softmax(logits, dim=1)
        top_predicted_scores, top_predicted_indices = torch.topk(probabilities, 5)

    predictions = []
    for score, idx in zip(top_predicted_scores[0].tolist(), top_predicted_indices[0].tolist()):
        word = idx_to_word(idx)
        if word == '':
            # Unknown-word id: its mass is reported through the trailing
            # ":rest" entry instead of a second empty-word entry.
            continue
        predictions.append((score, word))

    # Clamp to guard against tiny negative values from float rounding.
    remaining = max(0.0, 1.0 - sum(score for score, _ in predictions))
    parts = [f"{word}:{score}" for score, word in predictions]
    parts.append(':' + str(remaining))
    return ' '.join(parts)

# %%
print(predict(["came", "fiom"], []))

# %% [markdown]
# # Generate result for dev dataset

# %%
dataset_dir = os.path.join('..', 'dev-0', 'in.tsv.xz')
output_dir = os.path.join('..', 'dev-0', 'out.tsv')

# dtype=str / keep_default_na=False keep every context a plain string;
# fillna('') covers rows with missing fields so .strip() below cannot fail.
df = pd.read_csv(
    dataset_dir,
    sep='\t',
    header=None,
    names=['FileId', 'Year', 'LeftContext', 'RightContext'],
    quoting=csv.QUOTE_NONE,
    dtype=str,
    keep_default_na=False,
)
df = df.fillna('').replace(r'\\r+|\\n+|\\t+', ' ', regex=True)

# %%
# Collect the lines in a list and join once rather than growing a string.
lines = []
for left_text, right_text in tqdm(zip(df['LeftContext'], df['RightContext']), total=len(df)):
    left_context = re.split(r"\s+", left_text.strip())[-left_tokens:]
    right_context = re.split(r"\s+", right_text.strip())[:right_tokens]
    lines.append(predict(left_context, right_context) + '\n')
final = ''.join(lines)

with open(output_dir, 'w', encoding="UTF-8") as f:
    f.write(final)