# forked from kubapok/en-ner-conll-2003
import pandas as pd
import numpy as np
import torch
from tqdm import tqdm
import gensim.downloader as api
from gensim.models.word2vec import Word2Vec

# DF

# error_bad_lines was removed in pandas 2.0; on_bad_lines="skip" is the
# current equivalent and silently drops malformed rows.
train = pd.read_table(
    "train/train.tsv.xz", on_bad_lines="skip", header=None, quoting=3
)
testA = pd.read_table("test-A/in.tsv", on_bad_lines="skip", header=None, quoting=3)
dev0 = pd.read_table("dev-0/in.tsv", on_bad_lines="skip", header=None, quoting=3)

# VARS

# Train word2vec embeddings from scratch on the text8 corpus
# (100-dimensional vectors by default).
w2v = Word2Vec(api.load("text8"))
X_train, X_test, X_dev, y_train, dev_predictions, test_predictions = (
    [],
    [],
    [],
    [],
    [],
    [],
)

## CONST

# BIO tag -> class id
LABELS = {
    "O": 0,
    "B-ORG": 1,
    "B-MISC": 2,
    "B-PER": 3,
    "I-PER": 4,
    "I-MISC": 5,
    "B-LOC": 6,
    "I-ORG": 7,
    "I-LOC": 8,
}
# Punctuation used as a binary surface feature in to_vector().
SPECIAL_CHARACTERS = frozenset(",</>%$#@^*()[]{}:")
# Fallback embedding for out-of-vocabulary words.
ONES = np.ones(w2v.vector_size)
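# ONES has shape (w2v.vector_size,), i.e. (100,) with the default text8
# word2vec settings, matching the embedding half of each feature vector.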


# FUNCTIONS


def get_key_by_value(value):
    """Reverse lookup in LABELS: map a class id back to its BIO tag."""
    for key, dict_value in LABELS.items():
        if dict_value == value:
            return key
    return "O"  # fall back to the tag string rather than the int 0
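

# e.g. get_key_by_value(3) returns "B-PER".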


def to_vector(word):
    """Embed a word as a (105, 1) column: 100 word2vec dims + 5 binary features."""
    features_array = np.array(
        [
            any(c in SPECIAL_CHARACTERS for c in word),
            word.isalpha(),
            word[0].isupper(),
            len(word) == 1,
            len(word) == 2,
        ],
    ).reshape(-1, 1)
    word = word.lower()
    vec = w2v.wv[word] if word in w2v.wv else ONES
    vec = vec.reshape(-1, 1)
    return np.concatenate((vec, features_array))
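

# Illustrative: to_vector("Paris") is a (105, 1) column with the
# capitalisation flag set (and the embedding of "paris", assuming it is in
# the text8 vocabulary; otherwise the ONES fallback).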


def stringify_prediction(prediction):
    """Convert a tensor of class ids into a valid BIO tag string."""
    labels = [get_key_by_value(value) for value in prediction.tolist()]
    output = []
    previous_label = None
    for label in labels:
        if label != "O":
            # Compare entity types ("PER", "ORG", ...), not full tags, so a
            # "B-PER I-PER" sequence stays one entity instead of being split.
            if previous_label and previous_label[2:] == label[2:]:
                output.append(f"I-{label[2:]}")
            else:
                output.append(f"B-{label[2:]}")
        else:
            output.append(label)
        previous_label = label
    return " ".join(output)
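

# e.g. ids [3, 4, 0, 1] decode to "B-PER I-PER O B-ORG"; an I- tag with no
# open entity of the same type is re-tagged to B-.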


# MODEL


class NERModel(torch.nn.Module):
    """Two-layer feed-forward classifier applied to each token independently."""

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.l1 = torch.nn.Linear(input_size, hidden_size)
        self.l2 = torch.nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        x = self.l1(x)
        x = torch.relu(x)
        x = self.l2(x)
        # log-probabilities, paired with NLLLoss below
        x = torch.log_softmax(x, dim=1)
        return x


# 105 inputs = 100 word2vec dimensions + 5 hand-crafted features
model = NERModel(105, 600, len(LABELS))
criterion = torch.nn.NLLLoss()
optimizer = torch.optim.Adam(model.parameters())
batch_size = 64


# READ DATA

# train.tsv: column 0 holds space-separated BIO tags, column 1 the tokens.
for index, row in tqdm(
    train.iterrows(), desc="Loading train data", total=train.shape[0]
):
    labels, words = row[0], row[1]
    words, labels = words.split(), labels.split()
    for word in words:
        X_train.append(to_vector(word))
    for label in labels:
        if label.startswith(("B-", "I-")):
            y_train.append(LABELS[label])
        else:
            y_train.append(0)
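
# X_train and y_train are flat, token-level lists; sentence boundaries are
# not needed for training because each token is classified on its own.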

# dev-0 and test-A inputs are a single column of space-separated tokens;
# keep them grouped per sentence so predictions can be written line by line.
for index, row in tqdm(dev0.iterrows(), desc="Loading dev data", total=dev0.shape[0]):
    X_dev.append([to_vector(word) for word in row[0].split()])

for index, row in tqdm(
    testA.iterrows(), desc="Loading test data", total=testA.shape[0]
):
    X_test.append([to_vector(word) for word in row[0].split()])
print("TRAINING")
|
|
|
|
# TRAINING
|
|
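# A fixed 100 epochs with mini-batches of 64; no shuffling or early stopping.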
for epoch in range(100):
    model.train()
    for i in range(0, len(y_train), batch_size):
        X = X_train[i : i + batch_size]
        X = np.array(X).reshape(len(X), 105)
        X = torch.tensor(X)
        y = y_train[i : i + batch_size]
        y = np.array(y)
        y = torch.tensor(y)

        outputs = model(X.float())
        loss = criterion(outputs, y.long())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

# INFERENCE

model.eval()
with torch.no_grad():
    # One sentence at a time: each X_dev[i] is a list of (105, 1) word vectors.
    for i in range(0, len(X_dev)):
        X = X_dev[i]
        X = np.array(X).reshape(len(X), 105)
        X = torch.tensor(X)

        output = model(X.float())
        prediction = torch.argmax(output, dim=1)
        dev_predictions.append(stringify_prediction(prediction))

    for i in range(0, len(X_test)):
        X = X_test[i]
        X = np.array(X).reshape(len(X), 105)
        X = torch.tensor(X)

        output = model(X.float())
        prediction = torch.argmax(output, dim=1)
        test_predictions.append(stringify_prediction(prediction))

dev_predictions = np.asarray(dev_predictions)
test_predictions = np.asarray(test_predictions)

# One tagged sentence per line, in the same order as the in.tsv inputs.
dev_predictions.tofile("dev-0/out.tsv", sep="\n", format="%s")
test_predictions.tofile("test-A/out.tsv", sep="\n", format="%s")
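
# The out.tsv files keep one tagged line per input line, matching the
# challenge layout; they can be scored with the challenge's evaluation tool.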