"""Train a window-based NER tagger and tag the dev-0 and test-A inputs.

Each token is classified from a window of three vocabulary ids (previous,
current and next token). The ids are embedded, concatenated and passed
through one linear layer that scores the nine labels.

The helper functions read the module-level ``vocab``, ``device_gpu``,
``ner_model`` and ``labels`` objects created further down, so the file is
meant to be executed as a script from top to bottom.
"""
import csv
import os.path
import shutil
from collections import Counter
from itertools import islice

import gensim.downloader as api
import numpy as np
import pandas as pd
import torch
from gensim.models.word2vec import Word2Vec
from nltk.tokenize import word_tokenize
from sklearn.model_selection import train_test_split
from torchtext.vocab import Vocab
from tqdm import tqdm

# NOTE(review): the previous revision used four empty strings as specials,
# which looks like markup-stripped '<unk>', '<pad>', '<bos>', '<eos>' (the
# usual torchtext set) -- please confirm. Without an '<unk>' entry the
# vocabulary has no id to give tokens that were not seen in training.
UNK_TOKEN = '<unk>'
PAD_TOKEN = '<pad>'
BOS_TOKEN = '<bos>'
EOS_TOKEN = '<eos>'


class NERModel(torch.nn.Module):
    """Embedding layer followed by one linear layer over a token window.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        window_size: Number of token ids passed to ``forward`` at once.
        num_labels: Number of output scores.

    The defaults reproduce the previously hard-coded sizes
    (23628 x 200 embedding, 600 -> 9 linear layer).
    """

    def __init__(self, vocab_size=23628, embedding_dim=200, window_size=3,
                 num_labels=9):
        super().__init__()
        self.emb = torch.nn.Embedding(vocab_size, embedding_dim)
        self.fc1 = torch.nn.Linear(window_size * embedding_dim, num_labels)

    def forward(self, x):
        """Return the label scores for one window of token ids.

        Args:
            x: 1-D long tensor holding ``window_size`` token ids.

        Returns:
            1-D tensor with one score per label.
        """
        x = self.emb(x)
        # Concatenate the embeddings of the window into a single vector.
        x = x.reshape(-1)
        return self.fc1(x)


def process_output(lines):
    """Repair the B-/I- prefixes of predicted label sequences.

    An ``I-`` label that starts a line or follows ``O`` becomes ``B-``; an
    ``I-`` label that follows another entity label takes over that label's
    entity type. ``O`` and ``B-`` labels are left unchanged.

    Args:
        lines: Iterable of label sequences, one per sentence.

    Returns:
        A list with one space-joined label string per sentence.
    """
    result = []
    for line in lines:
        last_label = None
        new_line = []
        for label in line:
            if label != "O" and label[0:2] == "I-":
                if last_label is None or last_label == "O":
                    label = label.replace('I-', 'B-')
                else:
                    label = "I-" + last_label[2:]
            last_label = label
            new_line.append(label)
        result.append(" ".join(new_line))
    return result


def build_vocab(dataset):
    """Build a torchtext ``Vocab`` from an iterable of token lists."""
    counter = Counter()
    for document in dataset:
        counter.update(document)
    return Vocab(
        counter, specials=[UNK_TOKEN, PAD_TOKEN, BOS_TOKEN, EOS_TOKEN])


def data_process(dt):
    """Convert token lists to long tensors of ids framed by BOS and EOS.

    Uses the module-level ``vocab``.
    """
    bos_id = vocab[BOS_TOKEN]
    eos_id = vocab[EOS_TOKEN]
    return [
        torch.tensor(
            [bos_id] + [vocab[token] for token in document] + [eos_id],
            dtype=torch.long)
        for document in dt
    ]


def labels_process(dt):
    """Convert label-id lists to long tensors padded with 0 on both ends.

    The padding keeps label positions aligned with the BOS/EOS-framed token
    tensors produced by ``data_process``.
    """
    return [
        torch.tensor([0] + document + [0], dtype=torch.long)
        for document in dt
    ]


def predict(input_tokens, labels):
    """Predict one label name per interior position of every sequence.

    Uses the module-level ``ner_model`` and ``device_gpu``, and switches the
    model to evaluation mode.

    Args:
        input_tokens: Sequence of 1-D tensors of token ids, each framed by
            one leading and one trailing id (as built by ``data_process``).
            Every position except the first and last gets a prediction, so
            the tensors must contain token ids only.
        labels: Label names indexed by the model's output position.

    Returns:
        A list with one list of label names per input sequence.
    """
    ner_model.eval()
    results = []
    with torch.no_grad():  # inference only, no autograd graph needed
        for tokens in input_tokens:
            tokens = tokens.to(device_gpu).long()
            line_results = []
            for j in range(1, len(tokens) - 1):
                scores = ner_model(tokens[j - 1: j + 2])
                line_results.append(labels[int(torch.argmax(scores))])
            results.append(line_results)
    return results


