# s434749-ner-conll-2003/Main.py
# (212 lines, 7.5 KiB, Python)
from itertools import islice
from gensim.models import Word2Vec
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
import gensim.downloader as api
# Download/load the text8 corpus via gensim's downloader (cached locally after first use).
corpus = api.load('text8')
print("Loading word embeddings")
# Train word2vec embeddings from scratch on text8 with gensim defaults.
w2v = Word2Vec(corpus)
def window(seq, n=2):
    """Return a sliding window (of width n) over data from the iterable.

    s -> (s0, s1, ..., s[n-1]), (s1, s2, ..., sn), ...

    Yields nothing when seq has fewer than n elements.
    (The second summary line was previously a dead bare-string statement;
    it is now part of the docstring.)
    """
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result
    for elem in it:
        result = result[1:] + (elem,)
        yield result
# Number of hand-crafted binary features produced by vectorize().
EXTRA_FEATURE_LEN = 6
# Dimensionality of the word2vec embeddings (100 with gensim defaults).
WORD_FEATURES_LEN = w2v.wv.vector_size
# Context tokens taken on each side of the focus token.
SPAN = 5
# All-zeros embedding used to pad sentence boundaries.
PAD = torch.zeros(WORD_FEATURES_LEN)
# Index 0 is the "outside" tag; the rest are the CoNLL-2003 entity types.
LABEL_ARRAY = ['O', 'LOC', 'MISC', 'ORG', 'PER']
# Inverse mapping, derived from LABEL_ARRAY so the two can never drift apart.
LABEL = {name: idx for idx, name in enumerate(LABEL_ARRAY)}
def label_to_word(lbl, beginning):
    """Convert a numeric label index back to an IOB2 tag string.

    lbl: index into LABEL_ARRAY (0 means 'O', i.e. not an entity).
    beginning: True if this token starts a new entity span (B- prefix),
               False if it continues one (I- prefix).
    """
    if lbl == 0:
        return 'O'
    name = LABEL_ARRAY[lbl]
    return ('B-' if beginning else 'I-') + name
# Filled below with (window_of_(embedding, extras), label_index) pairs.
TRAIN_DATA = []
TEST_DATA = []
# Punctuation characters flagged by one of the extra features.
PUNC = {',', '<', '/', '>', '%', '$', '#', '@', '^', '*', '(', ')', '[', ']', '{', '}', ':'}
# Sentence-terminating characters flagged by another extra feature.
SENTENCE_END = {'.', '?', '!', ';'}
# Window width: SPAN context tokens on each side plus the focus token.
N_GRAM_LEN = SPAN + 1 + SPAN
# All-ones sentinel embedding for words missing from the word2vec vocabulary.
OUT_OF_VOCABULARY = torch.ones(WORD_FEATURES_LEN)
def vectorize(word):
    """Return (extra_features, embedding) for a single token string.

    extra_features: EXTRA_FEATURE_LEN binary features — first-char uppercase,
    first-char digit, length 1, length 2, first char in PUNC, first char in
    SENTENCE_END. Built as float32 up front instead of relying on
    torch.cat's bool->float promotion downstream.
    embedding: the word2vec vector for the lowercased word, or the
    OUT_OF_VOCABULARY sentinel when the word is unknown.
    """
    # NOTE(review): word[0].isdigit() only inspects the first character, so
    # "3rd" and "33" look alike — presumably intentional; confirm.
    extra_features = torch.tensor([word[0].isupper(), word[0].isdigit(),
                                   len(word) == 1, len(word) == 2,
                                   word[0] in PUNC, word[0] in SENTENCE_END],
                                  dtype=torch.float32)
    word = word.lower()
    if word in w2v.wv:
        vec = torch.from_numpy(w2v.wv[word])
    else:
        vec = OUT_OF_VOCABULARY
    return extra_features, vec
