3RNN/main.py

import os
import lzma
import numpy as np
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Bidirectional, TimeDistributed, Dense
from tensorflow.keras.optimizers import Adam

# Load data (transparently handles .xz-compressed files)
def load_data(filepath):
    if filepath.endswith('.xz'):
        with lzma.open(filepath, 'rt') as file:
            data = file.read()
    else:
        with open(filepath, 'r') as file:
            data = file.read()
    return data

# Preprocess data: one token per line, blank line between sentences.
# Labels come either from a second tab-separated column in `data` itself
# (train.tsv) or from a parallel `labels` string with one tag per line
# (dev-0/expected.tsv alongside dev-0/in.tsv).
def preprocess(data, labels=None):
    sentences = []
    labels_list = []
    sentence = []
    label_sentence = []
    data_lines = data.split('\n')
    label_lines = labels.split('\n') if labels else []
    for i, line in enumerate(data_lines):
        if line.strip() == '':
            if len(sentence) > 0:
                sentences.append(sentence)
                if label_sentence:
                    labels_list.append(label_sentence)
            sentence = []
            label_sentence = []
        else:
            parts = line.strip().split('\t')
            sentence.append(parts[0])
            if len(parts) == 2:
                # Label stored in the second column of the same file
                label_sentence.append(parts[1])
            elif labels and i < len(label_lines):
                # Label taken from the parallel expected-labels file
                label_sentence.append(label_lines[i].strip())
    # Flush the final sentence if the file does not end with a blank line
    if len(sentence) > 0:
        sentences.append(sentence)
        if label_sentence:
            labels_list.append(label_sentence)
    return sentences, labels_list
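
# Example of the assumed input format (hypothetical sample):
#   >>> sample = "John\tB-PER\nsmiled\tO\n\nBye\tO\n"
#   >>> preprocess(sample)
#   ([['John', 'smiled'], ['Bye']], [['B-PER', 'O'], ['O']])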

# Extract unique tags from labels
def extract_tags(labels_list):
    tags = set()
    for label_seq in labels_list:
        tags.update(label_seq)
    return tags

# Prepare data for model: map tokens/tags to indices, pad to max_len.
# truncating='post' keeps the *start* of over-long sentences, so the
# padded sequences stay aligned with the words written out later.
def prepare_data(sentences, labels, word2idx, tag2idx, max_len):
    X = [[word2idx.get(word, word2idx["UNK"]) for word in sentence] for sentence in sentences]
    X = pad_sequences(X, maxlen=max_len, padding='post', truncating='post')
    y = [[tag2idx[label] for label in label_seq] for label_seq in labels]
    # Note: padded label positions get index 0, i.e. whichever tag happens
    # to map to 0, so per-token accuracy on padded positions is optimistic.
    y = pad_sequences(y, maxlen=max_len, padding='post', truncating='post')
    y = [to_categorical(i, num_classes=len(tag2idx)) for i in y]
    return X, y
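
# Resulting shapes (illustrative): X is an int array of shape
# (n_sentences, max_len); y is a list of n_sentences one-hot arrays,
# each of shape (max_len, n_tags), stacked with np.array() before fit().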

# Define model: embeddings -> Bi-LSTM -> per-token softmax over tags
def define_bilstm_model(vocab_size, tag_size, max_len, embedding_dim=50):
    # max_len is kept for the call site; the Embedding layer itself
    # handles variable-length input, so it is not needed here.
    model = Sequential()
    model.add(Embedding(input_dim=vocab_size, output_dim=embedding_dim))
    model.add(Bidirectional(LSTM(units=100, return_sequences=True, recurrent_dropout=0.1)))
    model.add(TimeDistributed(Dense(tag_size, activation='softmax')))
    model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])
    return model
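
# Design note: return_sequences=True keeps one LSTM output per timestep,
# and TimeDistributed applies the same Dense classifier at every position,
# so the network predicts a tag distribution for each token rather than
# a single label per sentence.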

# Write predictions in the same token-per-line format as the input;
# zip() truncates at the true sentence length, dropping padded positions.
def write_predictions(filepath, sentences, predictions, idx2tag):
    with open(filepath, 'w') as file:
        for sentence, prediction in zip(sentences, predictions):
            for word, pred in zip(sentence, prediction):
                file.write(f"{word}\t{idx2tag[np.argmax(pred)]}\n")
            file.write("\n")

# Evaluate with GEval
def evaluate_with_geval():
    os.system('geval -t config.txt -r mnt/dev-0/expected.tsv -p mnt/dev-0/out.tsv')
# Paths
train_path = 'mnt/train/train.tsv'
dev_in_path = 'mnt/dev-0/in.tsv'
dev_expected_path = 'mnt/dev-0/expected.tsv'
test_in_path = 'mnt/test-A/in.tsv'
dev_out_path = 'mnt/dev-0/out.tsv'
test_out_path = 'mnt/test-A/out.tsv'
# Load data
train_data = load_data(train_path)
dev_input_data = load_data(dev_in_path)
dev_expected_data = load_data(dev_expected_path)
test_input_data = load_data(test_in_path)
# Preprocess data
train_sentences, train_labels = preprocess(train_data)
dev_sentences, dev_labels = preprocess(dev_input_data, dev_expected_data)
test_sentences, _ = preprocess(test_input_data)
# Debugging: Print the lengths of the sentences and labels
print(f"Number of training sentences: {len(train_sentences)}")
print(f"Number of training labels: {len(train_labels)}")
print(f"Number of dev sentences: {len(dev_sentences)}")
print(f"Number of dev labels: {len(dev_labels)}")
# Extract all unique tags from training and dev labels
all_tags = extract_tags(train_labels) | extract_tags(dev_labels)
# Create word and tag indices
all_words = set(word for sentence in train_sentences for word in sentence)
# Sort for a deterministic mapping across runs; reserve 0 for PAD, 1 for UNK
word2idx = {word: idx + 2 for idx, word in enumerate(sorted(all_words))}
word2idx["UNK"] = 1
word2idx["PAD"] = 0
tag2idx = {tag: idx for idx, tag in enumerate(sorted(all_tags))}
idx2tag = {idx: tag for tag, idx in tag2idx.items()}
print("Tag to Index Mapping:", tag2idx) # Debugging
# Prepare data for model
max_len = 100  # Fixed cap on sentence length; ideally derived from the data, e.g. max(len(s) for s in train_sentences)
X_train, y_train = prepare_data(train_sentences, train_labels, word2idx, tag2idx, max_len)
X_dev, y_dev = prepare_data(dev_sentences, dev_labels, word2idx, tag2idx, max_len)
X_test, _ = prepare_data(test_sentences, [[] for _ in test_sentences], word2idx, tag2idx, max_len)  # no labels for test
# Debugging: Print the shapes of the prepared data
print(f"Shape of X_train: {X_train.shape}")
print(f"Shape of y_train: {len(y_train)}")
print(f"Shape of X_dev: {X_dev.shape}")
print(f"Shape of y_dev: {len(y_dev)}")
# Define model
vocab_size = len(word2idx)
tag_size = len(tag2idx)
model = define_bilstm_model(vocab_size, tag_size, max_len)
# Train model
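# np.array(y_train) stacks the per-sentence one-hot matrices into a single
# (n_sentences, max_len, n_tags) tensor, which fit() requires.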
model.fit(X_train, np.array(y_train), validation_data=(X_dev, np.array(y_dev)), epochs=3, batch_size=32, verbose=1)
# Predict
dev_predictions = model.predict(X_dev)
test_predictions = model.predict(X_test)
# Write predictions
write_predictions(dev_out_path, dev_sentences, dev_predictions, idx2tag)
write_predictions(test_out_path, test_sentences, test_predictions, idx2tag)
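# Score the dev predictions with GEval (assumes the `geval` binary is on
# PATH and that config.txt defines the metric for this challenge).
evaluate_with_geval()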