import torch
from torchtext.vocab import Vocab
import lzma
from torchcrf import CRF
import numpy as np
import re
import itertools
from datasets import load_dataset
from torchtext.vocab import Vocab
from collections import Counter
# Homework based on the Jupyter notebooks from class:
# https://git.wmi.amu.edu.pl/filipg/aitech-eks-pub/src/branch/master/cw/11_NER_RNN_ODPOWIEDZI.ipynb
# and https://git.wmi.amu.edu.pl/filipg/aitech-eks-pub/src/branch/master/cw/09_sequence_labeling_ODPOWIEDZI.ipynb
# GRU network model
class GRU(torch.nn.Module):
    """Bidirectional stacked-GRU tagger that emits one score per tag per token.

    All sizes used to be hard-coded and the vocabulary size was read from the
    module-level ``vocab`` at construction time. They are now keyword
    parameters whose defaults reproduce the original network, so ``GRU()``
    behaves exactly as before.

    Args:
        vocab_size: Number of rows in the embedding table. When ``None``
            (default) it falls back to ``len(vocab.itos)`` from the
            module-level ``vocab``.
        emb_dim: Width of the token embeddings.
        hidden_size: Hidden size of each GRU direction.
        num_layers: Number of stacked GRU layers.
        num_tags: Width of the output layer (scores per token).
        dropout: Dropout probability applied to the embeddings.
    """

    def __init__(self, vocab_size=None, emb_dim=100, hidden_size=256,
                 num_layers=2, num_tags=9, dropout=0.2):
        super().__init__()
        if vocab_size is None:
            # Backward-compatible default: size the table from the global vocab.
            vocab_size = len(vocab.itos)
        self.emb = torch.nn.Embedding(vocab_size, emb_dim)
        self.dropout = torch.nn.Dropout(dropout)
        self.rec = torch.nn.GRU(emb_dim, hidden_size, num_layers,
                                batch_first=True, bidirectional=True)
        # Both directions are concatenated, hence 2 * hidden_size inputs.
        self.fc1 = torch.nn.Linear(2 * hidden_size, num_tags)

    def forward(self, x):
        """Return tag scores for a batch of token-id sequences.

        Args:
            x: Long tensor of token ids. Because the GRU is built with
                ``batch_first=True`` the layout is ``(batch, seq_len)``.

        Returns:
            Float tensor of shape ``(batch, seq_len, num_tags)``.
        """
        emb = torch.relu(self.emb(x))
        emb = self.dropout(emb)
        gru_output, _ = self.rec(emb)
        return self.fc1(gru_output)
class NERModel(torch.nn.Module):
    """Fixed-window feed-forward tagger: embed a window of ids, flatten, classify.

    The former magic numbers (23627, 200, 600, 9) are now defaulted keyword
    parameters, with the flattened width derived as ``window * emb_dim`` so
    the two can no longer drift apart. ``NERModel()`` builds the same layers
    as before.

    Args:
        vocab_size: Number of rows in the embedding table.
        emb_dim: Width of each token embedding.
        window: Number of token ids expected per call.
        num_tags: Number of output classes.
    """

    def __init__(self, vocab_size=23627, emb_dim=200, window=3, num_tags=9):
        super().__init__()
        self._flat_size = window * emb_dim
        self.emb = torch.nn.Embedding(vocab_size, emb_dim)
        self.fc1 = torch.nn.Linear(self._flat_size, num_tags)

    def forward(self, x):
        """Return ``num_tags`` scores for one window of token ids.

        Args:
            x: Long tensor holding exactly ``window`` token ids.

        Returns:
            1-D float tensor of length ``num_tags``.
        """
        x = self.emb(x)
        # reshape raises if x does not hold exactly `window` embeddings.
        x = x.reshape(self._flat_size)
        return self.fc1(x)
def read_data(filename):
    """Read an xz-compressed TSV file into a list of rows.

    Args:
        filename: Path to a UTF-8 encoded, xz-compressed, tab-separated file.

    Returns:
        A list with one entry per line; each entry is the list of
        tab-separated fields of that line.
    """
    # The context manager closes the handle; the original leaked it.
    with lzma.open(filename) as compressed:
        text = compressed.read().decode('UTF-8')
    lines = text.split('\n')
    # Only drop the artefact left by a trailing newline. Slicing off the last
    # element unconditionally lost a real row when the file did not end in '\n'.
    if lines and lines[-1] == '':
        lines.pop()
    return [line.split('\t') for line in lines]
