en-ner-conll-2003/seq.py
2021-06-20 22:24:17 +02:00

210 lines
6.1 KiB
Python

import numpy as np
import gensim
import torch
import pandas as pd
import csv
import seaborn as sns
from sklearn.model_selection import train_test_split
from torchtext.vocab import Vocab
from collections import Counter
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score
DATA_PATH = ['train/train.tsv', 'dev-0/in.tsv', 'dev-0/expected.tsv', 'test-A/in.tsv']
DATA_PATH_OUTPUT = ['dev-0/out.tsv', 'test-A/out.tsv']
def get_data(path):
train = pd.read_table(path, error_bad_lines=False, header=None, quoting=csv.QUOTE_NONE)
return train
def split(x):
return x.split()
def replace(x):
newList = []
for word in x:
if word == 'O':
newList.append(0)
if word == 'B-LOC':
newList.append(1)
if word == 'I-LOC':
newList.append(2)
if word == 'B-MISC':
newList.append(3)
if word == 'B-ORG':
newList.append(4)
if word == 'I-ORG':
newList.append(5)
if word == 'B-PER':
newList.append(6)
if word == 'I-PER':
newList.append(7)
return newList
def build_vocab(dataset):
counter = Counter()
for document in dataset:
counter.update(document)
return Vocab(counter, specials=['<unk>', '<pad>', '<bos>', '<eos>'])
def labels_process(dt):
return [ torch.tensor([0] + document + [0], dtype = torch.long) for document in dt]
def data_process(dt, vocab):
return [ torch.tensor([vocab['<bos>']] +[vocab[token] for token in document ] + [vocab['<eos>']], dtype = torch.long) for document in dt]
class NeuralNetwork(torch.nn.Module):
def __init__(self, train_tokens_ids):
super(NeuralNetwork, self).__init__()
self.fc1 = torch.nn.Linear(10_000,len(train_tokens_ids))
self.softmax = torch.nn.Softmax(dim=0)
def forward(self, x):
x = self.fc1(x)
x = self.softmax(x)
return x
class NERModel(torch.nn.Module):
def __init__(self,):
super(NERModel, self).__init__()
self.emb = torch.nn.Embedding(23627,200)
self.fc1 = torch.nn.Linear(600,9)
def forward(self, x):
x = self.emb(x)
x = x.reshape(600)
x = self.fc1(x)
return x
def configure(train, vocab):
train_labels = labels_process(train[0])
train_tokens_ids = data_process(train[1], vocab)
ner_model = NERModel()
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(ner_model.parameters())
nn_model = NeuralNetwork(train_tokens_ids)
return train_labels, train_tokens_ids, ner_model, criterion, optimizer, nn_model
def training(nn_model, train_labels, train_tokens_ids, ner_model, optimizer, criterion):
for epoch in range(2):
loss_score = 0
acc_score = 0
prec_score = 0
selected_items = 0
recall_score = 0
relevant_items = 0
items_total = 0
nn_model.train()
for i in range(100):
for j in range(1, len(train_labels[i]) - 1):
X = train_tokens_ids[i][j-1: j+2]
Y = train_labels[i][j: j+1]
Y_predictions = ner_model(X)
acc_score += int(torch.argmax(Y_predictions) == Y)
if torch.argmax(Y_predictions) != 0:
selected_items +=1
if torch.argmax(Y_predictions) != 0 and torch.argmax(Y_predictions) == Y.item():
prec_score += 1
if Y.item() != 0:
relevant_items +=1
if Y.item() != 0 and torch.argmax(Y_predictions) == Y.item():
recall_score += 1
items_total += 1
optimizer.zero_grad()
loss = criterion(Y_predictions.unsqueeze(0), Y)
loss.backward()
optimizer.step()
loss_score += loss.item()
def eval_dev(nn_model, dev_tokens_ids, dev_labels, ner_model):
result = []
nn_model.eval()
for i in range(len(dev_tokens_ids)):
result.append([])
for j in range(1, len(dev_labels[i]) - 1):
X = dev_tokens_ids[i][j-1: j+2]
Y = dev_labels[i][j: j+1]
Y_predictions = ner_model(X)
result[i].append(int(torch.argmax(Y_predictions)))
return result
def eval_test(nn_model, test_tokens_ids, ner_model):
result = []
nn_model.eval()
for i in range(len(test_tokens_ids)):
result.append([])
for j in range(1, len(test_tokens_ids[i]) - 1):
X = test_tokens_ids[i][j-1: j+2]
Y_predictions = ner_model(X)
result[i].append(int(torch.argmax(Y_predictions)))
return result
def generate_result(result,path):
features = ['O', 'B-LOC', 'I-LOC', 'B-MISC', 'B-ORG', 'I-ORG', 'B-PER', 'B-PER', 'I-PER']
final_result = []
for i in range(len(result)):
final_result.append([])
for j in range(len(result[i])):
final_result[i].append(features[result[i][j]])
f = open(path, "a")
for i in final_result:
f.write(' '.join(i) + '\n')
f.close()
def main():
#prepare train
train = get_data(DATA_PATH[0])
train[0] = train[0].map(split)
train[1] = train[1].map(split)
train[0] = train[0].map(replace)
#configure
vocab = build_vocab(train[1])
train_labels, train_tokens_ids, ner_model, criterion, optimizer, nn_model = configure(train, vocab)
#train
training(nn_model, train_labels, train_tokens_ids, ner_model, optimizer, criterion)
#dev
dev_in = get_data(DATA_PATH[1])
dev_ex = get_data(DATA_PATH[2])
dev_in[0] = dev_in[0].map(split)
dev_ex[0] = dev_ex[0].map(split)
dev_ex[0] = dev_ex[0].map(replace)
dev_labels = labels_process(dev_ex[0])
dev_tokens_ids = data_process(dev_in[0], vocab)
result_dev = eval_dev(nn_model, dev_tokens_ids, dev_labels, ner_model)
#test
test_in = get_data(DATA_PATH[3])
test_in[0] = test_in[0].map(split)
test_tokens_ids = data_process(test_in[0], vocab)
result_test = eval_test(nn_model, test_tokens_ids, ner_model)
#results
generate_result(result_dev, DATA_PATH_OUTPUT[0])
generate_result(result_test, DATA_PATH_OUTPUT[1])
if __name__ == '__main__':
main()