RNN/RNN_NER.ipynb
Mateusz Grzegorzewski 37d6713b67 RNN project
2024-05-27 18:44:12 +02:00

9.3 KiB

import pandas as pd
import torch
from torchtext.vocab import vocab
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
from collections import Counter
def load_datasets():
    """Read the train/dev/test splits from disk.

    Returns (train_data, dev_data, dev_labels, test_data): train_data has
    both tag and sentence columns, dev/test inputs hold sentences only, and
    dev_labels holds the expected dev tags.
    """
    train = pd.read_csv(
        "train/train.tsv.xz",
        compression="xz",
        sep="\t",
        names=["Tag", "Sentence"],
    )
    dev_in = pd.read_csv("dev-0/in.tsv", sep="\t", names=["Sentence"])
    dev_expected = pd.read_csv("dev-0/expected.tsv", sep="\t", names=["Tag"])
    test_in = pd.read_csv("test-A/in.tsv", sep="\t", names=["Sentence"])

    return train, dev_in, dev_expected, test_in

train_data, dev_data, dev_labels, test_data = load_datasets()

# Hold out 10% of the training sentences for validation (fixed seed so the
# split is reproducible), then rebuild each split as its own DataFrame.
split_result = train_test_split(
    train_data["Sentence"], train_data["Tag"], test_size=0.1, random_state=42
)
train_sentences, val_sentences, train_tags, val_tags = split_result

train_data = pd.DataFrame({"Sentence": train_sentences, "Tag": train_tags})
val_data = pd.DataFrame({"Sentence": val_sentences, "Tag": val_tags})

def tokenize_column(dataframe, column):
    """Whitespace-tokenize every entry of *column*; returns a Series of token lists."""
    return dataframe[column].map(str.split)

# Split sentence and tag strings into token columns once, so later
# tensorization can iterate plain Python lists.
for frame, src_col, dst_col in (
    (train_data, "Sentence", "tokens"),
    (train_data, "Tag", "tag_tokens"),
    (val_data, "Sentence", "tokens"),
    (val_data, "Tag", "tag_tokens"),
    (dev_data, "Sentence", "tokens"),
    (dev_labels, "Tag", "tag_tokens"),
    (test_data, "Sentence", "tokens"),
):
    frame[dst_col] = tokenize_column(frame, src_col)
def create_vocab(token_list):
    """Build a torchtext vocab over all tokens, with the four special symbols first."""
    counts = Counter(token for tokens in token_list for token in tokens)
    return vocab(counts, specials=["<unk>", "<pad>", "<bos>", "<eos>"])

# Vocabulary comes from the training split only; anything unseen maps to <unk>.
vocab_obj = create_vocab(train_data["tokens"])
vocab_obj.set_default_index(vocab_obj["<unk>"])

# Prefer GPU when one is available.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

def convert_to_tensor(token_lists, vocab_obj, device):
    """Map each token list to a LongTensor of vocab ids, framed by <bos>/<eos>."""
    bos, eos = vocab_obj["<bos>"], vocab_obj["<eos>"]
    tensors = []
    for tokens in token_lists:
        ids = [bos]
        ids.extend(vocab_obj[token] for token in tokens)
        ids.append(eos)
        tensors.append(torch.tensor(ids, dtype=torch.long, device=device))
    return tensors

# Numericalize every split once, reusing the same vocabulary and device.
train_tensor, val_tensor, dev_tensor, test_tensor = (
    convert_to_tensor(frame["tokens"], vocab_obj, device)
    for frame in (train_data, val_data, dev_data, test_data)
)
# Closed IOB2 tag set; "O" is deliberately index 0 so it can double as the
# padding label at the <bos>/<eos> positions.
tag_list = ["O", "B-PER", "I-PER", "B-ORG", "I-ORG", "B-LOC", "I-LOC", "B-MISC", "I-MISC"]

tag_to_index = dict(zip(tag_list, range(len(tag_list))))

def convert_tags_to_tensor(tag_tokens, tag_to_index, device):
    """Map each tag list to a LongTensor of label ids.

    Both ends are padded with label 0 ("O") to line up with the
    <bos>/<eos> framing added by convert_to_tensor.
    """
    result = []
    for tags in tag_tokens:
        ids = [0]
        for tag in tags:
            ids.append(tag_to_index[tag])
        ids.append(0)
        result.append(torch.tensor(ids, dtype=torch.long, device=device))
    return result

