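# Named-entity recognition with a BiLSTM tagger (PyTorch + torchtext).
# Expects the train/dev-0/test-A directory layout with tab-separated inputs.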
import pandas as pd
import torch
from torchtext.vocab import vocab
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
from collections import Counter
def load_datasets():
    train_data = pd.read_csv(
        "train/train.tsv.xz", compression="xz", sep="\t", names=["Tag", "Sentence"]
    )
    dev_data = pd.read_csv("dev-0/in.tsv", sep="\t", names=["Sentence"])
    dev_labels = pd.read_csv("dev-0/expected.tsv", sep="\t", names=["Tag"])
    test_data = pd.read_csv("test-A/in.tsv", sep="\t", names=["Sentence"])
    return train_data, dev_data, dev_labels, test_data
train_data, dev_data, dev_labels, test_data = load_datasets()
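# Hold out 10% of the training sentences for validation.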
train_sentences, val_sentences, train_tags, val_tags = train_test_split(
    train_data["Sentence"], train_data["Tag"], test_size=0.1, random_state=42
)
train_data = pd.DataFrame({"Sentence": train_sentences, "Tag": train_tags})
val_data = pd.DataFrame({"Sentence": val_sentences, "Tag": val_tags})
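# Sentences and tag strings are whitespace-delimited, so a plain split() suffices.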
def tokenize_column(dataframe, column):
    return dataframe[column].apply(lambda x: x.split())
train_data["tokens"] = tokenize_column(train_data, "Sentence")
train_data["tag_tokens"] = tokenize_column(train_data, "Tag")
val_data["tokens"] = tokenize_column(val_data, "Sentence")
val_data["tag_tokens"] = tokenize_column(val_data, "Tag")
dev_data["tokens"] = tokenize_column(dev_data, "Sentence")
dev_labels["tag_tokens"] = tokenize_column(dev_labels, "Tag")
test_data["tokens"] = tokenize_column(test_data, "Sentence")
def create_vocab(token_list):
    token_counter = Counter()
    for tokens in token_list:
        token_counter.update(tokens)
    return vocab(token_counter, specials=["<unk>", "<pad>", "<bos>", "<eos>"])
vocab_obj = create_vocab(train_data["tokens"])
vocab_obj.set_default_index(vocab_obj["<unk>"])
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
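# Map each sentence to a tensor of token indices, wrapped in <bos>/<eos>;
# out-of-vocabulary tokens fall back to <unk> via the default index.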
def convert_to_tensor(token_lists, vocab_obj, device):
    return [
        torch.tensor(
            [vocab_obj["<bos>"]] + [vocab_obj[token] for token in tokens] + [vocab_obj["<eos>"]],
            dtype=torch.long,
            device=device,
        )
        for tokens in token_lists
    ]
train_tensor = convert_to_tensor(train_data["tokens"], vocab_obj, device)
val_tensor = convert_to_tensor(val_data["tokens"], vocab_obj, device)
dev_tensor = convert_to_tensor(dev_data["tokens"], vocab_obj, device)
test_tensor = convert_to_tensor(test_data["tokens"], vocab_obj, device)
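# IOB2 tag inventory; index 0 is the outside tag "O".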
tag_list = ["O", "B-PER", "I-PER", "B-ORG", "I-ORG", "B-LOC", "I-LOC", "B-MISC", "I-MISC"]
tag_to_index = {tag: idx for idx, tag in enumerate(tag_list)}
def convert_tags_to_tensor(tag_tokens, tag_to_index, device):
    # The <bos>/<eos> positions take the "O" tag so the targets stay aligned
    # with the wrapped input sequences.
    return [
        torch.tensor(
            [tag_to_index["O"]] + [tag_to_index[tag] for tag in tags] + [tag_to_index["O"]],
            dtype=torch.long,
            device=device,
        )
        for tags in tag_tokens
    ]
train_tag_tensor = convert_tags_to_tensor(train_data["tag_tokens"], tag_to_index, device)
val_tag_tensor = convert_tags_to_tensor(val_data["tag_tokens"], tag_to_index, device)
dev_tag_tensor = convert_tags_to_tensor(dev_labels["tag_tokens"], tag_to_index, device)
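# Token-level precision/recall/F1 over entity tags only: "O" (index 0) is
# excluded from the selected and relevant counts.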
def calculate_metrics(true_labels, predicted_labels):
    true_positives = 0
    total_selected = 0
    total_relevant = 0
    for pred, true in zip(predicted_labels, true_labels):
        # Count a true positive only for a correctly predicted entity tag;
        # O/O agreements would otherwise inflate precision past 1.0.
        if pred == true and true > 0:
            true_positives += 1
        if pred > 0:
            total_selected += 1
        if true > 0:
            total_relevant += 1
    precision = true_positives / total_selected if total_selected > 0 else 1.0
    recall = true_positives / total_relevant if total_relevant > 0 else 1.0
    f1_score = (2 * precision * recall / (precision + recall)) if (precision + recall) > 0 else 0.0
    return precision, recall, f1_score
num_tags = len(tag_list)
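# A single-layer bidirectional LSTM tagger with a per-token linear classifier.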
class BiLSTMModel(torch.nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.embedding = torch.nn.Embedding(vocab_size, embed_size)
        self.lstm = torch.nn.LSTM(
            embed_size, hidden_size, num_layers, batch_first=True, bidirectional=True
        )
        # The bidirectional LSTM concatenates both directions, doubling the
        # feature size fed to the classifier.
        self.fc = torch.nn.Linear(hidden_size * 2, output_size)

    def forward(self, x):
        embedded = torch.relu(self.embedding(x))
        lstm_out, _ = self.lstm(embedded)
        logits = self.fc(lstm_out)
        return logits
model = BiLSTMModel(len(vocab_obj), 100, 100, 1, num_tags).to(device)
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
def evaluate_model(tokens, labels, model):
    true_labels = []
    predicted_labels = []
    for i in tqdm(range(len(labels))):
        inputs = tokens[i].unsqueeze(0)
        true = list(labels[i].cpu().numpy())
        true_labels += true
        with torch.no_grad():
            logits = model(inputs).squeeze(0)
            predicted = torch.argmax(logits, dim=1)
        predicted_labels += list(predicted.cpu().numpy())
    return calculate_metrics(true_labels, predicted_labels)
def predict_labels(tokens, model, tag_to_index):
    predictions = []
    index_to_tag = {v: k for k, v in tag_to_index.items()}
    for i in tqdm(range(len(tokens))):
        inputs = tokens[i].unsqueeze(0)
        with torch.no_grad():
            logits = model(inputs).squeeze(0)
            predicted = torch.argmax(logits, dim=1)
        # Drop the <bos>/<eos> positions before mapping indices back to tags.
        tags = [index_to_tag[label.item()] for label in predicted[1:-1]]
        predictions.append(" ".join(tags))
    return predictions
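# Train one sentence at a time (batch size 1), so variable-length sentences
# need no padding; evaluate on the validation split after every epoch.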
EPOCHS = 10
for epoch in range(EPOCHS):
    model.train()
    for i in tqdm(range(len(train_tag_tensor))):
        inputs = train_tensor[i].unsqueeze(0)
        targets = train_tag_tensor[i]
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs.squeeze(0), targets)
        loss.backward()
        optimizer.step()
    model.eval()
    print(evaluate_model(val_tensor, val_tag_tensor, model))
print("Validation:", evaluate_model(val_tensor, val_tag_tensor, model))
print("Dev:", evaluate_model(dev_tensor, dev_tag_tensor, model))
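# Write predictions in the challenge's out.tsv format: one space-separated
# tag sequence per line, no header or index.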
dev_predictions = predict_labels(dev_tensor, model, tag_to_index)
dev_predictions_df = pd.DataFrame(dev_predictions, columns=["Tag"])
dev_predictions_df.to_csv("dev-0/out.tsv", index=False, header=False)
test_predictions = predict_labels(test_tensor, model, tag_to_index)
test_predictions_df = pd.DataFrame(test_predictions, columns=["Tag"])
test_predictions_df.to_csv("test-A/out.tsv", index=False, header=False)