from collections import Counter

import pandas as pd
import torch
from torchtext.vocab import Vocab


class NERModel(torch.nn.Module):
    """Window-based tagger: embeds a 3-token window and maps it to 9 IOB labels."""

    def __init__(self):
        super(NERModel, self).__init__()
        self.emb = torch.nn.Embedding(23628, 200)  # hard-coded vocabulary size
        self.fc1 = torch.nn.Linear(600, 9)

    def forward(self, x):
        x = self.emb(x)      # (3, 200)
        x = x.reshape(600)   # flatten the window into a single vector
        x = self.fc1(x)
        return x


def data_process(dt):
    # Wrap every document in <bos>/<eos> markers and map tokens to vocab ids.
    return [
        torch.tensor(
            [vocab['<bos>']] + [vocab[token] for token in document] + [vocab['<eos>']],
            dtype=torch.long,
        )
        for document in dt
    ]


def build_vocab(dataset):
    counter = Counter()
    for document in dataset:
        counter.update(document)
    # Legacy torchtext Vocab API (torchtext versions that still ship it).
    return Vocab(counter, specials=['<unk>', '<pad>', '<bos>', '<eos>'])


def labels_process(dt):
    # Pad the label sequence with 'O' (index 0) to line up with <bos>/<eos>.
    return [torch.tensor([0] + document + [0], dtype=torch.long) for document in dt]


def predict(input_tokens, labels):
    results = []
    for i in range(len(input_tokens)):
        line_results = []
        for j in range(1, len(input_tokens[i]) - 1):
            x = input_tokens[i][j - 1:j + 2].to(device)
            predicted = ner_model(x.long())
            result = torch.argmax(predicted).item()
            line_results.append(labels[result])
        results.append(line_results)
    return results


def features(data):
    # Two hand-crafted features per token: is-capitalised flag and word length.
    featurez = []
    for sentence in data["tokens"]:
        t_sentence = torch.tensor((), dtype=torch.long)
        for word in sentence:
            temp = torch.tensor([word[0].isupper(), len(word)], dtype=torch.long)
            t_sentence = torch.cat((t_sentence, temp))
        featurez.append(t_sentence)
    return featurez


def merge_features(token_ids, tensors_list):
    # Append the feature tensor after the token-id tensor for each document.
    return [torch.cat((token, tensors_list[i])) for i, token in enumerate(token_ids)]


def process_output(lines):
    # Repair illegal IOB sequences: an I- tag may not open an entity, and it
    # must share its type with the preceding tag.
    result = []
    for line in lines:
        last_label = None
        new_line = []
        for label in line:
            if label != "O" and label[0:2] == "I-":
                if last_label is None or last_label == "O":
                    label = label.replace('I-', 'B-')
                else:
                    label = "I-" + last_label[2:]
            last_label = label
            new_line.append(label)
        result.append(" ".join(new_line))
    return result


def infer(path_in, path_out, labels):
    df = pd.read_csv(path_in, sep='\t', names=['tokens'])
    df_token_ids = data_process(df["tokens"].apply(lambda x: x.split()))
    df_infer = merge_features(df_token_ids, features(df))
    infers = predict(df_infer, labels)
    infers_processed = process_output(infers)
    with open(path_out, "w") as file_out:
        for inf in infers_processed:
            file_out.write(inf + "\n")


labels = ['O', 'B-LOC', 'I-LOC', 'B-MISC', 'I-MISC', 'B-ORG', 'I-ORG', 'B-PER', 'I-PER']

df = pd.read_csv('train/train.tsv.xz', compression='xz', sep='\t', names=['iob', 'tokens'])
df["iob"] = df["iob"].apply(lambda x: [labels.index(y) for y in x.split()])
df["tokens"] = df["tokens"].apply(lambda x: x.split())

vocab = build_vocab(df['tokens'])

# Fall back to CPU when no GPU is available.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
ner_model = NERModel().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(ner_model.parameters())

train_labels = labels_process(df['iob'])
train_tokens_ids = data_process(df['tokens'])
df_features = features(df)
train_tensors = merge_features(train_tokens_ids, df_features)

for epoch in range(5):
    acc_score = 0
    prec_score = 0
    selected_items = 0
    recall_score = 0
    relevant_items = 0
    items_total = 0
    ner_model.train()
    for i in range(len(train_labels)):
        for j in range(1, len(train_labels[i]) - 1):
            X = train_tensors[i][j - 1:j + 2].to(device)
            Y = train_labels[i][j:j + 1].to(device)
            Y_predictions = ner_model(X.long())
            # Accuracy / precision / recall counters (label 0 is 'O').
            acc_score += int(torch.argmax(Y_predictions) == Y)
            if torch.argmax(Y_predictions) != 0:
                selected_items += 1
            if torch.argmax(Y_predictions) != 0 and torch.argmax(Y_predictions) == Y.item():
                prec_score += 1
            if Y.item() != 0:
                relevant_items += 1
            if Y.item() != 0 and torch.argmax(Y_predictions) == Y.item():
                recall_score += 1
            items_total += 1
            optimizer.zero_grad()
            loss = criterion(Y_predictions.unsqueeze(0), Y)
            loss.backward()
            optimizer.step()
    precision = prec_score / selected_items
    recall = recall_score / relevant_items
    f1_score = (2 * precision * recall) / (precision + recall)
    print(f'epoch: {epoch}')
    print(f'f1: {f1_score}')
    print(f'acc: {acc_score / items_total}')

infer('dev-0/in.tsv', 'dev-0/out.tsv', labels=labels)
infer('test-A/in.tsv', 'test-A/out.tsv', labels=labels)