#!/usr/bin/env python
# coding: utf-8
"""Multiclass classification and sequence labelling.

Trains a small feed-forward NER tagger on ``train/train.tsv.xz`` and writes
predicted tag sequences to ``dev-0/out.tsv`` and ``test-A/out.tsv``.  Each
token is classified from a three-token window (previous, current, next).
"""

# ### Homework
#
# - clone the repository https://git.wmi.amu.edu.pl/kubapok/en-ner-conll-2003
# - build a classifier based on a feed-forward neural network in PyTorch
#   (it may or may not be based on the course notebook)
# - the classifier should use additional features (e.g. word length, whether
#   the word starts with a capital letter, the word's stem, whether it
#   contains a digit)
# - write predictions to dev-0/out.tsv and test-A/out.tsv
# - the F-score checked with the geval tool (see the previous assignment)
#   should be at least 0.60
# - put the predictions and the generating scripts (as plain text, not
#   Jupyter) in the repo, and post a link to the repo in MS Teams
# Deadline 08.06, 80 points.

import itertools
import lzma
import math
import re
from collections import Counter

import gensim
import numpy as np
import pandas as pd
import seaborn as sns
import torch
from datasets import load_dataset
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from torchtext.vocab import Vocab

# NOTE(review): in the source as received the special-token literals were
# empty strings (vocab[''] and specials=['', '', '', '']), presumably because
# angle-bracket markup was stripped during export.  They are restored here to
# the conventional names; confirm against the original notebook.
UNK_TOKEN = '<unk>'
PAD_TOKEN = '<pad>'
BOS_TOKEN = '<bos>'
EOS_TOKEN = '<eos>'

# Tag that must map to id 0: the metrics below and the label padding in
# labels_process both treat id 0 as "not an entity".
OUTSIDE_TAG = 'O'

EPOCHS = 2
# Only the first TRAIN_DOCUMENTS training documents are used in each epoch.
TRAIN_DOCUMENTS = 100


def read_data(filename):
    """Read an xz-compressed TSV file into a list of column lists.

    The final element produced by splitting on newlines is dropped, exactly
    as the file's trailing newline requires.
    """
    with lzma.open(filename) as f:
        all_data = f.read().decode('UTF-8').split('\n')
    return [line.split('\t') for line in all_data][:-1]


def read_tokenized_lines(path):
    """Read a UTF-8 text file and split every line on whitespace."""
    with open(path, "r", encoding="utf-8") as f:
        return [line.rstrip().split() for line in f]


def data_process(dt):
    """Convert token documents to id tensors wrapped in <bos> ... <eos>.

    Uses the module-level ``vocab``.
    """
    return [
        torch.tensor(
            [vocab[BOS_TOKEN]]
            + [vocab[token] for token in document]
            + [vocab[EOS_TOKEN]],
            dtype=torch.long,
        )
        for document in dt
    ]


def labels_process(dt):
    """Convert label-id documents to tensors padded with id 0 at both ends.

    The padding keeps labels aligned with the <bos>/<eos>-wrapped token ids.
    """
    return [
        torch.tensor([0] + document + [0], dtype=torch.long)
        for document in dt
    ]


def build_vocab(dataset):
    """Build a torchtext vocabulary from an iterable of token documents."""
    counter = Counter()
    for document in dataset:
        counter.update(document)
    return Vocab(
        counter, specials=[UNK_TOKEN, PAD_TOKEN, BOS_TOKEN, EOS_TOKEN]
    )


class NeuralNetworkModel(torch.nn.Module):
    """Single linear layer followed by a softmax (not used by the tagger)."""

    def __init__(self, output_size):
        super(NeuralNetworkModel, self).__init__()
        # The layer width now comes from output_size rather than from a
        # module-level global.
        self.fc1 = torch.nn.Linear(10_000, output_size)
        self.softmax = torch.nn.Softmax(dim=0)

    def forward(self, x):
        x = self.fc1(x)
        x = self.softmax(x)
        return x


class NERModel(torch.nn.Module):
    """Embed a fixed-size token window and classify its centre token.

    The defaults reproduce the sizes that were previously hard-coded
    (23627 embeddings of width 200, a 3-token window, 9 output tags).
    """

    def __init__(self, vocab_size=23627, embedding_dim=200, window_size=3,
                 num_tags=9):
        super(NERModel, self).__init__()
        self.flat_size = window_size * embedding_dim
        self.emb = torch.nn.Embedding(vocab_size, embedding_dim)
        self.fc1 = torch.nn.Linear(self.flat_size, num_tags)

    def forward(self, x):
        x = self.emb(x)
        # Concatenate the window's embeddings into one feature vector.
        x = x.reshape(self.flat_size)
        # Raw logits are returned; CrossEntropyLoss applies log-softmax.
        x = self.fc1(x)
        return x