def process_line(words, labels, data):
    """Vectorize one sentence and append its training windows to data.

    words/labels are parallel token and IOB-tag sequences; each token
    contributes one (window_of_(embedding, extras), label_index) pair.
    """
    assert len(words) == len(labels), str(words) + '\n\n' + str(labels)
    word_array, label_array, extra_array = [], [], []
    for word, tag in zip(words, labels):
        # Strip the B-/I- prefix; any other tag maps to the outside class 0.
        if tag.startswith('B-') or tag.startswith('I-'):
            lbl = LABEL[tag[2:]]
        else:
            lbl = 0
        extras, vec = vectorize(word)
        assert len(extras) == EXTRA_FEATURE_LEN
        word_array.append(vec)
        label_array.append(lbl)
        extra_array.append(extras)
    # Pad SPAN dummy entries on both ends so every real token can sit at the
    # center of an N_GRAM_LEN-wide window.
    word_pad = [PAD] * SPAN
    extra_pad = [torch.zeros(EXTRA_FEATURE_LEN)] * SPAN
    padded = zip(word_pad + word_array + word_pad,
                 extra_pad + extra_array + extra_pad)
    for words_and_extras, lbl in zip(window(padded, N_GRAM_LEN), label_array):
        data.append((words_and_extras, torch.tensor(lbl, dtype=torch.long)))
# train.tsv: one sentence per line, tag sequence TAB token sequence.
with open('train/train.tsv') as f:
    for line in tqdm(f, desc="Loading training data", total=945):
        labels, words = line.split('\t')
        words, labels = words.split(), labels.split()
        process_line(words, labels, TRAIN_DATA)
# Dev data keeps tags and tokens in two parallel files, one sentence per line.
with open('dev-0/expected.tsv') as l, open('dev-0/in.tsv') as w:
    for labels, words in tqdm(zip(l, w), desc="Loading testing data", total=215):
        words, labels = words.split(), labels.split()
        process_line(words, labels, TEST_DATA)
def collate(batch):
    """Stack (window, label) samples into batched tensors for the DataLoader.

    Returns a (B, N_GRAM_LEN, WORD_FEATURES_LEN + EXTRA_FEATURE_LEN) float
    input tensor and a (B,) long tensor of label indices.
    """
    feature_len = WORD_FEATURES_LEN + EXTRA_FEATURE_LEN
    inputs = torch.empty(len(batch), N_GRAM_LEN, feature_len)
    targets = torch.empty(len(batch), dtype=torch.long)
    for row, (entries, lbl) in enumerate(batch):
        targets[row] = lbl
        for col, (word_vec, extra_vec) in enumerate(entries):
            # Concatenate the embedding with its hand-crafted features.
            inputs[row, col, :] = torch.cat([word_vec, extra_vec])
    return inputs, targets
traindataloader = torch.utils.data.DataLoader(TRAIN_DATA, batch_size=128, shuffle=True, collate_fn=collate)
# No shuffle for evaluation data.
testdataloader = torch.utils.data.DataLoader(TEST_DATA, batch_size=128, collate_fn=collate)
# Five tag classes: O, LOC, MISC, ORG, PER.
NUM_LABELS = len(LABEL)
class Net(torch.nn.Module):
    """Two-layer MLP over the flattened features of one token window.

    Input: (B, N_GRAM_LEN * (WORD_FEATURES_LEN + EXTRA_FEATURE_LEN)).
    Output: (B, NUM_LABELS) log-probabilities, matching the NLLLoss
    criterion used for training.
    """

    def __init__(self):
        super(Net, self).__init__()
        # Attribute names lin1/lin2 are kept so existing checkpoints
        # (epoch_*.pth state_dicts) keep loading.
        self.lin1 = nn.Linear(N_GRAM_LEN * (WORD_FEATURES_LEN + EXTRA_FEATURE_LEN), 128)
        self.lin2 = nn.Linear(128, NUM_LABELS)

    def forward(self, x):
        x = self.lin1(x)
        # Keyword instead of the bare positional True for F.relu's inplace flag.
        x = F.relu(x, inplace=True)
        x = self.lin2(x)
        return torch.log_softmax(x, dim=1)
