en-ner-conll-2003/solution.ipynb

import re
import pandas as pd
from transformers import pipeline
def correct_labels(input_file, output_file):
    """Rewrite predicted tag sequences so they follow the BIO scheme:
    an I-X tag that does not continue a preceding B-X/I-X span becomes B-X."""
    df = pd.read_csv(input_file, sep="\t", names=["Text"])

    corrected_lines = []

    for line in df["Text"]:
        tokens = line.split(" ")
        corrected_tokens = []
        previous_token = "O"

        for token in tokens:
            # An entity may only start with B-X; turn a dangling I-X
            # (one not preceded by a tag of the same type) into B-X.
            if token.startswith("I-") and previous_token not in (token, "B-" + token[2:]):
                corrected_tokens.append("B-" + token[2:])
            else:
                corrected_tokens.append(token)

            previous_token = token

        corrected_lines.append(" ".join(corrected_tokens))

    df["Text"] = corrected_lines
    df.to_csv(output_file, sep="\t", index=False, header=False)
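# Illustrative sketch (not part of the original notebook): the same BIO fix
# applied to one tag line, to show what correct_labels does to each row of
# the TSV file. fix_bio_line and the example tags below are made up.
def fix_bio_line(line):
    corrected, previous = [], "O"
    for token in line.split(" "):
        if token.startswith("I-") and previous not in (token, "B-" + token[2:]):
            corrected.append("B-" + token[2:])
        else:
            corrected.append(token)
        previous = token
    return " ".join(corrected)

print(fix_bio_line("I-ORG I-ORG O I-PER"))  # -> "B-ORG I-ORG O B-PER"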
# Token-classification pipeline with a BERT model fine-tuned on CoNLL-2003 English.
ner_model = pipeline("ner", model="dbmdz/bert-large-cased-finetuned-conll03-english")
def get_word_indices(string_to_search):
    """Return the character indices at which the words of the sentence start."""
    pattern = r"\s\S"  # a non-space character preceded by whitespace
    matches = re.finditer(pattern, string_to_search)
    indices = [m.start(0) + 1 for m in matches]
    if not string_to_search[0].isspace():
        indices.insert(0, 0)  # the sentence itself starts with a word
    return sorted(indices)

def get_word_beginning(string_to_search, letter_index):
    """Walk left from letter_index to the first character of the enclosing word."""
    while letter_index > 0 and string_to_search[letter_index - 1] != " ":
        letter_index -= 1
    return letter_index
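
# Quick sanity check (illustrative, not part of the original notebook):
# word-start indices and word-beginning lookup on a made-up sentence.
sample = "John lives in New York"
print(get_word_indices(sample))        # [0, 5, 11, 14, 18]
print(get_word_beginning(sample, 16))  # 14 -> start of "New"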

def wordpiece_tokenization(ner_tokenized, original_sentence):
    """Collapse the pipeline's WordPiece-level predictions into one BIO tag per word."""
    word_start_index_to_tag = {}
    formatted_results = []
    previous_tag = "O"

    # First pass: merge subword pieces ("##...") back into whole words.
    for result in ner_tokenized:
        word = result["word"].replace("##", "")
        start, end = result["start"], result["start"] + len(word)

        # A piece that is not preceded by a space (or is marked with "##")
        # continues the previous word, so extend that entry instead of adding one.
        if formatted_results and (original_sentence[result["start"] - 1] != " " or result["word"].startswith("##")):
            formatted_results[-1]["end"] = end
            formatted_results[-1]["word"] += word
        else:
            result["word"] = word
            result["start"] = get_word_beginning(original_sentence, start)
            result["end"] = end
            formatted_results.append(result)

    # Second pass: convert the model's entity labels into BIO tags,
    # keyed by the character index at which the word starts.
    for result in formatted_results:
        start_index = result["start"]
        tag = result["entity"]

        if tag != "O":
            if previous_tag != tag:
                tag = f"B-{tag.split('-')[-1]}"
            else:
                tag = f"I-{tag.split('-')[-1]}"
        word_start_index_to_tag[start_index] = tag
        previous_tag = result["entity"]

    # Words the model returned no prediction for are labelled "O".
    for index in get_word_indices(original_sentence):
        word_start_index_to_tag.setdefault(index, "O")

    return [word_start_index_to_tag[index] for index in sorted(word_start_index_to_tag.keys())]
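
# Illustrative sketch (the pipeline output below is assumed, not taken from the
# original run): wordpiece_tokenization merges subword predictions and returns
# one BIO tag per word of the original sentence.
sample_sentence = "Kowalski visited Warsaw"
sample_ner_output = [
    {"word": "Ko",     "start": 0,  "entity": "I-PER"},
    {"word": "##wal",  "start": 2,  "entity": "I-PER"},
    {"word": "##ski",  "start": 5,  "entity": "I-PER"},
    {"word": "Warsaw", "start": 17, "entity": "I-LOC"},
]
print(wordpiece_tokenization(sample_ner_output, sample_sentence))  # ['B-PER', 'O', 'B-LOC']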
from tqdm import tqdm

def get_input_file(input_file):
    with open(input_file, "r", encoding="utf-8") as f:
        original_sentences = f.readlines()
    return original_sentences

def save_output_file(output_file, processed_data):
    with open(output_file, "w", encoding="utf-8") as f:
        for line in processed_data:
            f.write(f"{line}\n")

def tokenize_file(input_file, output_file):
    """Tag every sentence of input_file with the NER pipeline and write one
    space-separated tag sequence per line to output_file."""
    original_sentences = get_input_file(input_file)

    processed_data = []
    for raw_sentence in tqdm(original_sentences, desc=f"Processing {input_file}"):
        model_out = ner_model(raw_sentence.strip())
        word_tokenization = wordpiece_tokenization(model_out, raw_sentence.strip())
        processed_data.append(" ".join(word_tokenization))

    save_output_file(output_file, processed_data)
tokenize_file("dev-0/in.tsv", "dev-0/out.tsv")
Processing dev-0/in.tsv: 100%|██████████| 215/215 [11:57<00:00,  3.34s/it]
tokenize_file("test-A/in.tsv", "test-A/out.tsv")
Processing test-A/in.tsv: 100%|██████████| 230/230 [12:39<00:00,  3.30s/it]
# Apply the BIO correction in place to both prediction files.
correct_labels("dev-0/out.tsv", "dev-0/out.tsv")
correct_labels("test-A/out.tsv", "test-A/out.tsv")
def dev0_accuracy():
    """Compute per-token tagging accuracy of dev-0 predictions against the gold labels."""
    out_file = "dev-0/out.tsv"
    expected_file = "dev-0/expected.tsv"

    with open(out_file, "r", encoding="utf-8") as f:
        out_lines = f.readlines()
    
    with open(expected_file, "r", encoding="utf-8") as f:
        expected_lines = f.readlines()
    
    all_tags = 0
    correct_tags = 0

    for i in range(len(out_lines)):
        out_tags = out_lines[i].split()
        expected_tags = expected_lines[i].split()

        all_tags += len(expected_tags)
        correct_tags += sum(a == b for a, b in zip(out_tags, expected_tags))
    
    accuracy = correct_tags / all_tags
    print(f"Accuracy for dev0: {accuracy:.4f}")

dev0_accuracy()
Accuracy for dev0: 0.9566