def data_process(dt):
    """Convert tokenised documents into tensors of vocabulary ids.

    Each document is wrapped in the ``<bos>`` / ``<eos>`` ids looked up in the
    module-level ``vocab``.

    Args:
        dt: Iterable of documents, each an iterable of token strings.

    Returns:
        A list with one ``torch.long`` tensor per document.
    """
    encoded = []
    for document in dt:
        ids = [vocab['<bos>']]
        ids.extend(vocab[token] for token in document)
        ids.append(vocab['<eos>'])
        encoded.append(torch.tensor(ids, dtype=torch.long))
    return encoded
def get_scores(y_true, y_pred):
    """Compute token-level precision, recall and F1 over non-zero labels.

    Label ``0`` is treated as "nothing selected" (presumably the ``O`` tag;
    this depends on how the caller numbers its labels). A prediction counts
    as a true positive only when it is non-zero and equals the gold label.

    Args:
        y_true: Sequence of gold label ids.
        y_pred: Sequence of predicted label ids, aligned with ``y_true``.

    Returns:
        Tuple ``(precision, recall, f1)``. Precision is ``1.0`` when nothing
        was predicted as non-zero, recall is ``1.0`` when no gold label is
        non-zero, and F1 is ``0.0`` when both precision and recall are zero.
    """
    tp = 0
    selected_items = 0
    relevant_items = 0
    for p, t in zip(y_pred, y_true):
        if p > 0:
            selected_items += 1
            if p == t:
                tp += 1
        if t > 0:
            relevant_items += 1

    # Each guard needs an explicit alternative: without it the fallback value
    # was overwritten straight away and the division by zero still happened.
    precision = tp / selected_items if selected_items else 1.0
    recall = tp / relevant_items if relevant_items else 1.0
    if precision + recall == 0.0:
        f1 = 0.0
    else:
        f1 = 2 * precision * recall / (precision + recall)
    return precision, recall, f1
def labels_process(dt):
    """Turn per-document label-id lists into tensors padded with label 0.

    A ``0`` is placed before and after each document's labels, mirroring the
    two extra positions ``data_process`` adds around the tokens.

    Args:
        dt: Iterable of documents, each a list of integer label ids.

    Returns:
        A list with one ``torch.long`` tensor per document.
    """
    padded = []
    for document in dt:
        with_borders = [0] + document + [0]
        padded.append(torch.tensor(with_borders, dtype=torch.long))
    return padded
def build_vocab(dataset):
    """Build a vocabulary from tokenised documents.

    Args:
        dataset: Iterable of documents, each an iterable of token strings.

    Returns:
        A ``Vocab`` built from the token frequencies, with the specials
        ``<unk>``, ``<pad>``, ``<bos>`` and ``<eos>``.
    """
    counter = Counter()
    for document in dataset:
        # The counter was never fed before, so the vocabulary held only the
        # specials; count every token of every document.
        counter.update(document)
    # NOTE(review): Vocab(counter, specials=...) is the counter-based
    # constructor; confirm the installed torchtext version still provides it.
    return Vocab(counter, specials=['<unk>', '<pad>', '<bos>', '<eos>'])
def predict_labels(dataset_tokens, dataset_labels, model):
    """Decode every document with ``model`` + the global ``crf`` and print scores.

    Changes from the previous version:
      * ``model`` is actually used (the global ``gru`` was hard-wired before);
      * inference runs in eval mode under ``torch.no_grad()``, so dropout no
        longer perturbs predictions; the previous train/eval mode is restored;
      * the ``tqdm`` wrapper is gone because ``tqdm`` is never imported in
        this file;
      * the debug print no longer crashes on an empty dataset.

    Args:
        dataset_tokens: Sequence of 1-D long tensors of token ids.
        dataset_labels: Sequence of 1-D tensors of gold label ids, aligned
            with ``dataset_tokens``.
        model: Module mapping token ids to per-token tag scores.

    Returns:
        A list with the decoded tag-id list of each document.
    """
    if len(dataset_tokens) and len(dataset_labels):
        print(len(dataset_tokens[0]), len(dataset_labels[0]))
    Y_true = []
    Y_pred = []
    result = []
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for token_ids, labels in zip(dataset_tokens, dataset_labels):
                # NOTE(review): unsqueeze(1) gives shape (seq_len, 1). If
                # `model` is the GRU from this file (batch_first=True) that is
                # read as seq_len sequences of length 1, so the recurrent layer
                # sees no context. Left unchanged to stay consistent with the
                # training loop; confirm whether (1, seq_len) was intended.
                batch_tokens = token_ids.unsqueeze(1)
                emissions = model(batch_tokens).squeeze(0)
                best_path = crf.decode(emissions)[0]
                Y_pred += best_path
                result.append(best_path)
                Y_true += labels.tolist()
    finally:
        model.train(was_training)
    print(get_scores(Y_true, Y_pred))
    return result