# Prefer the first GPU when available, otherwise fall back to CPU.
DEVICE = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
model = Net().to(DEVICE)
# NLLLoss pairs with the log_softmax output of Net.forward.
label_criterion = torch.nn.NLLLoss()
optimizer = torch.optim.Adam(model.parameters())
def predict_line(words):
    """Predict IOB2 tag strings for one sentence (a list of token strings).

    Returns a list of tags ('O', 'B-XXX', 'I-XXX') the same length as words.
    """
    # Guard: the original crashed on str_labels[0] for an empty line.
    if not words:
        return []
    word_array, extra_array = [], []
    for word in words:
        extra_features, vec = vectorize(word)
        assert len(extra_features) == EXTRA_FEATURE_LEN
        word_array.append(vec)
        extra_array.append(extra_features)
    # Same boundary padding and windowing scheme as process_line.
    pad = [PAD] * SPAN
    extra_pad = [torch.zeros(EXTRA_FEATURE_LEN)] * SPAN
    batch = list(window(zip(pad + word_array + pad,
                            extra_pad + extra_array + extra_pad),
                        N_GRAM_LEN))
    batch_size = len(batch)
    in_vec = torch.empty(batch_size, N_GRAM_LEN, WORD_FEATURES_LEN + EXTRA_FEATURE_LEN)
    for n_gram_idx, words_and_extras in enumerate(batch):
        for entry_idx, (word, extra) in enumerate(words_and_extras):
            in_vec[n_gram_idx, entry_idx, :] = torch.cat([word, extra])
    in_vec = in_vec.to(DEVICE)
    in_vec = in_vec.reshape(batch_size, N_GRAM_LEN * (WORD_FEATURES_LEN + EXTRA_FEATURE_LEN))
    # Inference only: no_grad avoids building an autograd graph.
    with torch.no_grad():
        labels = model(in_vec).argmax(dim=1)
    assert len(words) == len(labels)
    str_labels = [''] * len(labels)
    str_labels[0] = label_to_word(labels[0], True)
    for i in range(1, len(labels)):
        # A token begins a new span when its label differs from the previous one.
        str_labels[i] = label_to_word(labels[i], labels[i - 1] != labels[i])
    return str_labels
def predict_file(dir_path='dev-0'):
    """Write model predictions for dir_path/in.tsv to dir_path/out.tsv.

    One output line per input line: space-joined IOB2 tags for its tokens.
    """
    with open(dir_path + '/in.tsv') as src, open(dir_path + '/out.tsv', 'w+') as dst:
        # No fixed total: the old hard-coded total=215 was dev-0's line count
        # and was wrong for test-A.
        for line in tqdm(src, desc=dir_path + " predictions"):
            print(' '.join(predict_line(line.split())), file=dst)
# Training loop: one pass over the data per epoch, then evaluation,
# a checkpoint, and regenerated prediction files.
for epoch in range(100):
    train_label_accuracy = 0
    for in_vec, out_vec in tqdm(traindataloader, desc="training"):
        batch_size = in_vec.size()[0]
        in_vec = in_vec.to(DEVICE)
        out_vec = out_vec.to(DEVICE)
        # Flatten each window into a single feature vector for the MLP.
        in_vec = in_vec.reshape(batch_size, N_GRAM_LEN * (WORD_FEATURES_LEN + EXTRA_FEATURE_LEN))
        out_labels = model(in_vec)
        loss = label_criterion(out_labels, out_vec)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Stats
        out_labels = out_labels.argmax(dim=1)
        train_label_accuracy += (out_labels == out_vec).sum().item()
    train_label_accuracy = train_label_accuracy / len(TRAIN_DATA)
    test_label_accuracy = 0
    # Evaluation only: disable autograd so no graph is built or kept alive.
    with torch.no_grad():
        for in_vec, out_vec in tqdm(testdataloader, desc="testing"):
            batch_size = in_vec.size()[0]
            in_vec = in_vec.to(DEVICE)
            out_vec = out_vec.to(DEVICE)
            in_vec = in_vec.reshape(batch_size, N_GRAM_LEN * (WORD_FEATURES_LEN + EXTRA_FEATURE_LEN))
            out_labels = model(in_vec)
            # Stats
            out_labels = out_labels.argmax(dim=1)
            test_label_accuracy += (out_labels == out_vec).sum().item()
    test_label_accuracy = test_label_accuracy / len(TEST_DATA)
    print()
    print("Epoch: ", epoch)
    print('train label accuracy: ', train_label_accuracy)
    print('test label accuracy: ', test_label_accuracy)
    # Checkpoint and refresh prediction files every epoch.
    torch.save(model.state_dict(), 'epoch_' + str(epoch) + '.pth')
    predict_file('dev-0')
    predict_file('test-A')