# Label tensors for every split that has gold tags (test-A has none).
train_tag_tensor, val_tag_tensor, dev_tag_tensor = (
    convert_tags_to_tensor(frame["tag_tokens"], tag_to_index, device)
    for frame in (train_data, val_data, dev_labels)
)
def calculate_metrics(true_labels, predicted_labels):
    """Token-level precision/recall/F1 over entity (non-"O") labels.

    Labels are integer ids where 0 is the "O" (outside) tag.  A true
    positive is a token whose predicted label equals the gold label AND is
    an entity label (> 0); selected/relevant counts likewise cover only
    entity labels.

    Returns (precision, recall, f1_score).  Degenerate cases: precision
    (recall) defaults to 1.0 when nothing was selected (relevant), and F1
    is 0.0 when precision + recall is zero.
    """
    true_positives = 0
    total_selected = 0
    total_relevant = 0

    for pred, true in zip(predicted_labels, true_labels):
        # BUG FIX: previously `pred == true` alone counted as a true
        # positive, so correct "O" predictions inflated the numerator while
        # the denominators counted only entity labels — precision could
        # exceed 1.0.  Only matching entity labels count now.
        if pred == true and true > 0:
            true_positives += 1
        if pred > 0:
            total_selected += 1
        if true > 0:
            total_relevant += 1

    precision = true_positives / total_selected if total_selected > 0 else 1.0
    recall = true_positives / total_relevant if total_relevant > 0 else 1.0
    f1_score = (2 * precision * recall / (precision + recall)) if (precision + recall) > 0 else 0.0

    return precision, recall, f1_score

max_tag_index = max(tag_to_index.values()) + 1
class BiLSTMModel(torch.nn.Module):
    """Bidirectional LSTM tagger: embedding -> ReLU -> BiLSTM -> linear logits."""

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.embedding = torch.nn.Embedding(vocab_size, embed_size)
        self.lstm = torch.nn.LSTM(
            embed_size,
            hidden_size,
            num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # Bidirectional output concatenates both directions: hidden_size * 2.
        self.fc = torch.nn.Linear(hidden_size * 2, output_size)

    def forward(self, x):
        """x: (batch, seq) token ids -> (batch, seq, output_size) raw logits."""
        hidden = torch.relu(self.embedding(x))
        features, _ = self.lstm(hidden)
        return self.fc(features)

# One logit per tag class; Adam with its default hyper-parameters.
vocab_size = len(vocab_obj.get_itos())
model = BiLSTMModel(vocab_size, 100, 100, 1, max_tag_index).to(device)
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

def evaluate_model(tokens, labels, model):
    """Score *model* on parallel lists of token/label tensors.

    Flattens gold and predicted label ids over all sentences and returns
    the (precision, recall, f1) tuple from calculate_metrics.  Note that
    the <bos>/<eos> positions are included in the scoring.
    """
    gold = []
    hyp = []
    with torch.no_grad():
        for i in tqdm(range(len(labels))):
            gold.extend(labels[i].cpu().numpy())
            logits = model(tokens[i].unsqueeze(0)).squeeze(0)
            hyp.extend(torch.argmax(logits, dim=1).cpu().numpy())
    return calculate_metrics(gold, hyp)

def predict_labels(tokens, model, tag_to_index):
    """Decode one space-joined tag string per sentence tensor.

    The first and last predictions are dropped to undo the <bos>/<eos>
    framing added at tensorization time.
    """
    index_to_tag = {idx: tag for tag, idx in tag_to_index.items()}
    outputs = []

    with torch.no_grad():
        for i in tqdm(range(len(tokens))):
            logits = model(tokens[i].unsqueeze(0)).squeeze(0)
            best = torch.argmax(logits, dim=1)
            # best[1:-1] strips the <bos>/<eos> positions.
            outputs.append(" ".join(index_to_tag[t.item()] for t in best[1:-1]))

    return outputs
EPOCHS = 10

# Per-sentence training (batch size 1): each sentence keeps its natural
# length, so no padding or packing is required.  Validation metrics are
# printed after every epoch.
for epoch in range(EPOCHS):
    model.train()
    for i in tqdm(range(len(train_tag_tensor))):
        sentence = train_tensor[i].unsqueeze(0)
        targets = train_tag_tensor[i]

        optimizer.zero_grad()
        logits = model(sentence).squeeze(0)
        loss = loss_fn(logits, targets)
        loss.backward()
        optimizer.step()

    model.eval()
    print(evaluate_model(val_tensor, val_tag_tensor, model))
# Final scores on the held-out validation set and the dev set.
evaluate_model(val_tensor, val_tag_tensor, model)
evaluate_model(dev_tensor, dev_tag_tensor, model)

# Write one space-joined tag line per sentence for the submission files.
for tensors, out_path in ((dev_tensor, "dev-0/out.tsv"), (test_tensor, "test-A/out.tsv")):
    preds = predict_labels(tensors, model, tag_to_index)
    pd.DataFrame(preds, columns=["Tag"]).to_csv(out_path, index=False, header=False)