# %% [markdown]
# # Trigram neural network model for the gap-fill task

# %% [markdown]
# ## Import required packages

# %%
from tqdm import tqdm
import re
import nltk
import os
import csv
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import sys
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from bidict import bidict
import math
from sklearn.utils import shuffle
from collections import Counter
import random

# %%
# Make CUDA errors surface at the failing call (useful when debugging on GPU).
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
os.environ['TORCH_USE_CUDA_DSA'] = '1'

# %% [markdown]
# ## Global configuration variables

# %%
vocab_size = 60_000        # vocabulary size, including index 0 reserved for unknown words
batch_size = 64
embedding_dim = 64
hidden_dim = 1024
learning_rate = 0.001
epochs = 20

output_size = vocab_size   # the model predicts a distribution over the whole vocabulary

# %%
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")
print(device)

# %% [markdown]
# ## Load train data corpus

# %%
dataset_dir = os.path.join('..', 'train', 'in.tsv.xz')
expected_dir = os.path.join('..', 'train', 'expected.tsv')

# Read both files in chunks of 1000 rows so the whole corpus never has to sit
# in memory at once.
df_iter = pd.read_csv(dataset_dir, sep='\t', header=None, names=['FileId', 'Year', 'LeftContext', 'RightContext'], quoting=csv.QUOTE_NONE, dtype=str, chunksize=1000)
expected_iter = pd.read_csv(expected_dir, sep='\t', header=None, names=['Word'], quoting=csv.QUOTE_NONE, dtype=str, chunksize=1000)

input_corpus = []
target_corpus = []

# Number of context tokens taken from each side of the gap.
left_tokens = 1
right_tokens = 1

for df, expected_df in tqdm(zip(df_iter, expected_iter), total=433):
    # Collapse the literal \r, \n and \t escape sequences left in the raw text.
    df = df.replace(r'\\r+|\\n+|\\t+', ' ', regex=True)

    for left_context, word, right_context in zip(df['LeftContext'].to_list(), expected_df['Word'].to_list(), df['RightContext'].to_list()):
        target_corpus.append([str(word).strip()])
        input_corpus.append(re.split(r"\s+", left_context.strip())[-left_tokens:] + re.split(r"\s+", right_context.strip())[:right_tokens])

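# %% [markdown]
# For reference, each training example pairs the context tokens (here one word to
# the left and one to the right of the gap) with the single target word. A quick
# optional look at the first example (not part of the original notebook):

# %%
print(input_corpus[0], target_corpus[0])
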
# %% [markdown]
# ## Create dictionaries for mapping words to indices

# %%
def flatten(matrix):
    flat_list = []
    for row in matrix:
        flat_list += row
    return flat_list

# %%
word_to_ix = bidict({})
words_corpus = flatten(input_corpus) + flatten(target_corpus)

counts = Counter(words_corpus)

# Assign indices starting at 1 to the most frequent words; index 0 is kept
# for out-of-vocabulary words.
for word, _ in tqdm(counts.most_common(vocab_size - 1)):
    if word not in word_to_ix:
        word_to_ix[word] = len(word_to_ix) + 1

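# %% [markdown]
# A small optional check (not in the original notebook): the bidict allows lookups
# in both directions, and the most frequent word of the corpus should sit at index 1.

# %%
print(len(word_to_ix))          # at most vocab_size - 1 entries
print(word_to_ix.inverse[1])    # most frequent word in the corpus
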
# %% [markdown]
# ## Tokenize entire corpus

# %%
def tokenize(w):
    # Map unknown words to the reserved index 0.
    if w in word_to_ix:
        return word_to_ix[w]
    else:
        return 0


tokenized_input_corpus = []
tokenized_target_corpus = []

for words in tqdm(input_corpus):
    tokenized_input_corpus.append([tokenize(word) for word in words])

for words in tqdm(target_corpus):
    tokenized_target_corpus.append([tokenize(word) for word in words])

# %%
tokenized_input_corpus, tokenized_target_corpus = shuffle(tokenized_input_corpus, tokenized_target_corpus)

# %% [markdown]
# ## Create dataset

# %%
# Keep only the examples whose target word is in the vocabulary (non-zero index).
indices = np.nonzero(np.array(tokenized_target_corpus).flatten())[0]

tokenized_input_corpus = np.take(tokenized_input_corpus, indices, axis=0)
tokenized_target_corpus = np.take(tokenized_target_corpus, indices, axis=0)

# %%
input_corpus_tensor = torch.tensor(tokenized_input_corpus, dtype=torch.long, device=device)
target_corpus_tensor = torch.tensor(tokenized_target_corpus, dtype=torch.long, device=device).reshape(-1, 1)

# %%
print(input_corpus_tensor.size())
print(target_corpus_tensor.size())

# %%
random_index = random.randint(0, len(input_corpus_tensor) - 1)

# Get random element from input corpus
random_input_element = input_corpus_tensor[random_index]

# Get corresponding element from target corpus
random_target_element = target_corpus_tensor[random_index]

print([word_to_ix.inverse[int(idx)] if int(idx) > 0 else '<UNK>' for idx in random_input_element])
print([word_to_ix.inverse[int(idx)] if int(idx) > 0 else '<UNK>' for idx in random_target_element])

# %%
# Train on the first 10,000 examples only.
dataset = TensorDataset(input_corpus_tensor[:10_000], target_corpus_tensor[:10_000])

# %%
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

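# %% [markdown]
# Optionally, peek at a single batch to confirm the shapes the model will receive
# (a small check, not part of the original notebook):

# %%
example_inputs, example_targets = next(iter(dataloader))
print(example_inputs.shape, example_targets.shape)   # (batch_size, 2) and (batch_size, 1)
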
# %% [markdown]
# ## Define the trigram neural network model

# %%
class TrigramNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_size):
        super(TrigramNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # One context word on each side of the gap, so the first linear layer
        # sees (left_tokens + right_tokens) concatenated embeddings.
        self.linear1 = nn.Linear(embedding_dim * (left_tokens + right_tokens), hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, output_size)

    def forward(self, inputs):
        out = self.embedding(inputs)
        # Concatenate the context embeddings into one vector per example.
        out = out.view(inputs.size(0), -1)
        out = torch.softmax(self.linear1(out), dim=1)
        out = self.linear2(out)
        return out

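# %% [markdown]
# As a quick optional sanity check (not part of the original notebook), a freshly
# constructed model can be fed a dummy batch to confirm that the output has shape
# (batch, vocab_size):

# %%
dummy_batch = torch.zeros((4, left_tokens + right_tokens), dtype=torch.long)
print(TrigramNN(vocab_size, embedding_dim, hidden_dim, output_size)(dummy_batch).shape)   # torch.Size([4, 60000])
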
# %% [markdown]
# ## Initialize the model, loss function, and optimizer

# %%
model = TrigramNN(vocab_size, embedding_dim, hidden_dim, output_size)
# CrossEntropyLoss expects raw logits, so the model's last layer has no softmax.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

# %% [markdown]
# ## Training loop

# %%
model.to(device)

for epoch in range(epochs):
    total_loss = 0
    for batch_inputs, batch_targets in tqdm(dataloader):
        batch_inputs, batch_targets = batch_inputs.to(device), batch_targets.to(device)

        model.zero_grad()
        output = model(batch_inputs)

        # Targets arrive as a (batch, 1) tensor; CrossEntropyLoss wants a 1-D vector.
        loss = criterion(output, batch_targets.view(-1))
        total_loss += loss.item()

        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1}, Loss: {total_loss/len(dataloader)}")

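# %% [markdown]
# Optionally, the trained weights can be saved so the prediction cells below can be
# re-run later without retraining. This is a small addition, not in the original
# notebook, and the file name `trigram_nn.pt` is arbitrary:

# %%
torch.save(model.state_dict(), 'trigram_nn.pt')
# To restore later:
# model.load_state_dict(torch.load('trigram_nn.pt', map_location=device))
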
# %% [markdown]
# ## Helper to convert an index back to a word

# %%
def idx_to_word(idx):
    idx = int(idx)
    if idx not in word_to_ix.inverse:
        return '<UNK>'
    return word_to_ix.inverse[idx]

# %% [markdown]
# ## Test the model

# %%
def predict(left_context, right_context):
    with torch.no_grad():
        context = left_context + right_context
        test_context_idxs = torch.tensor([[tokenize(x) for x in context]], device=device)
        output = model(test_context_idxs)
        # Turn the raw logits into probabilities so that the scores (and the
        # leftover mass assigned to the unknown token below) are meaningful.
        probabilities = torch.softmax(output, dim=1)
        top_predicted_scores, top_predicted_indices = torch.topk(probabilities, 5)
        predictions = list(zip(top_predicted_scores[0], top_predicted_indices[0]))
        predictions = [(float(score), idx_to_word(idx)) for score, idx in predictions]
        total_score = np.sum([score for score, _ in predictions])
        # Expected output format: "word1:p1 word2:p2 ... :remaining_probability"
        predictions = ' '.join([f"{word}:{score}" for score, word in predictions]) + ' :' + str(1.0 - total_score)
        return predictions

# %%
print(predict(["came", "fiom"], []))

# %% [markdown]
# # Generate results for the dev dataset

# %%
dataset_dir = os.path.join('..', 'dev-0', 'in.tsv.xz')
output_dir = os.path.join('..', 'dev-0', 'out.tsv')

df = pd.read_csv(dataset_dir, sep='\t', header=None, names=['FileId', 'Year', 'LeftContext', 'RightContext'], quoting=csv.QUOTE_NONE)
df = df.replace(r'\\r+|\\n+|\\t+', ' ', regex=True)

# %%
final = ""

for _, row in tqdm(df.iterrows(), total=len(df)):
    left_context = re.split(r"\s+", row['LeftContext'].strip())[-left_tokens:]
    right_context = re.split(r"\s+", row['RightContext'].strip())[:right_tokens]

    final += predict(left_context, right_context) + '\n'

with open(output_dir, 'w', encoding="UTF-8") as f:
    f.write(final)