def _safe_div(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is 0."""
    return numerator / denominator if denominator else 0.0


class _TaggingStats:
    """Running loss / accuracy / precision / recall counters for one pass."""

    def __init__(self):
        self.loss = 0.0
        self.correct = 0
        self.selected = 0        # predictions with a non-zero tag id
        self.relevant = 0        # gold labels with a non-zero tag id
        self.true_positives = 0  # correct predictions with a non-zero tag id
        self.total = 0

    def update(self, predicted, gold, loss):
        """Record one (predicted id, gold id, loss value) observation."""
        self.total += 1
        self.loss += loss
        self.correct += int(predicted == gold)
        self.selected += int(predicted != 0)
        self.relevant += int(gold != 0)
        self.true_positives += int(predicted == gold and gold != 0)

    def report(self, epoch=None):
        """Print the averaged metrics; zero denominators yield 0.0."""
        precision = _safe_div(self.true_positives, self.selected)
        recall = _safe_div(self.true_positives, self.relevant)
        f1_score = _safe_div(2 * precision * recall, precision + recall)
        if epoch is not None:
            print('epoch: ', epoch)
        print('loss: ', _safe_div(self.loss, self.total))
        print('acc: ', _safe_div(self.correct, self.total))
        print('prec: ', precision)
        print('recall: : ', recall)
        print('f1: ', f1_score)


def run_pass(model, loss_fn, token_ids, labels=None, optimizer=None):
    """Run the model over every token of every document.

    Args:
        model: module mapping a 3-token id window to tag logits.
        loss_fn: loss applied to (logits, gold id) when labels are given.
        token_ids: list of 1-D id tensors wrapped in <bos>/<eos>.
        labels: optional list of label tensors aligned with ``token_ids``;
            when omitted no loss or metrics are computed.
        optimizer: when given (together with labels), one optimisation step
            is taken per token; otherwise gradients are disabled.

    Returns:
        (predictions, stats): per-document lists of predicted tag ids and
        the accumulated _TaggingStats.
    """
    training = optimizer is not None
    model.train(training)
    stats = _TaggingStats()
    predictions = []
    with torch.set_grad_enabled(training):
        for doc_index, doc_ids in enumerate(token_ids):
            doc_predictions = []
            # Positions 0 and -1 hold <bos>/<eos>: they serve as context
            # only and never receive a prediction of their own.
            for j in range(1, len(doc_ids) - 1):
                logits = model(doc_ids[j - 1: j + 2])
                predicted = int(torch.argmax(logits))
                doc_predictions.append(predicted)
                if labels is None:
                    continue
                target = labels[doc_index][j: j + 1]
                loss = loss_fn(logits.unsqueeze(0), target)
                if training:
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                stats.update(predicted, target.item(), loss.item())
            predictions.append(doc_predictions)
    return predictions, stats


def ids_to_tags(id_documents, tag_names):
    """Map documents of tag ids back to their tag strings."""
    return [[tag_names[tag_id] for tag_id in document]
            for document in id_documents]


def write_predictions(path, tag_documents):
    """Write one space-separated tag sequence per line.

    The file is truncated first so that re-running the script does not
    append a second copy of the predictions.
    """
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(' '.join(document) + '\n' for document in tag_documents)


# # train

train_data = read_data('train/train.tsv.xz')
tokens, ner_tags = [], []
for row in train_data:
    ner_tags.append(row[0].split())
    tokens.append(row[1].split())

# Sort the tag inventory so ids are stable between runs, with the outside
# tag first so that it receives id 0.
ner_tags_set = sorted(
    set(itertools.chain.from_iterable(ner_tags)),
    key=lambda tag: (tag != OUTSIDE_TAG, tag),
)
print(ner_tags_set)

ner_tags_dic = {tag: index for index, tag in enumerate(ner_tags_set)}
print(ner_tags_dic)

ner_tags = [[ner_tags_dic[tag] for tag in document] for document in ner_tags]

vocab = build_vocab(tokens)

train_tokens_ids = data_process(tokens)
train_labels = labels_process(ner_tags)

# Kept for backward compatibility of the module-level name; the tagger
# below does not use it.
nn_model = NeuralNetworkModel(len(train_tokens_ids))

ner_model = NERModel(vocab_size=len(vocab), num_tags=len(ner_tags_set))
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(ner_model.parameters())

for epoch in range(EPOCHS):
    _, train_stats = run_pass(
        ner_model,
        criterion,
        train_tokens_ids[:TRAIN_DOCUMENTS],
        train_labels[:TRAIN_DOCUMENTS],
        optimizer,
    )
    train_stats.report(epoch)

# # dev-0

dev_0_data = read_tokenized_lines('dev-0/in.tsv')
dev_0_tags = read_tokenized_lines('dev-0/expected.tsv')
dev_0_tag_ids = [[ner_tags_dic[tag] for tag in document]
                 for document in dev_0_tags]

test_tokens_ids = data_process(dev_0_data)
test_labels = labels_process(dev_0_tag_ids)

result, dev_stats = run_pass(
    ner_model, criterion, test_tokens_ids, test_labels
)
dev_stats.report()

tags = ids_to_tags(result, ner_tags_set)
write_predictions("dev-0/out.tsv", tags)

# Token-level accuracy of the written tags against the gold tags.
matching = sum(
    predicted == gold
    for predicted_doc, gold_doc in zip(tags, dev_0_tags)
    for predicted, gold in zip(predicted_doc, gold_doc)
)
print(_safe_div(matching, sum(len(document) for document in tags)))

# # test

test_data = read_tokenized_lines('test-A/in.tsv')
test_tokens_ids = data_process(test_data)

result, _ = run_pass(ner_model, criterion, test_tokens_ids)

tags = ids_to_tags(result, ner_tags_set)
write_predictions("test-A/out.tsv", tags)