
Seq2Seq translation

import pandas as pd
import numpy as np

# PyTorch
import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import TensorDataset, DataLoader, RandomSampler
import torch.nn.functional as F

import torchtext
torchtext.disable_torchtext_deprecation_warning()

from torchtext.data.metrics import bleu_score

from unidecode import unidecode

import regex as re
from string import punctuation
import random

from tqdm.notebook import tqdm
# Use GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

Load data corpus

# Load the data and keep only the first 1000 sentence pairs
df = pd.read_csv('pol-eng/pol.txt', sep='\t', header=None, usecols=[0, 1], names=['source', 'target'])
df = df.head(1000)
df
source target
0 Go. Idź.
1 Hi. Cześć.
2 Run! Uciekaj!
3 Run. Biegnij.
4 Run. Uciekaj.
... ... ...
995 We walked. Poszliśmy pieszo.
996 We yawned. Ziewaliśmy.
997 We'll see. Zobaczymy.
998 We're men. Jesteśmy mężczyznami.
999 We're sad. Jesteśmy smutni.

1000 rows × 2 columns
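
Before preprocessing, a quick sanity check on the loaded corpus can be useful (a minimal sketch; assumes the df loaded above):

# Optional sanity check on the loaded corpus
print(df.isna().sum())                           # expect no missing source/target entries
print(df['source'].str.split().str.len().max())  # longest English sentence (in words)
print(df['target'].str.split().str.len().max())  # longest Polish sentence (in words)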

Initial data preprocessing

# Helper class to create language vocabularies
# <bos> - beginning of sentence token - 0
# <eos> - end of sentence token - 1
bos_token = 0
eos_token = 1

class Lang:
    def __init__(self, name):
        self.name = name
        self.word2index = {'<bos>': 0, '<eos>': 1}
        self.word2count = {}
        self.index2word = {0: '<bos>', 1: '<eos>'}
        self.n_words = 2  # Count <bos> and <eos>
        
    def add_sentence(self, sentence):
        for word in sentence.split(' '):
            self.add_word(word)
            
    def add_word(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1
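
A quick illustration of how Lang accumulates a vocabulary (a hypothetical two-sentence example, not part of the pipeline):

# Hypothetical example: build a tiny vocabulary from two sentences
demo_lang = Lang('demo')
demo_lang.add_sentence('we walked')
demo_lang.add_sentence('we yawned')

print(demo_lang.n_words)     # 5 -> <bos>, <eos>, we, walked, yawned
print(demo_lang.word2index)  # {'<bos>': 0, '<eos>': 1, 'we': 2, 'walked': 3, 'yawned': 4}
print(demo_lang.word2count)  # {'we': 2, 'walked': 1, 'yawned': 1}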
# Helper methods to preprocess data
def normalizeText(text, ascii: bool = False):
    # Convert to ASCII
    if ascii:
        text = unidecode(text)
        
    # Lowercase and trim whitespace
    text = text.lower().strip()
    
    # Remove punctuation characters
    text = re.sub("[" + re.escape(punctuation) + "]", "", text)
    
    return text
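
For example (hypothetical inputs), normalization lowercases, strips punctuation, and optionally transliterates diacritics to ASCII:

# Hypothetical examples of normalizeText behaviour
print(normalizeText('Run!'))               # 'run'
print(normalizeText('Idź.', ascii=False))  # 'idź'
print(normalizeText('Idź.', ascii=True))   # 'idz' (diacritics transliterated by unidecode)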
# Method for data preparation (vocabularies, pairs of sentences)
def prepareData(df, source_lang, target_lang, ascii: bool = False):
    # Normalize source and target sentences
    df['source'] = df['source'].apply(lambda x: normalizeText(x, ascii=ascii))
    df['target'] = df['target'].apply(lambda x: normalizeText(x, ascii=ascii))
    
    # Get pairs of sentences
    pairs = list(zip(df['source'], df['target']))
    
    # Create language vocabularies
    source_lang = Lang(source_lang)
    target_lang = Lang(target_lang)
    
    for source_sentence, target_sentence in pairs:
        source_lang.add_sentence(source_sentence)
        target_lang.add_sentence(target_sentence)
    
    return source_lang, target_lang, pairs

Prepare data for training

# Convert sentence to list of indexes (with <bos> and <eos> tokens)
def indexesFromSentence(lang, sentence):
    return [bos_token] + [lang.word2index[word] for word in sentence.split(' ')] + [eos_token]

# Convert sentence to tensor of indexes (with <bos> and <eos> tokens)
def tensorFromSentence(lang, sentence):
    return torch.tensor(indexesFromSentence(lang, sentence), dtype=torch.long, device=device).view(-1, 1)
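
A small hypothetical example of these helpers (the indexes depend on the insertion order in the vocabulary):

# Hypothetical example with a two-word vocabulary
demo_lang = Lang('demo')
demo_lang.add_sentence('we walked')

print(indexesFromSentence(demo_lang, 'we walked'))       # [0, 2, 3, 1] -> <bos> we walked <eos>
print(tensorFromSentence(demo_lang, 'we walked').shape)  # torch.Size([4, 1])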

# Data loader
def prepareDataLoader(df, source_lang, target_lang, batch_size: int = 32, ascii: bool = False):
    # Prepare data (vocabularies, pairs of sentences)
    source_lang, target_lang, pairs = prepareData(df, source_lang, target_lang, ascii=ascii)
    
    # Maximum sentence length across source and target (in words), plus 2 for the <bos>/<eos> tokens
    max_source_len = max(len(sentence.split(' ')) for sentence in df['source'])
    max_target_len = max(len(sentence.split(' ')) for sentence in df['target'])
    MAX_LENGTH = max(max_source_len, max_target_len) + 2
    
    # Get number of pairs
    n_pairs = len(pairs)
    
    # Initialize zero-padded index arrays (note: the padding value 0 is also the <bos> index)
    source_indexes = np.zeros((n_pairs, MAX_LENGTH), dtype=np.int32)
    target_indexes = np.zeros((n_pairs, MAX_LENGTH), dtype=np.int32)
    
    # Fill tensors
    for idx, (source_sentence, target_sentence) in enumerate(pairs):
        source_idx = indexesFromSentence(source_lang, source_sentence)
        target_idx = indexesFromSentence(target_lang, target_sentence)
        
        source_indexes[idx, :len(source_idx)] = source_idx
        target_indexes[idx, :len(target_idx)] = target_idx
    
    # Tensor dataset
    train_data = TensorDataset(torch.LongTensor(source_indexes).to(device),
                               torch.LongTensor(target_indexes).to(device))
    
    # Sampler
    train_sampler = RandomSampler(train_data)
    
    # Data loader
    train_loader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)
    
    return source_lang, target_lang, pairs, train_loader, MAX_LENGTH
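
A minimal shape check for the loader (a sketch assuming the df loaded above; the exact MAX_LENGTH depends on the corpus):

# Hypothetical shape check of one batch from the loader
_src_lang, _tgt_lang, _pairs, _loader, _max_len = prepareDataLoader(df, 'angielski', 'polski', batch_size=4)
src_batch, tgt_batch = next(iter(_loader))
print(src_batch.shape, tgt_batch.shape)  # both torch.Size([4, _max_len]), zero-padded on the right
print(_max_len)                          # longest sentence length in words + 2 (<bos> and <eos>)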

Seq2Seq model - Encoder and Decoder

# Encoder
class EncoderRNN(nn.Module):
    def __init__(self, input_size: int, hidden_size: int = 100, dropout: float = 0.1):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size
        
        # Embedding layer
        self.embedding = nn.Embedding(input_size, hidden_size)
        
        # GRU layer
        self.gru = nn.GRU(hidden_size, hidden_size, batch_first=True)
        
        # Dropout layer for regularization
        self.dropout = nn.Dropout(p=dropout)
        
    def forward(self, input):
        # Transform input (as tensor of word indexes) to embedding vectors
        embedded = self.embedding(input)
        
        # Apply dropout to embeddings
        embedded = self.dropout(embedded)
        
        # Pass embeddings through GRU and get output and hidden state
        output, hidden = self.gru(embedded)
        
        return output, hidden
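
A shape sketch for the encoder on a dummy batch (hypothetical sizes):

# Hypothetical shape check: batch of 4 sequences of length 7, vocabulary of 50 words
demo_encoder = EncoderRNN(input_size=50, hidden_size=16).to(device)
demo_input = torch.randint(0, 50, (4, 7), device=device)
demo_out, demo_hidden = demo_encoder(demo_input)
print(demo_out.shape)     # torch.Size([4, 7, 16]) - one hidden vector per source position
print(demo_hidden.shape)  # torch.Size([1, 4, 16]) - final hidden state (num_layers, batch, hidden)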
# Teacher forcing: during training, the decoder is fed the ground-truth previous token instead of its own prediction.
# More details: https://saturncloud.io/glossary/teacher-forcing/#:~:text=What%20is%20Teacher%20Forcing%3F,%2C%20translation%2C%20and%20text%20generation.

class BahdanauAttention(nn.Module):
    def __init__(self, hidden_size):
        super(BahdanauAttention, self).__init__()
        self.hidden_size = hidden_size
        
        # Linear layer to transform encoder output to attention weights
        self.Wa = nn.Linear(hidden_size, hidden_size)
        self.Ua = nn.Linear(hidden_size, hidden_size)
        self.Va = nn.Linear(hidden_size, 1)
        
    def forward(self, query, keys):
        """
        :param query: hidden state from decoder 
        :param keys: output from encoder
        :return: context vector and attention weights
        """
        scores = self.Va(torch.tanh(self.Wa(query) + self.Ua(keys)))
        scores = scores.squeeze(2).unsqueeze(1)
        
        # Apply softmax to get attention weights
        weights = F.softmax(scores, dim=-1)
        
        # Calculate context vector
        context = torch.bmm(weights, keys)
        
        return context, weights
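
A shape sketch for the attention module, matching how the decoder calls it below (hypothetical sizes):

# Hypothetical shape check: batch 4, source length 7, hidden size 16
demo_attn = BahdanauAttention(16).to(device)
demo_query = torch.randn(4, 1, 16, device=device)  # decoder hidden state, permuted to (batch, 1, hidden)
demo_keys = torch.randn(4, 7, 16, device=device)   # encoder outputs
demo_context, demo_weights = demo_attn(demo_query, demo_keys)
print(demo_context.shape)  # torch.Size([4, 1, 16]) - context vector per batch element
print(demo_weights.shape)  # torch.Size([4, 1, 7])  - attention distribution over source positions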
    
# Decoder
class DecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size, dropout_p: float = 0.1):
        super(DecoderRNN, self).__init__()
        
        # Embedding layer
        self.embedding = nn.Embedding(output_size, hidden_size)
        
        # Attention layer
        self.attention = BahdanauAttention(hidden_size)
        
        # GRU layer - input is concatenation of embeddings and context vector
        self.gru = nn.GRU(2 * hidden_size, hidden_size, batch_first=True)
        
        # Linear layer to get output
        self.out = nn.Linear(hidden_size, output_size)
        
        # Dropout layer for regularization
        self.dropout = nn.Dropout(p=dropout_p)
        
    def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
        """
        :param encoder_outputs: output from encoder
        :param encoder_hidden: last hidden state from the encoder, used as the decoder's initial hidden state
        :param target_tensor: target tensor - used in training with teacher forcing
        :return: decoder outputs (log-probabilities), last hidden state, attention weights
        """
        # Batch size
        batch_size = encoder_outputs.size(0)
        
        # Decoder input - initialize with <bos> token index
        decoder_input = torch.empty(batch_size, 1, dtype=torch.long, device=device).fill_(bos_token)
        
        # Decoder hidden state - initialize with encoder hidden state
        decoder_hidden = encoder_hidden
        
        # List to store decoder outputs
        decoder_outputs = []
        
        # List to store attention weights
        attention_weights = []
        
        # Determine how many steps to decode (falls back to the global MAX_LENGTH set by prepareDataLoader)
        max_length = target_tensor.size(1) if target_tensor is not None else MAX_LENGTH
        
        # Decoder loop
        for i in range(max_length):
            # Forward step
            decoder_output, decoder_hidden, attn_weights = self.forward_step(decoder_input, decoder_hidden, encoder_outputs)
            
            # Save output and attention weights
            decoder_outputs.append(decoder_output)
            attention_weights.append(attn_weights)
            
            # If target tensor is provided, use it for next input
            if target_tensor is not None:
                # Teacher forcing: next input is current target
                decoder_input = target_tensor[:, i].unsqueeze(1)
            else:
                # Otherwise use output from current step (own prediction)
                _, topi = decoder_output.topk(1)
                decoder_input = topi.squeeze(-1).detach()
                
                # Stop decoding once an <eos> token is produced (this stops the whole batch,
                # which is fine for the single-sentence inference used in predict() below)
                if torch.any(decoder_input == eos_token):
                    break
            
        # Concatenate outputs
        decoder_outputs = torch.cat(decoder_outputs, dim=1)
        
        # Apply log softmax to get probabilities
        decoder_outputs = F.log_softmax(decoder_outputs, dim=-1)
        
        # Concatenate attention weights
        attention_weights = torch.cat(attention_weights, dim=1)
        
        return decoder_outputs, decoder_hidden, attention_weights
        
        
    def forward_step(self, decoder_input, decoder_hidden, encoder_outputs):
        """
        Forward step of decoder
        :param decoder_input: current input tensor for decoder
        :param decoder_hidden: current hidden state of decoder
        :param encoder_outputs: output from encoder
        :return: output and hidden state
        """
        # Transform input (as tensor of word indexes) to embedding vectors
        embedded = self.embedding(decoder_input)
        
        # Apply dropout to embeddings
        embedded = self.dropout(embedded)
        
        # Query: decoder hidden state reshaped from (1, batch, hidden) to (batch, 1, hidden)
        query = decoder_hidden.permute(1, 0, 2)
        
        # Context vector and attention weights
        context, attn_weights = self.attention(query, encoder_outputs)
        
        # Concatenate embeddings and context vector
        input_gru = torch.cat((embedded, context), dim=2)
        
        # GRU
        output, hidden = self.gru(input_gru, decoder_hidden)
        
        # Pass output through linear layer to get final output
        output = self.out(output)
        
        return output, hidden, attn_weights
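
A shape sketch for the decoder in teacher-forcing mode (hypothetical sizes, with random stand-ins for the encoder outputs):

# Hypothetical shape check: batch 4, source length 7, hidden size 16, target vocabulary 60, target length 5
demo_enc_out = torch.randn(4, 7, 16, device=device)     # stands in for encoder outputs
demo_enc_hidden = torch.randn(1, 4, 16, device=device)  # stands in for the encoder's final hidden state
demo_targets = torch.randint(0, 60, (4, 5), device=device)

demo_decoder = DecoderRNN(hidden_size=16, output_size=60).to(device)
demo_dec_out, demo_dec_hidden, demo_attn_w = demo_decoder(demo_enc_out, demo_enc_hidden, demo_targets)
print(demo_dec_out.shape)  # torch.Size([4, 5, 60]) - log-probabilities per decoded step
print(demo_attn_w.shape)   # torch.Size([4, 5, 7])  - one attention distribution over the source per step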

Model training

# Model training
def train(dataloader, encoder, decoder, epochs: int = 100, learning_rate: float = 0.01, info_every: int = 10, plot_every: int = 10):
    """
    :param dataloader: DataLoader with training data
    :param encoder: Encoder model
    :param decoder: Decoder model
    :param epochs: Number of epochs
    :param learning_rate: Learning rate
    :param info_every: Specify how often to print information about training (default: every 10 epochs)
    :param plot_every: Specify how often to plot loss (default: every 10 epochs; currently unused - the average loss is recorded every epoch)
    :return: list of average losses per epoch
    """
    # Set models to training mode
    encoder.train()
    decoder.train()
    
    # Initialize optimizer
    encoder_optimizer = optim.Adam(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.Adam(decoder.parameters(), lr=learning_rate)
    
    # Initialize loss function
    criterion = nn.NLLLoss()
    
    # Initialize loss lists
    plot_losses = []
    
    # Training loop
    for epoch in tqdm(range(epochs + 1)):
        total_loss = 0
        
        for data in dataloader:
            source_tensor, target_tensor = data
            
            # Zero gradients
            encoder_optimizer.zero_grad()
            decoder_optimizer.zero_grad()
            
            # Forward pass
            encoder_outputs, encoder_hidden = encoder(source_tensor)
            decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden, target_tensor)
            
            # Calculate loss
            loss = criterion(
                decoder_outputs.view(-1, decoder_outputs.size(-1)),
                target_tensor.view(-1)
            )
            
            # Backward pass
            loss.backward()
            
            # Update weights
            encoder_optimizer.step()
            decoder_optimizer.step()
            
            total_loss += loss.item()
        
        plot_losses.append(total_loss / len(dataloader))
        
        if epoch % info_every == 0:
            print(f'Epoch: {epoch}, Loss: {total_loss / len(dataloader)}')
    
    return plot_losses
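
The loss call above flattens the batch and time dimensions before applying NLLLoss; a small self-contained illustration of that reshaping (hypothetical shapes):

# Hypothetical illustration of the loss reshaping used in train()
demo_log_probs = F.log_softmax(torch.randn(4, 5, 60, device=device), dim=-1)  # (batch, target_len, vocab)
demo_targets = torch.randint(0, 60, (4, 5), device=device)                    # (batch, target_len)

demo_loss = nn.NLLLoss()(
    demo_log_probs.view(-1, demo_log_probs.size(-1)),  # (batch * target_len, vocab)
    demo_targets.view(-1)                               # (batch * target_len,)
)
print(demo_loss.item())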
# Model predictions
def predict(encoder, decoder, sentence, source_lang, target_lang):
    """
    :param encoder: Encoder model
    :param decoder: Decoder model
    :param sentence: Sentence to translate
    :param source_lang: Source language vocabulary
    :param target_lang: Target language vocabulary
    :return: predicted sentence
    """
    # Set models to evaluation mode
    encoder.eval()
    decoder.eval()
    
    with torch.no_grad():
        # Prepare input tensor
        input_tensor = tensorFromSentence(source_lang, sentence).view(1, -1)
        
        # Forward pass
        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden)
        
        # Get indexes of the most probable words
        _, topi = decoder_outputs.topk(1)
        decoded_ids = topi.squeeze()
        
        # Check if tensor is zero-dimensional
        if decoded_ids.dim() == 0:
            decoded_ids = decoded_ids.view(1)
        
        # Convert indexes to words
        decoded_words = []
        
        for idx in decoded_ids:
            if idx.item() == eos_token:
                decoded_words.append('<eos>')
                break
            decoded_words.append(target_lang.index2word[idx.item()])
            
        return decoded_words
# Random evaluation
def random_evaluation(encoder, decoder, n: int = 10):
    """
    :param encoder: Encoder model
    :param decoder: Decoder model
    :param n: Number of sentences to evaluate
    :return: None
    """
    # Set models to evaluation mode
    encoder.eval()
    decoder.eval()
    
    # Get random pairs and make predictions
    for i in range(n):
        pair = random.choice(pairs)
        print('[source]>', pair[0])
        print('[target]=', pair[1])
        output_words = predict(encoder, decoder, pair[0], source_lang, target_lang)
        output_words = list(filter(lambda x: x not in ['<bos>', '<eos>'], output_words))
        output_sentence = ' '.join(output_words)
        print('[prediction]<', output_sentence)
        print('')
# BLEU score
def calculate_bleu_score(encoder, decoder, pairs, source_lang, target_lang):
    """
    :param encoder: Encoder model
    :param decoder: Decoder model
    :param pairs: List of pairs of sentences
    :param source_lang: Source language vocabulary
    :param target_lang: Target language vocabulary
    :return: BLEU score
    """
    # Initialize lists for references and candidates
    references = []
    candidates = []
    
    # Loop through pairs
    for idx, (source_sentence, target_sentence) in enumerate(pairs):
        # Get predicted sentence
        predicted_sentence = predict(encoder, decoder, source_sentence, source_lang, target_lang)
        
        # Remove <bos> and <eos> tokens
        predicted_sentence = list(filter(lambda x: x not in ['<bos>', '<eos>'], predicted_sentence))
        
        # Add reference and candidate
        references.append([target_sentence.split(' ')])
        candidates.append(predicted_sentence)
        
    return bleu_score(candidates, references)
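
For reference, torchtext's corpus-level bleu_score takes tokenized candidates and, for each, a list of tokenized references, and uses n-grams up to length 4 by default. A tiny hypothetical call:

# Hypothetical BLEU illustration (not part of the evaluation above)
demo_candidates = [['tom', 'pracuje', 'bardzo', 'ciezko']]
demo_references = [[['tom', 'pracuje', 'bardzo', 'ciezko']]]
print(bleu_score(demo_candidates, demo_references))  # 1.0 for an exact match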
# Model parameters
hidden_size = 128
batch_size = 32

# Prepare data
source_lang, target_lang, pairs, train_loader, MAX_LENGTH = prepareDataLoader(df, 'angielski', 'polski', batch_size=batch_size)

# Initialize encoder and decoder
encoder = EncoderRNN(source_lang.n_words, hidden_size).to(device)
decoder = DecoderRNN(hidden_size, target_lang.n_words).to(device)
# Train the model
train(train_loader, encoder, decoder, epochs=1000, learning_rate=0.01, info_every=100, plot_every=10)
  0%|          | 0/1001 [00:00<?, ?it/s]
Epoch: 0, Loss: 0.347686683293432
Epoch: 100, Loss: 0.2592399111017585
Epoch: 200, Loss: 0.2301939323078841
Epoch: 300, Loss: 0.246875268407166
Epoch: 400, Loss: 0.260721294907853
Epoch: 500, Loss: 0.258150483481586
Epoch: 600, Loss: 0.23601281619630754
Epoch: 700, Loss: 0.24906805180944502
Epoch: 800, Loss: 0.22962150094099343
Epoch: 900, Loss: 0.22537698200903833
Epoch: 1000, Loss: 0.22563873510807753

Model evaluation

# Random evaluation
random_evaluation(encoder, decoder, n=10)
[source]> i shouted
[target]= krzyknąłem
[prediction]< krzyknąłem

[source]> i ran
[target]= pobiegłem
[prediction]< pobiegłam

[source]> we danced
[target]= tańczyliśmy
[prediction]< jestem

[source]> i moved
[target]= ruszyłem się
[prediction]< jestem zły

[source]> tom swore
[target]= tom przysiągł
[prediction]< tom przysiągł

[source]> i failed
[target]= poniosłem klęskę
[prediction]< zawiodłem

[source]> unlock it
[target]= otwórz to
[prediction]< otwórz to

[source]> i inhaled
[target]= wciągnąłem powietrze
[prediction]< wciągnąłem powietrze

[source]> who won
[target]= kto wygrał
[prediction]< kto wygrał

[source]> tom works
[target]= tom pracuje
[prediction]< tom pracuje
# BLEU score
calculate_bleu_score(encoder, decoder, pairs, source_lang, target_lang)
0.5394429729650964