def _repair_bio_prefixes(doc):
    """Rewrite the B/I prefixes of ``doc`` in place so it is a valid BIO sequence.

    A non-``O`` tag becomes ``I-x`` only when the previous tag is a non-``O``
    tag of the same type ``x``; otherwise it becomes ``B-x``.
    """
    if not doc:
        return
    if doc[0] != 'O':
        doc[0] = 'B' + doc[0][1:]
    for i in range(len(doc) - 1):
        following = doc[i + 1]
        if following == 'O':
            continue
        if doc[i] != 'O' and doc[i][1:] == following[1:]:
            doc[i + 1] = 'I' + following[1:]
        else:
            # This branch was unconditional before, which turned every
            # continuation tag into a B- tag.
            doc[i + 1] = 'B' + following[1:]


def predict(path, model):
    """Tag ``<path>/in.tsv`` and write the BIO labels to ``<path>/out.tsv``.

    Relies on the module-level ``crf``, ``ner_tags_set`` and ``data_process``.

    Changes from the previous version:
      * ``model`` is used instead of the hard-wired global ``gru``;
      * inference runs in eval mode under ``torch.no_grad()`` (dropout was
        active before); the previous train/eval mode is restored;
      * the ``tqdm`` wrapper is gone because ``tqdm`` is never imported;
      * BIO repair keeps ``I-`` tags and tolerates empty documents;
      * the output file is closed and opened with ``"w"`` so a rerun does not
        append a second copy of the predictions.

    Args:
        path: Directory containing ``in.tsv``; ``out.tsv`` is written there.
        model: Module mapping token ids to per-token tag scores.

    Returns:
        A list with the decoded tag-id list of each input line, including the
        two border positions added by ``data_process``.
    """
    with open(path + '/in.tsv', "r", encoding="utf-8") as f:
        data = [line.rstrip().split() for line in f]
    tokens_ids = data_process(data)

    result = []
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for token_ids in tokens_ids:
                # NOTE(review): shape (seq_len, 1); see batch_first=True on the
                # GRU in this file. Kept as-is to match the training loop.
                batch_tokens = token_ids.unsqueeze(1)
                emissions = model(batch_tokens).squeeze(0)
                result.append(crf.decode(emissions)[0])
    finally:
        model.train(was_training)

    # Map ids back to tag strings and drop the <bos>/<eos> positions.
    tagged = [[ner_tags_set[tag_id] for tag_id in path_ids][1:-1]
              for path_ids in result]
    for doc in tagged:
        _repair_bio_prefixes(doc)

    with open(path + "/out.tsv", "w", encoding="utf-8") as out:
        out.writelines(' '.join(doc) + '\n' for doc in tagged)
    return result
# Evaluation using the CRF layer
def eval_model(dataset_tokens, dataset_labels, model):
    """Score ``model`` + the global ``crf`` on a labelled dataset.

    Changes from the previous version: ``model`` is used instead of the
    hard-wired global ``gru``; inference runs in eval mode under
    ``torch.no_grad()`` so dropout is disabled, and the previous train/eval
    mode is restored afterwards (safe to call between training epochs); the
    ``tqdm`` wrapper is gone because ``tqdm`` is never imported in this file.

    Args:
        dataset_tokens: Sequence of 1-D long tensors of token ids.
        dataset_labels: Sequence of 1-D tensors of gold label ids, aligned
            with ``dataset_tokens``.
        model: Module mapping token ids to per-token tag scores.

    Returns:
        The ``(precision, recall, f1)`` tuple produced by ``get_scores``.
    """
    Y_true = []
    Y_pred = []
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for token_ids, labels in zip(dataset_tokens, dataset_labels):
                # NOTE(review): shape (seq_len, 1); see batch_first=True on the
                # GRU in this file. Kept as-is to match the training loop.
                batch_tokens = token_ids.unsqueeze(1)
                emissions = model(batch_tokens).squeeze(0)
                Y_pred += crf.decode(emissions)[0]
                Y_true += labels.tolist()
    finally:
        model.train(was_training)
    return get_scores(Y_true, Y_pred)
