en-ner-conll-2003/script.py
2021-06-09 11:43:47 +02:00

158 lines
4.6 KiB
Python

import torch
import numpy as np
import pandas as pd
from collections import Counter
from torchtext.vocab import Vocab
# Vars
labels = ['O', 'B-LOC', 'I-LOC', 'B-MISC', 'I-MISC', 'B-ORG', 'I-ORG', 'B-PER', 'I-PER']
# Functions
def process(dt):
return [torch.tensor([vocab['<bos>']] + [vocab[token] for token in document] + [vocab['<eos>']], dtype=torch.long)
for document in dt]
def to_vocab(dataset):
counter = Counter()
for document in dataset:
counter.update(document)
return Vocab(counter, specials=['<unk>', '<pad>', '<bos>', '<eos>'])
def process_lables(dt):
return [torch.tensor([0] + document + [0], dtype=torch.long) for document in dt]
def get_prediction(input_tokens, labels):
results = []
for i in range(len(input_tokens)):
line_results = []
for j in range(1, len(input_tokens[i]) - 1):
x = input_tokens[i][j - 1: j + 2]
predicted = ner_model(x.long())
result = torch.argmax(predicted)
label = labels[result]
line_results.append(label)
results.append(line_results)
return results
def features(data):
featurez = []
for sentence in data["tokens"]:
t_sentence = torch.tensor(())
for word in sentence:
temp = torch.tensor([word[0].isupper(), len(word)])
t_sentence = torch.cat((t_sentence, temp))
featurez.append(t_sentence)
return featurez
def merge_features(token_ids, tensors_list):
return [torch.cat((token, tensors_list[i])) for i, token in enumerate(token_ids)]
def process_output(lines):
result = []
for line in lines:
last_label = None
new_line = []
for label in line:
if label != "O" and label[0:2] == "I-":
if last_label is None or last_label == "O":
label = label.replace('I-', 'B-')
else:
label = "I-" + last_label[2:]
last_label = label
new_line.append(label)
result.append(" ".join(new_line))
return result
def infer(path_in, path_out, labels):
df = pd.read_csv(path_in, sep='\t', names=['tokens'])
df_token_ids = process(df["tokens"].apply(lambda x: x.split()))
df_infer = merge_features(df_token_ids, features(df))
infers = get_prediction(df_infer, labels)
infers_processed = process_output(infers)
with open(path_out, "w") as file_out:
for inf in infers_processed:
file_out.write(inf + "\n")
df = pd.read_csv('train/train.tsv.xz', compression='xz', sep='\t', names=['iob', 'tokens'])
df["iob"] = df["iob"].apply(lambda x: [labels.index(y) for y in x.split()])
df["tokens"] = df["tokens"].apply(lambda x: x.split())
vocab = to_vocab(df['tokens'])
# Model
class ModelNet(torch.nn.Module):
def __init__(self, ):
super(ModelNet, self).__init__()
self.emb = torch.nn.Embedding(23628, 200)
self.fc1 = torch.nn.Linear(600, 9)
def forward(self, x):
x = self.emb(x)
x = x.reshape(600)
x = self.fc1(x)
return x
ner_model = ModelNet()
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(ner_model.parameters())
train_labels = process_lables(df['iob'])
train_tokens_ids = process(df['tokens'])
df_features = features(df)
train_tensors = merge_features(train_tokens_ids, df_features)
# Train
print('Training')
for epoch in range(2):
ner_model.train()
print(epoch)
for i in range(len(train_labels)):
for j in range(1, len(train_labels[i]) - 1):
X = train_tensors[i][j - 1: j + 2]
Y = train_labels[i][j: j + 1]
Y_predictions = ner_model(X.long())
optimizer.zero_grad()
loss = criterion(Y_predictions.unsqueeze(0), Y)
loss.backward()
optimizer.step()
dev0 = df = pd.read_csv('dev-0/in.tsv', sep='\t', names=['tokens'])
df_token_ids = process(df["tokens"].apply(lambda x: x.split()))
merged_features = merge_features(df_token_ids, features(df))
predictions = get_prediction(merged_features, labels)
processed_output = process_output(predictions)
dev_predictions = np.asarray(processed_output)
testA = df = pd.read_csv('test-A/in.tsv', sep='\t', names=['tokens'])
df_token_ids = process(df["tokens"].apply(lambda x: x.split()))
merged_features = merge_features(df_token_ids, features(df))
predictions = get_prediction(merged_features, labels)
processed_output = process_output(predictions)
test_predictions = np.asarray(processed_output)
dev_predictions.tofile("dev-0/out.tsv", sep="\n", format="%s")
test_predictions.tofile("test-A/out.tsv", sep="\n", format="%s")