lstm_pos/Projekt.ipynb

POS Tagging using LSTM

import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

import warnings
warnings.filterwarnings('ignore')

import torchtext
from torchtext.vocab import vocab

from seqeval.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report

from tqdm.notebook import tqdm

import datasets

from collections import Counter
# Load the dataset
dataset = datasets.load_dataset('batterydata/pos_tagging')
# Convert the dataset to pandas DataFrame
train_dataset = dataset['train']
test_dataset = dataset['test']

train_dataset.set_format(type='pandas')
test_dataset.set_format(type='pandas')

df_train = pd.concat([train_dataset['words'], train_dataset['labels']], axis=1)
df_test = pd.concat([test_dataset['words'], test_dataset['labels']], axis=1)
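
A quick, illustrative peek at the first training row (each row holds a tokenized sentence and its per-token POS labels; the slice size is arbitrary):

# Inspect the first five tokens and their labels of the first training sentence
print(df_train['words'][0][:5])
print(df_train['labels'][0][:5])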
len(df_test)
1451
len(df_train)
13054
# Method for building the vocabulary from DataFrame dataset
# Special tokens:
# <unk> - unknown token
# <pad> - padding token
# <bos> - beginning of sentence token
# <eos> - end of sentence token
def build_vocab(dataset):
    # Initialize the counter
    counter = Counter()
    
    # Iterate over the dataset and update the counter
    for idx, document in dataset.iterrows():
        counter.update(document['words'])
        
    # Return the vocabulary
    return vocab(counter, specials=['<unk>', '<pad>', '<bos>', '<eos>'])
# Build the vocabulary
v = build_vocab(df_train)
len(v)
24851
# Mapping from index to token
itos = v.get_itos()
# Set default index for unknown tokens
v.set_default_index(v["<unk>"])
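
A small, illustrative sanity check of the vocabulary mapping (assuming a common word such as 'the' occurs in the corpus; the out-of-vocabulary string is made up):

# Special tokens occupy the first indices because they were passed via `specials`
print(v['<unk>'], v['<pad>'], v['<bos>'], v['<eos>'])
# A frequent in-vocabulary word maps to its own index ...
print(v['the'])
# ... while an unseen word falls back to the <unk> index via the default index
print(v['made-up-oov-token'])
# itos inverts the mapping back from index to token
print(itos[v['the']])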
# Get unique POS tags
pos_tags = df_train['labels'].explode().unique().tolist()
# Mapping from POS tag to index
label2idx = {label: idx for idx, label in enumerate(pos_tags)}

# Mapping from index to POS tag
idx2label = {idx: label for label, idx in label2idx.items()}
# Method for vectorizing text data using the vocabulary mapping
def text_to_vec(data):
    return [torch.tensor([v['<bos>']] + [v[token] for token in document] + [v['<eos>']], dtype=torch.long) for document in data]
# Method for vectorizing POS tags data using the POS tags mapping
# (the hard-coded label index 20 is an arbitrary existing tag index that serves
# as a placeholder label for the <bos>/<eos> positions)
def pos_tags_to_vec(data):
    return [torch.tensor([20] + [label2idx[tag] for tag in document] + [20], dtype=torch.long) for document in data]
# Vectorize the text data (input)
X_train = text_to_vec(df_train['words'])
X_test = text_to_vec(df_test['words'])
# Vectorize the POS tags data (output)
y_train = pos_tags_to_vec(df_train['labels'])
y_test = pos_tags_to_vec(df_test['labels'])
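
An illustrative sanity check of the vectorization: every input tensor should be two positions longer than the original sentence (for <bos> and <eos>) and exactly as long as its label tensor.

# Compare the first vectorized example against the raw sentence length
print(X_train[0])
print(y_train[0])
assert len(X_train[0]) == len(df_train['words'][0]) + 2
assert len(X_train[0]) == len(y_train[0])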

LSTM Models

# Basic LSTM model
class LSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(LSTM, self).__init__()
        
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        # LSTM layer
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first = True)
        
        # Fully connected layer
        self.fc = nn.Linear(hidden_dim, output_dim)
        
        self.relu = nn.ReLU()
        
    def forward(self, x):
        # Embedding
        embedding = self.relu(self.embedding(x))
        
        # LSTM
        output, (hidden, cell) = self.lstm(embedding)
        
        # Fully connected
        output = self.fc(output)
        
        return output
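
A rough shape check of the model (the sizes below are made up for illustration): a single sentence of length T enters as a (1, T) tensor of token indices and leaves as (1, T, output_dim), i.e. one score vector per token.

# Toy instance with hypothetical sizes, just to confirm the tensor shapes
demo_model = LSTM(vocab_size=100, embedding_dim=8, hidden_dim=16, output_dim=5)
demo_input = torch.randint(0, 100, (1, 12))
print(demo_model(demo_input).shape)  # expected: torch.Size([1, 12, 5])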
# LSTM model with dropout
class LSTMWithDropout(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, dropout_prob=0.5):
        super(LSTMWithDropout, self).__init__()
        
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        # LSTM layer
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        
        # Dropout layer
        self.dropout = nn.Dropout(dropout_prob)
        
        # Fully connected layer
        self.fc = nn.Linear(hidden_dim, output_dim)
        
        self.relu = nn.ReLU()
        
    def forward(self, x):
        # Embedding
        embedding = self.relu(self.embedding(x))
        
        # LSTM
        output, (hidden, cell) = self.lstm(embedding)
        
        # Dropout
        output = self.dropout(output)
        
        # Fully connected
        output = self.fc(output)
        
        return output
# Stacked LSTM model
class StackedLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, num_layers=2):
        super(StackedLSTM, self).__init__()
        
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        # Stacked LSTM layers
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)
        
        # Fully connected layer
        self.fc = nn.Linear(hidden_dim, output_dim)
        
        self.relu = nn.ReLU()
        
    def forward(self, x):
        # Embedding
        embedding = self.relu(self.embedding(x))
        
        # LSTM
        output, (hidden, cell) = self.lstm(embedding)
        
        # Fully connected
        output = self.fc(output)
        
        return output
# Bidirectional LSTM model
class BidirectionalLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(BidirectionalLSTM, self).__init__()
        
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        # Bidirectional LSTM layer
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True)
        
        # Fully connected layer
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        
        self.relu = nn.ReLU()
        
    def forward(self, x):
        # Embedding
        embedding = self.relu(self.embedding(x))
        
        # LSTM
        output, (hidden, cell) = self.lstm(embedding)
        
        # Fully connected (the LSTM output already contains both directions concatenated)
        output = self.fc(output)
        
        return output

Training and Evaluation Methods

# Seqeval evaluation
def evaluate_model(model, X_test, y_test):
    """
    Method for evaluating the model
    :param model: model
    :param X_test: input data (list of token-index tensors)
    :param y_test: expected output data (list of label-index tensors)
    :return: dictionary with metrics values
    """
    # Use GPU if available
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Move the model to the device
    model = model.to(device)
    
    # Move the data to the device
    X = [x.to(device) for x in X_test]
    y = [y.to(device) for y in y_test]
    
    # Switch to evaluation mode (disables dropout) and turn off gradients
    model.eval()
    with torch.no_grad():
        # Predict the labels
        y_pred = [torch.argmax(model(x.unsqueeze(0)).squeeze(0), 1) for x in X]
    
    # Convert the label indices back to POS tags
    y_pred = [[idx2label[int(idx)] for idx in seq] for seq in y_pred]
    y_tags = [[idx2label[int(idx)] for idx in seq] for seq in y]
    
    # Calculate the metrics
    accuracy = accuracy_score(y_tags, y_pred)
    precision = precision_score(y_tags, y_pred)
    recall = recall_score(y_tags, y_pred)
    f1 = f1_score(y_tags, y_pred)
    
    return {'accuracy': accuracy, 'precision': precision, 'recall': recall, 'f1': f1}
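
classification_report is imported above but never used; a small helper along these lines could print a per-tag breakdown (report_model is a hypothetical name, and seqeval will treat each POS tag as if it were an entity type):

# Optional per-tag breakdown using seqeval's classification_report
def report_model(model, X_test, y_test):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    model.eval()
    with torch.no_grad():
        y_pred = [torch.argmax(model(x.to(device).unsqueeze(0)).squeeze(0), 1) for x in X_test]
    y_pred = [[idx2label[int(idx)] for idx in seq] for seq in y_pred]
    y_true = [[idx2label[int(idx)] for idx in seq] for seq in y_test]
    print(classification_report(y_true, y_pred))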
import random

# Train model
def train(model, X_train, y_train, X_test, y_test, epochs = 5, seed=1234):
    """
    Method for training the model
    :param model: model
    :param X_train: input data for training
    :param y_train: output data for training
    :param X_test: input data for testing
    :param y_test: output data for testing
    :param epochs: number of epochs
    """
    # Seed for reproducibility
    torch.manual_seed(seed)
    random.seed(seed)
    np.random.seed(seed)

    # Use GPU if available
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters())
    
    # Move training to GPU
    model = model.to(device)
    X_train_device = [x.to(device) for x in X_train]
    y_train_device = [y.to(device) for y in y_train]
    X_test_device = [x.to(device) for x in X_test]
    y_test_device = [y.to(device) for y in y_test]
    
    # Training loop
    for epoch in range(epochs):
        # Re-enable training mode (evaluation at the end of each epoch switches to eval mode)
        model.train()
        for idx in tqdm(range(len(X_train_device))):
            # Zero the gradients
            optimizer.zero_grad()
            
            # Forward pass
            output = model(X_train_device[idx].unsqueeze(0))
    
            # Calculate the loss
            loss = criterion(output.squeeze(0), y_train_device[idx])
            
            # Backward pass
            loss.backward()
            
            # Update the weights
            optimizer.step()
            
        # Evaluate the model on the dev set
        metrics = evaluate_model(model, X_test_device, y_test_device)
        
        print(f'Epoch: {epoch+1}, Accuracy: {metrics["accuracy"]}, Precision: {metrics["precision"]}, Recall: {metrics["recall"]}, F1: {metrics["f1"]}')

Basic LSTM Model

# Model parameters
vocab_size = len(v)
embedding_dim = 64
hidden_dim = 128
output_dim = len(pos_tags)
epochs = 7
# Initialize the model
model = LSTM(vocab_size, embedding_dim, hidden_dim, output_dim)
# Train the model
train(model, X_train, y_train, X_test, y_test, epochs)
Epoch: 1, Accuracy: 0.8818851395991876, Precision: 0.8438832404066907, Recall: 0.8296350578924226, F1: 0.8366984952848316
Epoch: 2, Accuracy: 0.9159957912251939, Precision: 0.8855693514613502, Recall: 0.8801700131906786, F1: 0.8828614271853225
Epoch: 3, Accuracy: 0.9277656789096337, Precision: 0.9007945850500294, Recall: 0.8972299574967023, F1: 0.8990087377927893
Epoch: 4, Accuracy: 0.9327330119656446, Precision: 0.9065371180321132, Recall: 0.9052616151253114, F1: 0.9058989176028865
Epoch: 5, Accuracy: 0.9348129297477182, Precision: 0.9091175694301886, Recall: 0.9086911915579657, F1: 0.9089043304893424
Epoch: 6, Accuracy: 0.9357427753444099, Precision: 0.9102924799249751, Recall: 0.9104792613219991, F1: 0.910385861043129
Epoch: 7, Accuracy: 0.9371864829813786, Precision: 0.912434017595308, Recall: 0.9120328301333724, F1: 0.9122333797551859
# Evaluate the model
evaluate_model(model, X_test, y_test)
{'accuracy': 0.9371864829813786,
 'precision': 0.912434017595308,
 'recall': 0.9120328301333724,
 'f1': 0.9122333797551859}
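
To tag an unseen sentence with the trained model, a helper roughly like the sketch below could be used (tag_sentence and the example sentence are illustrative, not part of the original notebook):

# Tag a pre-tokenized sentence and pair each word with its predicted POS tag
def tag_sentence(model, words):
    device = next(model.parameters()).device
    x = torch.tensor([v['<bos>']] + [v[w] for w in words] + [v['<eos>']], dtype=torch.long).to(device)
    model.eval()
    with torch.no_grad():
        pred = torch.argmax(model(x.unsqueeze(0)).squeeze(0), 1)
    # Drop the <bos>/<eos> positions and map label indices back to tags
    return list(zip(words, [idx2label[int(idx)] for idx in pred[1:-1]]))

tag_sentence(model, ['The', 'battery', 'capacity', 'decreases', 'rapidly', '.'])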

LSTM Model with Dropout

# Model parameters
vocab_size = len(v)
embedding_dim = 64
hidden_dim = 128
output_dim = len(pos_tags)
epochs = 7
p = 0.2
# Initialize the model
model_dropout = LSTMWithDropout(vocab_size, embedding_dim, hidden_dim, output_dim, dropout_prob=p)
# Train the model
train(model_dropout, X_train, y_train, X_test, y_test, epochs)
Epoch: 1, Accuracy: 0.925294247192111, Precision: 0.8976234540700919, Recall: 0.8956763886853291, F1: 0.8966488643699748
Epoch: 2, Accuracy: 0.9295274916191548, Precision: 0.902512680681385, Recall: 0.9023010405979774, F1: 0.902406848230776
Epoch: 3, Accuracy: 0.9336873271833019, Precision: 0.9082087364409264, Recall: 0.9080756265572328, F1: 0.908142176621473
Epoch: 4, Accuracy: 0.9367949690459295, Precision: 0.9122472897743921, Recall: 0.9126483951341052, F1: 0.9124477983735073
Epoch: 5, Accuracy: 0.9367215601830328, Precision: 0.9114772328221936, Recall: 0.9130001465630954, F1: 0.9122380540952157
# Evaluate the model
evaluate_model(model_dropout, X_test, y_test)
{'accuracy': 0.9370396652555852,
 'precision': 0.9120747203841424,
 'recall': 0.9131173970394255,
 'f1': 0.912595760887079}

Stacked LSTM Model

# Model parameters
vocab_size = len(v)
embedding_dim = 64
hidden_dim = 128
output_dim = len(pos_tags)
epochs = 7
num_layers = 2
# Initialize the model
model_stacked = StackedLSTM(vocab_size, embedding_dim, hidden_dim, output_dim, num_layers=num_layers)
# Train the model
train(model_stacked, X_train, y_train, X_test, y_test, epochs)
Epoch: 1, Accuracy: 0.9349597474735116, Precision: 0.9083610673206048, Recall: 0.9120621427524549, F1: 0.9102078427357427
# Evaluate the model
evaluate_model(model_stacked, X_test, y_test)
{'accuracy': 0.9349597474735116,
 'precision': 0.9083610673206048,
 'recall': 0.9120621427524549,
 'f1': 0.9102078427357427}

Bidirectional LSTM Model

# Model parameters
vocab_size = len(v)
embedding_dim = 64
hidden_dim = 128
output_dim = len(pos_tags)
epochs = 5
# Initialize the model
model_bidirectional = BidirectionalLSTM(vocab_size, embedding_dim, hidden_dim, output_dim)
# Train the model
train(model_bidirectional, X_train, y_train, X_test, y_test, epochs)
Epoch: 1, Accuracy: 0.9022438642425429, Precision: 0.8723924915694291, Recall: 0.8568957936391617, F1: 0.8645747072045428
Epoch: 2, Accuracy: 0.9338586145300609, Precision: 0.9106548443161399, Recall: 0.9061703063168695, F1: 0.908407040639417
Epoch: 3, Accuracy: 0.940587760295593, Precision: 0.9189085996240601, Recall: 0.9171039132346475, F1: 0.9180053694819769
Epoch: 4, Accuracy: 0.9427900261824944, Precision: 0.9211352763347128, Recall: 0.9199472372856514, F1: 0.9205408734930924
Epoch: 5, Accuracy: 0.9433772970856682, Precision: 0.9217263652378156, Recall: 0.9202403634764766, F1: 0.920982764943161
# Evaluate the model
evaluate_model(model_bidirectional, X_test, y_test)
{'accuracy': 0.9433772970856682,
 'precision': 0.9217263652378156,
 'recall': 0.9202403634764766,
 'f1': 0.920982764943161}
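
Summary

The bidirectional LSTM performs best on the test set (F1 ≈ 0.921), followed by the dropout (F1 ≈ 0.913) and basic (F1 ≈ 0.912) variants; the stacked model reaches F1 ≈ 0.910 after its single recorded epoch.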