# ---------------------------------------------------------------------------
# Training and prediction script.
#
# Statements are ordered so that every name exists before it is used: the
# vocabulary and the tag inventory are built first, then the GRU (which reads
# `vocab`) and the CRF, then the optimiser over BOTH parameter sets.
# ---------------------------------------------------------------------------
device_gpu = torch.device("cuda:0")  # NOTE(review): never used; nothing is moved to this device.
criterion = torch.nn.CrossEntropyLoss()  # NOTE(review): unused; the loss comes from the CRF below.

# NOTE(review): EPOCHS was referenced but never defined in this file; 5 is a
# placeholder value - set it to whatever the experiment needs.
EPOCHS = 5

train_set = read_data('train/train.tsv.xz')
tokens, ner_tags = [], []
for row in train_set:
    # NOTE(review): the body of this loop was missing. This assumes each row
    # is "<space-separated labels>\t<space-separated tokens>" - TODO confirm
    # against the actual column order of train.tsv.
    ner_tags.append(row[0].split())
    tokens.append(row[1].split())

vocab = build_vocab(tokens)
train_tokens_ids = data_process(tokens)

# Deterministic tag order with 'O' at index 0: labels_process pads the
# <bos>/<eos> positions with 0 and get_scores treats 0 as "no entity", so
# index 0 must not be an arbitrary tag picked by set iteration order.
ner_tags_set = ['O'] + sorted(set(itertools.chain.from_iterable(ner_tags)) - {'O'})
train_labels = labels_process([[ner_tags_set.index(token) for token in doc] for doc in ner_tags])
num_tags = len(ner_tags_set)

gru = GRU()
# NOTE(review): the GRU in this file emits 9 scores per token, so num_tags
# has to equal 9 or the CRF will reject the emissions.
crf = CRF(num_tags)
params = list(gru.parameters()) + list(crf.parameters())
# Optimise the CRF transitions too; previously only the GRU was passed in.
optimizer = torch.optim.Adam(params)

gru.train()
for epoch in range(EPOCHS):
    for token_ids, labels in zip(train_tokens_ids, train_labels):
        # NOTE(review): shape (seq_len, 1) with a batch_first=True GRU means
        # every token is a length-1 sequence; confirm whether (1, seq_len)
        # was intended. The prediction helpers use the same layout.
        batch_tokens = token_ids.unsqueeze(1)
        tags = labels.unsqueeze(1)
        emissions = gru(batch_tokens).squeeze(0)
        optimizer.zero_grad()
        # The CRF returns a log-likelihood; negate it to obtain a loss.
        loss = -crf(emissions, tags.squeeze(0))
        loss.backward()
        optimizer.step()

with open('dev-0/in.tsv', "r", encoding="utf-8") as f:
    dev_0_data = [line.rstrip().split() for line in f]
dev_0_tokens_ids = data_process(dev_0_data)
with open('dev-0/expected.tsv', "r", encoding="utf-8") as f:
    dev_0_labels = [line.rstrip().split() for line in f]
dev_0_labels = labels_process([[ner_tags_set.index(token) for token in doc] for doc in dev_0_labels])

# Switch dropout off for every prediction below.
gru.eval()
with torch.no_grad():
    tmp = predict_labels(dev_0_tokens_ids, dev_0_labels, gru)
    predict('test-A', gru)

# Map ids back to tags and drop the <bos>/<eos> positions.
r = [[ner_tags_set[tag_id] for tag_id in tmp2][1:-1] for tmp2 in tmp]
for doc in r:
    if not doc:
        continue
    if doc[0] != 'O':
        doc[0] = 'B' + doc[0][1:]
    for pos in range(len(doc) - 1):
        if doc[pos + 1] == 'O':
            continue
        if doc[pos] != 'O' and doc[pos][1:] == doc[pos + 1][1:]:
            doc[pos + 1] = 'I' + doc[pos + 1][1:]
        else:
            doc[pos + 1] = 'B' + doc[pos + 1][1:]

# dev-0/out.tsv is written exactly once, in "w" mode. The earlier
# predict('dev-0', gru) call followed by an appending write stored the same
# predictions twice, so the separate predict('dev-0', ...) call is dropped.
with open("dev-0/out.tsv", "w", encoding="utf-8") as f:
    for doc in r:
        f.write(' '.join(doc) + '\n')