import sys

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from sklearn.metrics import classification_report

# Set the output encoding to UTF-8
sys.stdout.reconfigure(encoding='utf-8')


# Load data
def load_data(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
    sentences = [line.strip() for line in lines]
    return sentences


train_data = pd.read_csv('./train/train.tsv', sep='\t', header=None,
                         names=['label', 'sentence'], encoding='utf-8')
dev_sentences = load_data('./dev-0/in.tsv')
dev_labels = load_data('./dev-0/expected.tsv')
test_sentences = load_data('./test-A/in.tsv')


# Preprocess data: split whitespace-tokenized sentences (and tag sequences) into token lists
def preprocess_data(sentences, labels=None):
    tokenized_sentences = [sentence.split() for sentence in sentences]
    if labels is not None:
        tokenized_labels = [label.split() for label in labels]
        return tokenized_sentences, tokenized_labels
    return tokenized_sentences


train_sentences, train_labels = preprocess_data(train_data['sentence'].values,
                                                train_data['label'].values)
dev_sentences, dev_labels = preprocess_data(dev_sentences, dev_labels)
test_sentences = preprocess_data(test_sentences)

# Create a word index and label index.
# The special-token literals were lost in the original listing (all four appeared
# as empty strings); '<PAD>', '<UNK>', '<BOS>', '<EOS>' are assumed here.
# '<PAD>' must map to 0 so that the zero-padding from pad_sequences lines up with it.
special_tokens = ['<PAD>', '<UNK>', '<BOS>', '<EOS>']
word2idx = {w: i + len(special_tokens)
            for i, w in enumerate(set(word for sentence in train_sentences
                                      for word in sentence))}
for i, token in enumerate(special_tokens):
    word2idx[token] = i
idx2word = {i: w for w, i in word2idx.items()}

label2idx = {
    'O': 0,
    'B-PER': 1, 'I-PER': 2,
    'B-ORG': 3, 'I-ORG': 4,
    'B-LOC': 5, 'I-LOC': 6,
    'B-MISC': 7, 'I-MISC': 8
}
idx2label = {i: l for l, i in label2idx.items()}


# Convert words and labels to integers; out-of-vocabulary words fall back to '<UNK>'
def encode_data(sentences, labels=None):
    encoded_sentences = [[word2idx.get(word, word2idx['<UNK>']) for word in sentence]
                         for sentence in sentences]
    if labels is not None:
        encoded_labels = [[label2idx[label] for label in label_list]
                          for label_list in labels]
        return encoded_sentences, encoded_labels
    return encoded_sentences


X_train, y_train = encode_data(train_sentences, train_labels)
X_dev, y_dev = encode_data(dev_sentences, dev_labels)
X_test = encode_data(test_sentences)

# Limit sequence length to avoid excessive memory usage;
# adjust this to a reasonable value based on your data and memory.
max_len = 1000
X_train = pad_sequences(X_train, padding='post', maxlen=max_len)
y_train = pad_sequences(y_train, padding='post', maxlen=max_len)
X_dev = pad_sequences(X_dev, padding='post', maxlen=max_len)
y_dev = pad_sequences(y_dev, padding='post', maxlen=max_len)
X_test = pad_sequences(X_test, padding='post', maxlen=max_len)

# One-hot encode the label sequences
y_train = [to_categorical(i, num_classes=len(label2idx)) for i in y_train]
y_dev = [to_categorical(i, num_classes=len(label2idx)) for i in y_dev]

# Define the model with reduced complexity
model = tf.keras.models.Sequential([
    tf.keras.layers.Embedding(input_dim=len(word2idx), output_dim=64, input_length=max_len),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=32, return_sequences=True)),
    tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(len(label2idx), activation='softmax'))
])
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train the model with a smaller batch size
history = model.fit(X_train, np.array(y_train),
                    validation_data=(X_dev, np.array(y_dev)),
                    epochs=25, batch_size=16)
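# A fixed 25 epochs can overfit a small BiLSTM. A minimal sketch of early
# stopping on validation loss (the patience value is an assumption, not part
# of the original script); swap it in for the fit() call above if desired:
# early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss',
#                                               patience=3,
#                                               restore_best_weights=True)
# history = model.fit(X_train, np.array(y_train),
#                     validation_data=(X_dev, np.array(y_dev)),
#                     epochs=25, batch_size=16, callbacks=[early_stop])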
# Evaluate the model on the dev set
y_pred = model.predict(X_dev)
y_pred = np.argmax(y_pred, axis=-1)
y_true = np.argmax(np.array(y_dev), axis=-1)

# Map predictions and true labels back to their original tags
y_pred_tags = [[idx2label[i] for i in row] for row in y_pred]
y_true_tags = [[idx2label[i] for i in row] for row in y_true]

# Print the classification report (stdout was reconfigured to UTF-8 above).
# 'O' is excluded: index 0 doubles as the padding value, so padded positions
# decode to 'O' and would otherwise dominate the report. The labels passed to
# classification_report must be the tag strings themselves, since y_*_tags
# hold strings, not indices.
entity_labels = [l for l in label2idx if l != 'O']
print(classification_report(
    [item for sublist in y_true_tags for item in sublist],
    [item for sublist in y_pred_tags for item in sublist],
    labels=entity_labels
))


# Correct IOB labels: an I- tag must not start an entity or follow a tag of a
# different entity type; promote such tags to B-.
def correct_iob_labels(predictions):
    corrected = []
    for pred in predictions:
        corrected_sentence = []
        prev_label = 'O'
        for label in pred:
            if label.startswith('I-') and (prev_label == 'O' or prev_label[2:] != label[2:]):
                corrected_sentence.append('B-' + label[2:])
            else:
                corrected_sentence.append(label)
            prev_label = corrected_sentence[-1]
        corrected.append(corrected_sentence)
    return corrected


# Predict on test data
y_test_pred = model.predict(X_test)
y_test_pred = np.argmax(y_test_pred, axis=-1)
y_test_pred_tags = [[idx2label[i] for i in row] for row in y_test_pred]

# Correct the predicted tags
y_pred_tags_corrected = correct_iob_labels(y_pred_tags)
y_test_pred_tags_corrected = correct_iob_labels(y_test_pred_tags)

# Save dev predictions, trimming each tag sequence to the original sentence
# length so padding tags are not written out. The output directory matches the
# './dev-0' input path (the original wrote to './dev0').
dev_predictions = [' '.join(tags[:len(sentence)])
                   for tags, sentence in zip(y_pred_tags_corrected, dev_sentences)]
with open('./dev-0/out.tsv', 'w', encoding='utf-8') as f:
    for prediction in dev_predictions:
        f.write("%s\n" % prediction)

# Save test predictions, likewise trimmed; the output directory matches the
# './test-A' input path (the original wrote to './testA').
test_predictions = [' '.join(tags[:len(sentence)])
                    for tags, sentence in zip(y_test_pred_tags_corrected, test_sentences)]
with open('./test-A/out.tsv', 'w', encoding='utf-8') as f:
    for prediction in test_predictions:
        f.write("%s\n" % prediction)
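# Optional sanity check, an addition not present in the original pipeline:
# each out.tsv should have exactly as many lines as the corresponding in.tsv,
# or downstream evaluation will misalign predictions and inputs.
def check_alignment(in_path, out_path):
    with open(in_path, encoding='utf-8') as f_in, open(out_path, encoding='utf-8') as f_out:
        n_in = sum(1 for _ in f_in)
        n_out = sum(1 for _ in f_out)
    assert n_in == n_out, f"{out_path}: {n_out} lines vs {n_in} inputs"


check_alignment('./dev-0/in.tsv', './dev-0/out.tsv')
check_alignment('./test-A/in.tsv', './test-A/out.tsv')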