# Source listing metadata: 142 lines, 5.6 KiB, Python.
import csv
import sys

import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import classification_report
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

# Set the stdout encoding to UTF-8 so that printed text is encoded the same
# way regardless of the console's default encoding.
# Objects that replace sys.stdout (some IDEs and notebooks) may not provide
# reconfigure(), so the call is made only when the method exists.
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8')

# Load data
def load_data(file_path):
    """Read a UTF-8 text file and return its lines, stripped of whitespace.

    Args:
        file_path: Path of the text file to read.

    Returns:
        A list with one string per line of the file, in file order. Leading
        and trailing whitespace (including the newline) is removed from every
        line; blank lines are kept as empty strings.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        # Iterate over the handle directly; there is no need to build the
        # intermediate list that readlines() would create.
        return [line.strip() for line in file]


# Read the corpora. The training file is a two-column TSV (label string,
# sentence string); the dev/test files are read line by line.
# quoting=csv.QUOTE_NONE treats '"' as an ordinary character: with pandas'
# default quoting, an unbalanced quote inside a sentence swallows the
# following tab/newline separators and merges rows. keep_default_na=False
# keeps fields such as "NA" or "null" as text instead of converting them to
# NaN, which would later break the str.split() tokenization.
train_data = pd.read_csv(
    './train/train.tsv',
    sep='\t',
    header=None,
    names=['label', 'sentence'],
    encoding='utf-8',
    quoting=csv.QUOTE_NONE,
    keep_default_na=False,
)
dev_sentences = load_data('./dev-0/in.tsv')
dev_labels = load_data('./dev-0/expected.tsv')
test_sentences = load_data('./test-A/in.tsv')


# Preprocess data
def preprocess_data(sentences, labels=None):
    """Split sentences (and optionally label strings) on whitespace.

    Args:
        sentences: Iterable of sentence strings.
        labels: Optional iterable of label strings. When given, each string
            is split on whitespace in the same way as the sentences.

    Returns:
        The list of token lists when ``labels`` is None; otherwise a
        ``(tokenized_sentences, tokenized_labels)`` tuple of two lists.
    """
    tokenized_sentences = [sentence.split() for sentence in sentences]
    if labels is None:
        return tokenized_sentences
    tokenized_labels = [label.split() for label in labels]
    return tokenized_sentences, tokenized_labels


# Tokenize every split. Train and dev are tokenized together with their
# labels; the test split is tokenized without labels.
train_sentences, train_labels = preprocess_data(
    train_data['sentence'].values,
    train_data['label'].values,
)
dev_sentences, dev_labels = preprocess_data(dev_sentences, dev_labels)
test_sentences = preprocess_data(test_sentences)

# Create a word index and label index
special_tokens = ['<PAD>', '<UNK>', '<BOS>', '<EOS>']

# Sort the vocabulary so that word ids are the same on every run; iterating
# over a plain set of strings yields a different order between interpreter
# runs because of string hash randomization.
vocabulary = sorted({word for sentence in train_sentences for word in sentence})

# Regular words start right after the ids reserved for the special tokens.
word2idx = {word: i + len(special_tokens) for i, word in enumerate(vocabulary)}
# Special tokens take ids 0..len(special_tokens)-1 ('<PAD>' is 0). They are
# assigned last so that they win if a training token has the same spelling.
for i, token in enumerate(special_tokens):
    word2idx[token] = i

idx2word = {i: w for w, i in word2idx.items()}

# Fixed IOB tag inventory; 'O' is deliberately id 0.
label2idx = {
    'O': 0,
    'B-PER': 1, 'I-PER': 2,
    'B-ORG': 3, 'I-ORG': 4,
    'B-LOC': 5, 'I-LOC': 6,
    'B-MISC': 7, 'I-MISC': 8,
}
idx2label = {i: l for l, i in label2idx.items()}


# Convert words and labels to integers
def encode_data(sentences, labels=None, word_index=None, label_index=None):
    """Map tokenized sentences (and optionally their tags) to integer ids.

    Args:
        sentences: Iterable of token lists.
        labels: Optional iterable of tag lists.
        word_index: Optional token -> id mapping that contains '<UNK>'.
            Defaults to the module-level ``word2idx``.
        label_index: Optional tag -> id mapping. Defaults to the module-level
            ``label2idx``; only consulted when ``labels`` is given.

    Returns:
        The list of id lists when ``labels`` is None; otherwise an
        ``(encoded_sentences, encoded_labels)`` tuple.

    Raises:
        KeyError: If '<UNK>' is missing from the word mapping or a tag is
            missing from the label mapping.
    """
    if word_index is None:
        word_index = word2idx
    # Look the fallback id up once instead of once per token.
    unk_id = word_index['<UNK>']
    encoded_sentences = [
        [word_index.get(word, unk_id) for word in sentence]
        for sentence in sentences
    ]
    if labels is None:
        return encoded_sentences

    if label_index is None:
        label_index = label2idx
    encoded_labels = [
        [label_index[label] for label in label_list]
        for label_list in labels
    ]
    return encoded_sentences, encoded_labels


# Encode every split as integer ids.
X_train, y_train = encode_data(train_sentences, train_labels)
X_dev, y_dev = encode_data(dev_sentences, dev_labels)
X_test = encode_data(test_sentences)

# Limit sequence length to avoid excessive memory usage
max_len = 1000  # You can adjust this value to a reasonable limit based on your data and memory

# Bring every sequence to exactly max_len ids. Padding is appended
# (padding='post') with the default value 0, which is '<PAD>' for words and
# 'O' for labels. Truncation is left at the pad_sequences default ('pre'), so
# a sequence longer than max_len keeps its LAST max_len items.
X_train = pad_sequences(X_train, padding='post', maxlen=max_len)
y_train = pad_sequences(y_train, padding='post', maxlen=max_len)

X_dev = pad_sequences(X_dev, padding='post', maxlen=max_len)
y_dev = pad_sequences(y_dev, padding='post', maxlen=max_len)

X_test = pad_sequences(X_test, padding='post', maxlen=max_len)

# One-hot encode the label ids in a single vectorized call; the result has a
# trailing axis of size len(label2idx).
y_train = to_categorical(y_train, num_classes=len(label2idx))
y_dev = to_categorical(y_dev, num_classes=len(label2idx))

# Define the model with reduced complexity:
# embedding -> bidirectional LSTM -> softmax over the tag set at every
# time step (return_sequences=True keeps one output per position).
model = tf.keras.models.Sequential([
    tf.keras.layers.Embedding(input_dim=len(word2idx), output_dim=64, input_length=max_len),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=32, return_sequences=True)),
    tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(len(label2idx), activation='softmax')),
])

# categorical_crossentropy matches the one-hot encoded targets.
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train the model with a smaller batch size
history = model.fit(
    X_train,
    np.array(y_train),
    validation_data=(X_dev, np.array(y_dev)),
    epochs=25,
    batch_size=16,
)

# Evaluate the model
y_pred = model.predict(X_dev)
y_pred = np.argmax(y_pred, axis=-1)  # most probable tag id per position
y_true = np.argmax(np.array(y_dev), axis=-1)  # undo the one-hot encoding

# Map predictions and true labels to their original tags
y_pred_tags = [[idx2label[i] for i in row] for row in y_pred]
y_true_tags = [[idx2label[i] for i in row] for row in y_true]

# Print the classification report using UTF-8 encoding.
# y_true / y_pred are passed as tag strings, so `labels` has to contain tag
# strings as well; integer ids would not match any value in the data.
# 'O' is left out of the report: it is the id-0 tag, which is also the value
# the label sequences were padded with.
report_tags = [tag for tag in label2idx if tag != 'O']
print(classification_report(
    [tag for row in y_true_tags for tag in row],
    [tag for row in y_pred_tags for tag in row],
    labels=report_tags,
    target_names=report_tags,
))


# Correct IOB labels function
def correct_iob_labels(predictions):
    """Rewrite tag sequences so that no ``I-`` tag starts an entity.

    An ``I-X`` tag is replaced by ``B-X`` when the previous (already
    corrected) tag in the same sequence is ``'O'`` or has a different type
    suffix. Every other tag is copied unchanged.

    Args:
        predictions: Iterable of tag sequences, one sequence per sentence.

    Returns:
        A new list of lists with the corrected tags; the input is not
        modified.
    """
    corrected = []
    for pred in predictions:
        corrected_sentence = []
        # The position before the first token counts as "outside".
        prev_label = 'O'
        for label in pred:
            if label.startswith('I-') and (prev_label == 'O' or prev_label[2:] != label[2:]):
                label = 'B-' + label[2:]
            corrected_sentence.append(label)
            # Compare the next tag against the corrected value, not the raw one.
            prev_label = label
        corrected.append(corrected_sentence)
    return corrected


# Predict on test data
y_test_pred = model.predict(X_test)
y_test_pred = np.argmax(y_test_pred, axis=-1)  # most probable tag id per position
y_test_pred_tags = [[idx2label[i] for i in row] for row in y_test_pred]

# Correct the predicted tags so that no entity starts with an I- tag.
y_pred_tags_corrected = correct_iob_labels(y_pred_tags)
y_test_pred_tags_corrected = correct_iob_labels(y_test_pred_tags)

def _align_with_tokens(tags, n_tokens):
    """Return exactly ``n_tokens`` tags for a sentence of ``n_tokens`` tokens.

    The model always predicts a fixed number of positions per sentence, so
    the raw prediction rows contain tags for padding as well.
    """
    tags = list(tags)
    missing = n_tokens - len(tags)
    if missing > 0:
        # The sentence was longer than the padded length. pad_sequences cut
        # it from the front (default truncating='pre'), so the predictions
        # belong to the LAST tokens; the dropped leading tokens get 'O'.
        return ['O'] * missing + tags
    # Padding was appended (padding='post'), so the real tags come first.
    return tags[:n_tokens]


# Save dev predictions: one line per sentence, one tag per token.
# The output is written next to the input files ('./dev-0/', './test-A/');
# the previous './dev0/' and './testA/' paths did not match the directories
# the data is read from.
dev_predictions = [
    ' '.join(_align_with_tokens(tags, len(tokens)))
    for tags, tokens in zip(y_pred_tags_corrected, dev_sentences)
]
with open('./dev-0/out.tsv', 'w', encoding='utf-8') as f:
    f.writelines(prediction + '\n' for prediction in dev_predictions)

# Save test predictions
test_predictions = [
    ' '.join(_align_with_tokens(tags, len(tokens)))
    for tags, tokens in zip(y_test_pred_tags_corrected, test_sentences)
]
with open('./test-A/out.tsv', 'w', encoding='utf-8') as f:
    f.writelines(prediction + '\n' for prediction in test_predictions)