import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from collections import Counter

# Sentinel tokens. In the original file both special vocabulary entries were
# literal empty strings (the markup was evidently stripped), so the padding
# and unknown-word ids collided on the same key. Restore explicit sentinels:
# id 0 is padding (matches pad_sequence's padding_value and the loss's
# ignore_index), id 1 is the unknown-word fallback.
PAD_TOKEN = '<pad>'
UNK_TOKEN = '<unk>'


# --- Loading data from TSV files ---
def load_data(file_path):
    """Read one whitespace-tokenized sentence per line.

    Blank lines are kept as a single empty token so that the sentence count
    (and therefore the output line count) matches the input file exactly.
    """
    data = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                data.append(line.strip().split())
            else:
                data.append([""])
    return data


# Load the data
train_sentences = load_data('en-ner-conll-2003/dev-0/in.tsv')
train_labels = load_data('en-ner-conll-2003/dev-0/expected.tsv')
test_sentences = load_data('en-ner-conll-2003/test-A/in.tsv')


# --- Vocabulary and label mapping ---
def build_vocab(sentences):
    """Map each distinct token to an id; ids 0 and 1 are reserved."""
    counter = Counter(
        token for sentence in sentences for token in sentence if token != ""
    )
    vocab = {token: idx + 2 for idx, (token, _) in enumerate(counter.most_common())}
    vocab[PAD_TOKEN] = 0
    vocab[UNK_TOKEN] = 1
    return vocab


def build_label_mapping(labels):
    """Map each distinct tag to an id; id 0 is reserved for padding."""
    unique_labels = set(
        label for sentence in labels for label in sentence if label != ""
    )
    label_map = {label: idx + 1 for idx, label in enumerate(unique_labels)}
    label_map[PAD_TOKEN] = 0
    return label_map


vocab = build_vocab(train_sentences)
label_map = build_label_mapping(train_labels)
num_tags = len(label_map)


# --- Converting data to tensors ---
def convert_to_tensor(sentences, vocab):
    """Encode each sentence as a LongTensor of token ids (unknowns -> UNK)."""
    unk_id = vocab[UNK_TOKEN]
    return [
        torch.tensor(
            [vocab.get(token, unk_id) for token in sentence], dtype=torch.long
        )
        for sentence in sentences
    ]


def convert_labels_to_tensor(labels, label_map):
    """Encode each tag sequence as a LongTensor of label ids.

    Uses .get with the padding id so the empty pseudo-token produced by
    blank input lines (see load_data) cannot raise a KeyError; the loss
    ignores id 0 anyway.
    """
    pad_id = label_map[PAD_TOKEN]
    return [
        torch.tensor(
            [label_map.get(label, pad_id) for label in sentence], dtype=torch.long
        )
        for sentence in labels
    ]


train_tokens_ids = convert_to_tensor(train_sentences, vocab)
train_labels_ids = convert_labels_to_tensor(train_labels, label_map)
test_tokens_ids = convert_to_tensor(test_sentences, vocab)


# --- Padding ---
def pad_data(sequences):
    """Right-pad a list of 1-D LongTensors to a (num_seqs, max_len) batch."""
    return pad_sequence(sequences, batch_first=True, padding_value=0)


padded_train_tokens_ids = pad_data(train_tokens_ids)
padded_train_labels_ids = pad_data(train_labels_ids)
padded_test_tokens_ids = pad_data(test_tokens_ids)


# --- Model definition ---
class LSTMNER(nn.Module):
    """Token tagger: embedding (100) -> unidirectional LSTM (256) -> linear."""

    def __init__(self, vocab_size, num_tags):
        super(LSTMNER, self).__init__()
        self.embedding = nn.Embedding(vocab_size, 100)
        self.lstm = nn.LSTM(100, 256, batch_first=True)
        self.fc = nn.Linear(256, num_tags)

    def forward(self, x):
        # x: (batch, seq_len) of token ids -> (batch, seq_len, num_tags) logits
        emb = self.embedding(x)
        lstm_out, _ = self.lstm(emb)
        out = self.fc(lstm_out)
        return out


# --- Setup ---
model = LSTMNER(len(vocab), num_tags)
# ignore_index=0 skips padding positions when computing the loss.
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters())

# --- Training ---
num_epochs = 25  # NOTE: reduce before running
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    # One sentence per step (batch size 1 via unsqueeze).
    for x, y in zip(padded_train_tokens_ids, padded_train_labels_ids):
        x = x.unsqueeze(0)
        y = y.unsqueeze(0)
        optimizer.zero_grad()
        outputs = model(x)
        # Flatten (1, seq_len, num_tags) / (1, seq_len) for the loss.
        loss = criterion(outputs.view(-1, num_tags), y.view(-1))
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    print(f"Epoch {epoch + 1}, Loss: {epoch_loss / len(train_tokens_ids)}")


# --- Prediction and writing results to file ---
def predict(model, data, label_map):
    """Return the argmax tag-name sequence for every (padded) sentence."""
    model.eval()
    reverse_label_map = {v: k for k, v in label_map.items()}
    predictions = []
    with torch.no_grad():
        for sentence in data:
            output = model(sentence.unsqueeze(0))
            # squeeze(0) (not squeeze()): a plain squeeze would collapse a
            # length-1 sequence to a 0-d tensor and tolist() would then
            # return an int, breaking the comprehension below.
            predicted_tags = torch.argmax(output, dim=-1).squeeze(0).tolist()
            predictions.append([reverse_label_map[tag] for tag in predicted_tags])
    return predictions


def save_predictions(predictions, file_path):
    """Write one space-joined tag sequence per line, dropping pad tags."""
    with open(file_path, 'w', encoding='utf-8') as file:
        for sentence_preds in predictions:
            sentence_output = ' '.join(
                [tag for tag in sentence_preds if tag != PAD_TOKEN]
            )
            file.write(f"{sentence_output}\n")


def correct_labels(predictions):
    """Repair IOB sequences: an I-X with no preceding tag of type X becomes B-X."""
    corrected = []
    for sentence in predictions:
        fixed_sentence = []
        for i, tag in enumerate(sentence):
            # Heuristic: the previous tag "matches" when it ends with the
            # same entity type (covers both B-X and I-X predecessors).
            if tag.startswith('I-') and (i == 0 or not sentence[i - 1].endswith(tag[2:])):
                fixed_sentence.append('B-' + tag[2:])
            else:
                fixed_sentence.append(tag)
        corrected.append(fixed_sentence)
    return corrected


dev_predictions = predict(model, padded_train_tokens_ids, label_map)
test_predictions = predict(model, padded_test_tokens_ids, label_map)
corrected_dev_predictions = correct_labels(dev_predictions)
corrected_test_predictions = correct_labels(test_predictions)
save_predictions(corrected_dev_predictions, 'en-ner-conll-2003/dev-0/out.tsv')
save_predictions(corrected_test_predictions, 'en-ner-conll-2003/test-A/out.tsv')
print("Predykcje zapisane.")