UG_2024/zad3/zad3.py

import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from collections import Counter

# Load data from TSV files
def load_data(file_path):
    data = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                data.append(line.strip().split())
            else:
                # Preserve blank separator lines as single "<pad>" sentences
                data.append(["<pad>"])
    return data
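
# For illustration (hypothetical file contents): a file with the lines
# "EU rejects call", "" and "Peter Blackburn" would load as
# [['EU', 'rejects', 'call'], ['<pad>'], ['Peter', 'Blackburn']].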

# Load the data (note: dev-0 serves as the training set here)
train_sentences = load_data('en-ner-conll-2003/dev-0/in.tsv')
train_labels = load_data('en-ner-conll-2003/dev-0/expected.tsv')
test_sentences = load_data('en-ner-conll-2003/test-A/in.tsv')

# Vocabulary and label mapping
def build_vocab(sentences):
    counter = Counter(token for sentence in sentences for token in sentence if token != "<pad>")
    # Indices 0 and 1 are reserved for the padding and unknown tokens
    vocab = {token: idx + 2 for idx, (token, _) in enumerate(counter.most_common())}
    vocab['<pad>'] = 0
    vocab['<unk>'] = 1
    return vocab

def build_label_mapping(labels):
    unique_labels = set(label for sentence in labels for label in sentence if label != "<pad>")
    # sorted() keeps the label-to-id mapping deterministic across runs
    label_map = {label: idx + 1 for idx, label in enumerate(sorted(unique_labels))}
    label_map['<pad>'] = 0
    return label_map
vocab = build_vocab(train_sentences)
label_map = build_label_mapping(train_labels)
num_tags = len(label_map)
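
# For illustration (hypothetical ids): if the training labels are the usual
# CoNLL-2003 set, the sorted mapping would come out roughly as
# {'<pad>': 0, 'B-LOC': 1, 'B-MISC': 2, ..., 'O': 9}, giving num_tags == 10.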

# Convert tokens and labels to id tensors
def convert_to_tensor(sentences, vocab):
    # Tokens unseen during vocabulary building fall back to the <unk> id
    return [torch.tensor([vocab.get(token, vocab['<unk>']) for token in sentence], dtype=torch.long) for sentence in sentences]

def convert_labels_to_tensor(labels, label_map):
    return [torch.tensor([label_map[label] for label in sentence], dtype=torch.long) for sentence in labels]

train_tokens_ids = convert_to_tensor(train_sentences, vocab)
train_labels_ids = convert_labels_to_tensor(train_labels, label_map)
test_tokens_ids = convert_to_tensor(test_sentences, vocab)

# Padding
def pad_data(sequences):
    return pad_sequence(sequences, batch_first=True, padding_value=0)

padded_train_tokens_ids = pad_data(train_tokens_ids)
padded_train_labels_ids = pad_data(train_labels_ids)
padded_test_tokens_ids = pad_data(test_tokens_ids)
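
# A minimal sketch of what pad_sequence does here (hypothetical ids):
# [[5, 7, 2], [3]] becomes [[5, 7, 2], [3, 0, 0]] -- every sentence is
# right-padded with the <pad> id 0 up to the longest sentence in the set.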

# Define the LSTM model
class LSTMNER(nn.Module):
    def __init__(self, vocab_size, num_tags):
        super(LSTMNER, self).__init__()
        self.embedding = nn.Embedding(vocab_size, 100)
        self.lstm = nn.LSTM(100, 256, batch_first=True)
        self.fc = nn.Linear(256, num_tags)

    def forward(self, x):
        emb = self.embedding(x)       # (batch, seq_len) -> (batch, seq_len, 100)
        lstm_out, _ = self.lstm(emb)  # -> (batch, seq_len, 256)
        out = self.fc(lstm_out)       # -> (batch, seq_len, num_tags)
        return out
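
# Quick shape check (a sketch; the 4-token dummy batch is arbitrary):
#   dummy = torch.zeros((1, 4), dtype=torch.long)
#   LSTMNER(len(vocab), num_tags)(dummy).shape  # torch.Size([1, 4, num_tags])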

# Model, loss and optimizer
model = LSTMNER(len(vocab), num_tags)
# ignore_index=0 excludes padded positions from the loss
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters())

# Training
num_epochs = 25  # REDUCE BEFORE RUNNING
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    # One padded sentence per step, i.e. a batch size of 1
    for x, y in zip(padded_train_tokens_ids, padded_train_labels_ids):
        x = x.unsqueeze(0)
        y = y.unsqueeze(0)
        optimizer.zero_grad()
        outputs = model(x)
        loss = criterion(outputs.view(-1, num_tags), y.view(-1))
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    print(f"Epoch {epoch + 1}, Loss: {epoch_loss / len(train_tokens_ids)}")

# Prediction and writing results to file
def predict(model, data, label_map):
    model.eval()
    reverse_label_map = {v: k for k, v in label_map.items()}
    predictions = []
    with torch.no_grad():
        for sentence in data:
            output = model(sentence.unsqueeze(0))
            predicted_tags = torch.argmax(output, dim=-1).squeeze(0).tolist()
            # Keep only the positions that held real tokens, so each output
            # line has exactly as many tags as the original sentence
            length = int((sentence != 0).sum())
            predictions.append([reverse_label_map[tag] for tag in predicted_tags[:length]])
    return predictions

def save_predictions(predictions, file_path):
    with open(file_path, 'w', encoding='utf-8') as file:
        for sentence_preds in predictions:
            sentence_output = ' '.join([tag for tag in sentence_preds if tag != '<pad>'])
            file.write(f"{sentence_output}\n")

def correct_labels(predictions):
    # Repair the BIO scheme: an I-X tag that starts an entity (no preceding
    # B-X or I-X of the same type) is rewritten as B-X
    corrected = []
    for sentence in predictions:
        fixed_sentence = []
        for i, tag in enumerate(sentence):
            if tag.startswith('I-') and (i == 0 or not sentence[i - 1].endswith(tag[2:])):
                fixed_sentence.append('B-' + tag[2:])
            else:
                fixed_sentence.append(tag)
        corrected.append(fixed_sentence)
    return corrected
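
# For example: ['O', 'I-PER', 'I-PER'] becomes ['O', 'B-PER', 'I-PER'] --
# the first I-PER opens the entity, so it is promoted to B-PER.
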
dev_predictions = predict(model, padded_train_tokens_ids, label_map)
test_predictions = predict(model, padded_test_tokens_ids, label_map)
corrected_dev_predictions = correct_labels(dev_predictions)
corrected_test_predictions = correct_labels(test_predictions)
save_predictions(corrected_dev_predictions, 'en-ner-conll-2003/dev-0/out.tsv')
save_predictions(corrected_test_predictions, 'en-ner-conll-2003/test-A/out.tsv')
print("Predykcje zapisane.")