def create_tensors_list(data):
    """Build per-sentence capitalisation/digit flags from ``data["a"]``.

    For every word two values are emitted: whether its first character is
    upper case and whether it is a digit. Each sentence yields one flat
    float tensor of length ``2 * number_of_words``.

    The script below does not feed these flags to the model: the model only
    embeds token ids, and appending the flags to the id tensors made
    ``predict`` emit labels for the flag positions as well. The function is
    kept for callers that still want the features.
    """
    return [
        torch.tensor(
            [flag
             for word in sent
             for flag in (word[0].isupper(), word[0].isdigit())],
            dtype=torch.float)
        for sent in data["a"]
    ]


def tag_file(in_path, out_path):
    """Tag every line of ``in_path`` and write the labels to ``out_path``.

    Reads a tab-separated file whose single column holds space-separated
    tokens and writes one line of space-separated labels per input row.
    Uses the module-level ``labels``.
    """
    data = pd.read_csv(in_path, sep='\t', names=['a'])
    data["a"] = data["a"].apply(lambda x: x.split())
    token_ids = data_process(data["a"])
    results_processed = process_output(predict(token_ids, labels))
    with open(out_path, "w", encoding="utf-8") as f:
        f.writelines(line + "\n" for line in results_processed)


# --- Training data ---------------------------------------------------------
# Column 'a' holds the label sequence, column 'b' the token sequence.
train = pd.read_csv('train/train.tsv.xz', sep='\t', names=['a', 'b'])
labels = ['O', 'B-LOC', 'I-LOC', 'B-MISC', 'I-MISC', 'B-ORG', 'I-ORG',
          'B-PER', 'I-PER']
label_to_id = {label: index for index, label in enumerate(labels)}
train["a"] = train["a"].apply(lambda x: [label_to_id[y] for y in x.split()])
train["b"] = train["b"].apply(lambda x: x.split())
vocab = build_vocab(train['b'])

# --- Model -----------------------------------------------------------------
# The name is kept for the helpers above; falls back to CPU without CUDA.
device_gpu = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Size the embedding table from the vocabulary so every id has a row.
ner_model = NERModel(vocab_size=len(vocab)).to(device_gpu)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(ner_model.parameters())

train_labels = labels_process(train['a'])
train_tokens_ids = data_process(train['b'])

# --- Training: one optimiser step per token window ---------------------------
for epoch in range(5):
    acc_score = 0
    prec_score = 0
    selected_items = 0
    recall_score = 0
    relevant_items = 0
    items_total = 0
    ner_model.train()
    for token_ids, label_ids in zip(train_tokens_ids, train_labels):
        token_ids = token_ids.to(device_gpu)
        label_ids = label_ids.to(device_gpu)
        for j in range(1, len(label_ids) - 1):
            X = token_ids[j - 1: j + 2]
            Y = label_ids[j: j + 1]
            Y_predictions = ner_model(X)

            predicted = int(torch.argmax(Y_predictions))
            expected = int(Y)
            acc_score += int(predicted == expected)
            # Label id 0 is 'O'; precision/recall count entity labels only.
            if predicted != 0:
                selected_items += 1
                if predicted == expected:
                    prec_score += 1
            if expected != 0:
                relevant_items += 1
                if predicted == expected:
                    recall_score += 1
            items_total += 1

            optimizer.zero_grad()
            loss = criterion(Y_predictions.unsqueeze(0), Y)
            loss.backward()
            optimizer.step()

    # Guard every ratio: an epoch may predict no entities at all.
    precision = prec_score / selected_items if selected_items else 0.0
    recall = recall_score / relevant_items if relevant_items else 0.0
    if precision + recall:
        f1_score = (2 * precision * recall) / (precision + recall)
    else:
        f1_score = 0.0
    accuracy = acc_score / items_total if items_total else 0.0
    print(f'epoch: {epoch}')
    print(f'f1: {f1_score}')
    print(f'acc: {accuracy}')

# --- Inference ---------------------------------------------------------------
tag_file('dev-0/in.tsv', "dev-0/out.tsv")
tag_file('test-A/in.tsv', "test-A/out.tsv")

model_path = "seq_labeling.model"
torch.save(ner_model.state_dict(), model_path)