Seq2Seq translation
import pandas as pd
import numpy as np
# Pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader, RandomSampler
import torch.nn.functional as F
import torchtext
torchtext.disable_torchtext_deprecation_warning()
from torchtext.data.metrics import bleu_score
from unidecode import unidecode
import regex as re
from string import punctuation
import random
from tqdm.notebook import tqdm
# Use GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
Load data corpus
# Load the data
df = pd.read_csv('pol-eng/pol.txt', sep='\t', header=None, usecols=[0, 1], names=['source', 'target'])
df_1000 = df.head(1000)
df = df_1000
df
|     | source     | target                |
|-----|------------|-----------------------|
| 0   | Go.        | Idź.                  |
| 1   | Hi.        | Cześć.                |
| 2   | Run!       | Uciekaj!              |
| 3   | Run.       | Biegnij.              |
| 4   | Run.       | Uciekaj.              |
| ... | ...        | ...                   |
| 995 | We walked. | Poszliśmy pieszo.     |
| 996 | We yawned. | Ziewaliśmy.           |
| 997 | We'll see. | Zobaczymy.            |
| 998 | We're men. | Jesteśmy mężczyznami. |
| 999 | We're sad. | Jesteśmy smutni.      |

1000 rows × 2 columns
Initial data preprocessing
# Helper class to create language vocabularies
# <bos> - beginning of sentence token - 0
# <eos> - end of sentence token - 1
bos_token = 0
eos_token = 1
class Lang:
def __init__(self, name):
self.name = name
self.word2index = {'<bos>': 0, '<eos>': 1}
self.word2count = {}
self.index2word = {0: '<bos>', 1: '<eos>'}
self.n_words = 2 # Count <bos> and <eos>
def add_sentence(self, sentence):
for word in sentence.split(' '):
self.add_word(word)
def add_word(self, word):
if word not in self.word2index:
self.word2index[word] = self.n_words
self.word2count[word] = 1
self.index2word[self.n_words] = word
self.n_words += 1
else:
self.word2count[word] += 1
# Helper methods to preprocess data
def normalizeText(text, ascii: bool = False):
# Convert to ASCII
if ascii:
text = unidecode(text)
# Lowercase and trim whitespace
text = text.lower().strip()
    # Remove punctuation characters (escape them so the character class is built safely)
    text = re.sub("[" + re.escape(punctuation) + "]", "", text)
return text
# Method for data preparation (vocabularies, pairs of sentences)
def prepareData(df, source_lang, target_lang, ascii: bool = False):
# Normalize source and target sentences
df['source'] = df['source'].apply(lambda x: normalizeText(x, ascii=ascii))
df['target'] = df['target'].apply(lambda x: normalizeText(x, ascii=ascii))
# Get pairs of sentences
pairs = list(zip(df['source'], df['target']))
# Create language vocabularies
source_lang = Lang(source_lang)
target_lang = Lang(target_lang)
for source_sentence, target_sentence in pairs:
source_lang.add_sentence(source_sentence)
target_lang.add_sentence(target_sentence)
return source_lang, target_lang, pairs
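To make the vocabulary logic concrete, here is a minimal sketch (using two made-up sentences rather than the dataset) of how normalizeText and Lang work together: normalization lowercases the text and strips punctuation, and add_sentence then assigns each previously unseen word the next free index after the reserved <bos>/<eos> entries.
# Minimal sketch with made-up sentences (not part of the pipeline):
# build a toy vocabulary and inspect the resulting index mapping
toy_lang = Lang('demo')
for s in ["Go now!", "Go home."]:
    toy_lang.add_sentence(normalizeText(s))
print(toy_lang.word2index)  # {'<bos>': 0, '<eos>': 1, 'go': 2, 'now': 3, 'home': 4}
print(toy_lang.n_words)     # 5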
Prepare data for training
# Convert sentence to list of indexes (with <bos> and <eos> tokens)
def indexesFromSentence(lang, sentence):
return [bos_token] + [lang.word2index[word] for word in sentence.split(' ')] + [eos_token]
# Convert sentence to tensor of indexes (with <bos> and <eos> tokens)
def tensorFromSentence(lang, sentence):
return torch.tensor(indexesFromSentence(lang, sentence), dtype=torch.long, device=device).view(-1, 1)
# Data loader
def prepareDataLoader(df, source_lang, target_lang, batch_size: int = 32, ascii: bool = False):
# Prepare data (vocabularies, pairs of sentences)
source_lang, target_lang, pairs = prepareData(df, source_lang, target_lang, ascii=ascii)
    # Maximum sentence length in words, plus 2 for the <bos> and <eos> tokens
    MAX_LENGTH = max(
        np.max([len(sentence.split(' ')) for sentence in df['source']]),
        np.max([len(sentence.split(' ')) for sentence in df['target']])
    ) + 2
# Get number of pairs
n_pairs = len(pairs)
# Initialize tensors (source and target)
source_indexes = np.zeros((n_pairs, MAX_LENGTH), dtype=np.int32)
target_indexes = np.zeros((n_pairs, MAX_LENGTH), dtype=np.int32)
# Fill tensors
for idx, (source_sentence, target_sentence) in enumerate(pairs):
source_idx = indexesFromSentence(source_lang, source_sentence)
target_idx = indexesFromSentence(target_lang, target_sentence)
source_indexes[idx, :len(source_idx)] = source_idx
target_indexes[idx, :len(target_idx)] = target_idx
# Tensor dataset
train_data = TensorDataset(torch.LongTensor(source_indexes).to(device),
torch.LongTensor(target_indexes).to(device))
# Sampler
train_sampler = RandomSampler(train_data)
# Data loader
train_loader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)
return source_lang, target_lang, pairs, train_loader, MAX_LENGTH
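Before moving on, a small sketch (toy vocabulary, pretend MAX_LENGTH of 6) of what prepareDataLoader does to a single pair: the sentence becomes <bos> + word indexes + <eos>, and the remaining positions are zero-padded. Note that the padding value 0 coincides with the <bos> index.
# Minimal sketch (toy vocabulary, pretend MAX_LENGTH == 6) of how one sentence
# becomes a fixed-width row of indexes: <bos> + words + <eos>, zero-padded
toy_src = Lang('src')
toy_src.add_sentence('we walked')
idx = indexesFromSentence(toy_src, 'we walked')
row = np.zeros(6, dtype=np.int32)
row[:len(idx)] = idx
print(idx)  # [0, 2, 3, 1] -> <bos>, 'we', 'walked', <eos>
print(row)  # [0 2 3 1 0 0] -> trailing positions stay 0 (padding)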
Seq2Seq model - Encoder and Decoder
# Encoder
class EncoderRNN(nn.Module):
def __init__(self, input_size: int, hidden_size: int = 100, dropout: float = 0.1):
super(EncoderRNN, self).__init__()
self.hidden_size = hidden_size
# Embedding layer
self.embedding = nn.Embedding(input_size, hidden_size)
# GRU layer
self.gru = nn.GRU(hidden_size, hidden_size, batch_first=True)
# Dropout layer for regularization
self.dropout = nn.Dropout(p=dropout)
def forward(self, input):
# Transform input (as tensor of word indexes) to embeddings vectors
embedded = self.embedding(input)
# Apply dropout to embeddings
embedded = self.dropout(embedded)
# Pass embeddings through GRU and get output and hidden state
output, hidden = self.gru(embedded)
return output, hidden
# What is Teacher Forcing - https://saturncloud.io/glossary/teacher-forcing/#:~:text=What%20is%20Teacher%20Forcing%3F,%2C%20translation%2C%20and%20text%20generation.
class BahdanauAttention(nn.Module):
def __init__(self, hidden_size):
super(BahdanauAttention, self).__init__()
self.hidden_size = hidden_size
# Linear layer to transform encoder output to attention weights
self.Wa = nn.Linear(hidden_size, hidden_size)
self.Ua = nn.Linear(hidden_size, hidden_size)
self.Va = nn.Linear(hidden_size, 1)
def forward(self, query, keys):
"""
:param query: hidden state from decoder
:param keys: output from encoder
:return: context vector and attention weights
"""
scores = self.Va(torch.tanh(self.Wa(query) + self.Ua(keys)))
scores = scores.squeeze(2).unsqueeze(1)
# Apply softmax to get attention weights
weights = F.softmax(scores, dim=-1)
# Calculate context vector
context = torch.bmm(weights, keys)
return context, weights
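A quick shape check helps to see what the attention module returns; the sizes below (batch of 4, source length 7, hidden size 16) are arbitrary and used only for this standalone sketch.
# Standalone shape check with arbitrary sizes (not part of the training pipeline)
_attn = BahdanauAttention(hidden_size=16).to(device)
_query = torch.randn(4, 1, 16, device=device)  # (batch, 1, hidden) - current decoder state
_keys = torch.randn(4, 7, 16, device=device)   # (batch, src_len, hidden) - encoder outputs
_context, _weights = _attn(_query, _keys)
print(_context.shape)  # torch.Size([4, 1, 16]) - one context vector per batch element
print(_weights.shape)  # torch.Size([4, 1, 7])  - one weight per source position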
# Decoder
class DecoderRNN(nn.Module):
def __init__(self, hidden_size, output_size, dropout_p: float = 0.1):
super(DecoderRNN, self).__init__()
# Embedding layer
self.embedding = nn.Embedding(output_size, hidden_size)
# Attention layer
self.attention = BahdanauAttention(hidden_size)
# GRU layer - input is concatenation of embeddings and context vector
self.gru = nn.GRU(2 * hidden_size, hidden_size, batch_first=True)
# Linear layer to get output
self.out = nn.Linear(hidden_size, output_size)
# Dropout layer for regularization
self.dropout = nn.Dropout(p=dropout_p)
def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
"""
:param encoder_outputs: output from encoder
:param encoder_hidden: last hidden states from encoder, used as initial hidden states for decoder
:param target_tensor: target tensor - used in training with teacher forcing
:return:
"""
# Batch size
batch_size = encoder_outputs.size(0)
# Decoder input - initialize with <bos> token index
decoder_input = torch.empty(batch_size, 1, dtype=torch.long, device=device).fill_(bos_token)
# Decoder hidden state - initialize with encoder hidden state
decoder_hidden = encoder_hidden
# List to store decoder outputs
decoder_outputs = []
# List to store attention weights
attention_weights = []
        # Number of decoding steps: the target length during training,
        # otherwise the global MAX_LENGTH computed during data preparation
        max_length = target_tensor.size(1) if target_tensor is not None else MAX_LENGTH
# Decoder loop
for i in range(max_length):
# Forward step
decoder_output, decoder_hidden, attn_weights = self.forward_step(decoder_input, decoder_hidden, encoder_outputs)
# Save output and attention weights
decoder_outputs.append(decoder_output)
attention_weights.append(attn_weights)
# If target tensor is provided, use it for next input
if target_tensor is not None:
# Teacher forcing: next input is current target
decoder_input = target_tensor[:, i].unsqueeze(1)
else:
# Otherwise use output from current step (own prediction)
_, topi = decoder_output.topk(1)
decoder_input = topi.squeeze(-1).detach()
                # Stop decoding once an <eos> token has been produced
                # (inference here is run with a batch size of 1)
                if torch.any(decoder_input == eos_token):
                    break
# Concatenate outputs
decoder_outputs = torch.cat(decoder_outputs, dim=1)
# Apply log softmax to get probabilities
decoder_outputs = F.log_softmax(decoder_outputs, dim=-1)
# Concatenate attention weights
attention_weights = torch.cat(attention_weights, dim=1)
return decoder_outputs, decoder_hidden, attention_weights
def forward_step(self, decoder_input, decoder_hidden, encoder_outputs):
"""
Forward step of decoder
:param decoder_input: current input tensor for decoder
:param decoder_hidden: current hidden state of decoder
:param encoder_outputs: output from encoder
:return: output and hidden state
"""
# Transform input (as tensor of word indexes) to embeddings vectors
embedded = self.embedding(decoder_input)
# Apply dropout to embeddings
embedded = self.dropout(embedded)
# Query
query = decoder_hidden.permute(1, 0, 2)
# Context vector and attention weights
context, attn_weights = self.attention(query, encoder_outputs)
# Concatenate embeddings and context vector
input_gru = torch.cat((embedded, context), dim=2)
# GRU
output, hidden = self.gru(input_gru, decoder_hidden)
# Pass output through linear layer to get final output
output = self.out(output)
return output, hidden, attn_weights
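To see how the two modules fit together, and what teacher forcing changes, here is a standalone sketch with tiny made-up vocabulary sizes: when a target tensor is passed, the decoder runs exactly target_len steps and feeds the ground-truth token back in at each step instead of its own prediction.
# Standalone sketch with made-up sizes (not part of the training pipeline)
_enc = EncoderRNN(input_size=10, hidden_size=16).to(device)
_dec = DecoderRNN(hidden_size=16, output_size=12).to(device)
_src = torch.randint(0, 10, (4, 5), device=device)  # (batch=4, src_len=5)
_tgt = torch.randint(0, 12, (4, 6), device=device)  # (batch=4, tgt_len=6)
_enc_out, _enc_hidden = _enc(_src)
_dec_out, _, _attn = _dec(_enc_out, _enc_hidden, target_tensor=_tgt)  # teacher forcing
print(_enc_out.shape)  # torch.Size([4, 5, 16])
print(_dec_out.shape)  # torch.Size([4, 6, 12]) - log-probabilities over the target vocab
print(_attn.shape)     # torch.Size([4, 6, 5])  - per-step attention over source positions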
Model training
# Model training
def train(dataloader, encoder, decoder, epochs: int = 100, learning_rate: float = 0.01, info_every: int = 10, plot_every: int = 10):
"""
:param dataloader: DataLoader with training data
:param encoder: Encoder model
:param decoder: Decoder model
:param epochs: Number of epochs
:param learning_rate: Learning rate
:param info_every: Specify how often to print information about training (default: every 10 epochs)
:param plot_every: Specify how often to plot loss (default: every 10 epochs)
:return: None
"""
# Set models to training mode
encoder.train()
decoder.train()
# Initialize optimizer
encoder_optimizer = optim.Adam(encoder.parameters(), lr=learning_rate)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=learning_rate)
# Initialize loss function
criterion = nn.NLLLoss()
# Initialize loss lists
plot_losses = []
# Training loop
for epoch in tqdm(range(epochs + 1)):
total_loss = 0
for data in dataloader:
source_tensor, target_tensor = data
# Zero gradients
encoder_optimizer.zero_grad()
decoder_optimizer.zero_grad()
# Forward pass
encoder_outputs, encoder_hidden = encoder(source_tensor)
decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden, target_tensor)
# Calculate loss
loss = criterion(
decoder_outputs.view(-1, decoder_outputs.size(-1)),
target_tensor.view(-1)
)
# Backward pass
loss.backward()
# Update weights
encoder_optimizer.step()
decoder_optimizer.step()
total_loss += loss.item()
plot_losses.append(total_loss / len(dataloader))
if epoch % info_every == 0:
print(f'Epoch: {epoch}, Loss: {total_loss / len(dataloader)}')
# Model predictions
def predict(encoder, decoder, sentence, source_lang, target_lang):
"""
:param encoder: Encoder model
:param decoder: Decoder model
:param sentence: Sentence to translate
:param source_lang: Source language vocabulary
:param target_lang: Target language vocabulary
:return: predicted sentence
"""
# Set models to evaluation mode
encoder.eval()
decoder.eval()
with torch.no_grad():
# Prepare input tensor
input_tensor = tensorFromSentence(source_lang, sentence).view(1, -1)
# Forward pass
encoder_outputs, encoder_hidden = encoder(input_tensor)
decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden)
# Get indexes of the most probable words
_, topi = decoder_outputs.topk(1)
decoded_ids = topi.squeeze()
        # Check if tensor is zero-dimensional
if decoded_ids.dim() == 0:
decoded_ids = decoded_ids.view(1)
# Convert indexes to words
decoded_words = []
for idx in decoded_ids:
if idx.item() == eos_token:
decoded_words.append('<eos>')
break
decoded_words.append(target_lang.index2word[idx.item()])
return decoded_words
# Random evaluation
def random_evaluation(encoder, decoder, n: int = 10):
"""
:param encoder: Encoder model
:param decoder: Decoder model
:param n: Number of sentences to evaluate
:return: None
"""
# Set models to evaluation mode
encoder.eval()
decoder.eval()
# Get random pairs and make predictions
for i in range(n):
pair = random.choice(pairs)
print('[source]>', pair[0])
print('[target]=', pair[1])
output_words = predict(encoder, decoder, pair[0], source_lang, target_lang)
output_words = list(filter(lambda x: x not in ['<bos>', '<eos>'], output_words))
output_sentence = ' '.join(output_words)
print('[prediction]<', output_sentence)
print('')
# BLEU score
def calculate_bleu_score(encoder, decoder, pairs, source_lang, target_lang):
"""
:param encoder: Encoder model
:param decoder: Decoder model
:param pairs: List of pairs of sentences
:param source_lang: Source language vocabulary
:param target_lang: Target language vocabulary
:return: BLEU score
"""
# Initialize lists for references and candidates
references = []
candidates = []
# Loop through pairs
for idx, (source_sentence, target_sentence) in enumerate(pairs):
# Get predicted sentence
predicted_sentence = predict(encoder, decoder, source_sentence, source_lang, target_lang)
# Remove <bos> and <eos> tokens
predicted_sentence = list(filter(lambda x: x not in ['<bos>', '<eos>'], predicted_sentence))
# Add reference and candidate
references.append([target_sentence.split(' ')])
candidates.append(predicted_sentence)
return bleu_score(candidates, references)
# Model parameters
hidden_size = 128
batch_size = 32
# Prepare data
source_lang, target_lang, pairs, train_loader, MAX_LENGTH = prepareDataLoader(df, 'angielski', 'polski', batch_size=batch_size)
# Initialize encoder and decoder
encoder = EncoderRNN(source_lang.n_words, hidden_size).to(device)
decoder = DecoderRNN(hidden_size, target_lang.n_words).to(device)
# Train the model
train(train_loader, encoder, decoder, epochs=1000, learning_rate=0.01, info_every=100, plot_every=10)
Epoch: 0, Loss: 0.347686683293432
Epoch: 100, Loss: 0.2592399111017585
Epoch: 200, Loss: 0.2301939323078841
Epoch: 300, Loss: 0.246875268407166
Epoch: 400, Loss: 0.260721294907853
Epoch: 500, Loss: 0.258150483481586
Epoch: 600, Loss: 0.23601281619630754
Epoch: 700, Loss: 0.24906805180944502
Epoch: 800, Loss: 0.22962150094099343
Epoch: 900, Loss: 0.22537698200903833
Epoch: 1000, Loss: 0.22563873510807753
Model evaluation
# Random evaluation
random_evaluation(encoder, decoder, n=10)
[source]> i shouted
[target]= krzyknąłem
[prediction]< krzyknąłem

[source]> i ran
[target]= pobiegłem
[prediction]< pobiegłam

[source]> we danced
[target]= tańczyliśmy
[prediction]< jestem

[source]> i moved
[target]= ruszyłem się
[prediction]< jestem zły

[source]> tom swore
[target]= tom przysiągł
[prediction]< tom przysiągł

[source]> i failed
[target]= poniosłem klęskę
[prediction]< zawiodłem

[source]> unlock it
[target]= otwórz to
[prediction]< otwórz to

[source]> i inhaled
[target]= wciągnąłem powietrze
[prediction]< wciągnąłem powietrze

[source]> who won
[target]= kto wygrał
[prediction]< kto wygrał

[source]> tom works
[target]= tom pracuje
[prediction]< tom pracuje
# BLEU score
calculate_bleu_score(encoder, decoder, pairs, source_lang, target_lang)
0.5394429729650964