challenging-america-word-ga.../lab5.ipynb

8.3 KiB

import lzma
import pickle
from collections import Counter 
def clean_line(line):
    """Join the left/right context columns of one TSV row into plain text.

    Columns 6 and 7 hold the text before and after the gap; literal
    ``\\n`` escape sequences are flattened to spaces.
    """
    # Split once instead of twice per call.
    fields = line.split('\t')
    prefix = fields[6].replace(r'\n', ' ')
    suffix = fields[7].replace(r'\n', ' ')
    return f'{prefix} {suffix}'
def get_words(filename, total_lines=432022):
    """Yield every whitespace-separated token from an xz-compressed TSV corpus.

    Parameters
    ----------
    filename : str
        Path to the ``.xz`` input file.
    total_lines : int, optional
        Expected number of lines; used only for the progress display
        (previously a hard-coded constant).
    """
    with lzma.open(filename, mode='rt', encoding='utf-8') as file:
        print('Words')
        for count, line in enumerate(file, start=1):
            print(f'\rProgress: {(count / total_lines * 100):2f}%', end='')
            # Delegate column extraction to clean_line, then tokenize.
            yield from clean_line(line).split()
        print()
def get_bigrams(filename, V, total_lines=432022):
    """Yield (w1, w2) bigrams over the corpus, mapping OOV words to 'UNK'.

    Words missing from the vocabulary dict ``V`` are replaced by ``'UNK'``.
    The first word of a line has no predecessor, so no bigram is emitted
    for it.  (The original guard tested ``second_word`` — always truthy
    after the UNK substitution — and therefore yielded a spurious
    ``('', w)`` pair at the start of every line; ``get_trigrams`` already
    guards on the preceding word, and this now matches.)
    """
    with lzma.open(filename, mode='rt', encoding='utf-8') as file:
        print('Bigrams')
        for count, line in enumerate(file, start=1):
            print(f'\rProgress: {(count / total_lines * 100):2f}%', end='')
            prev = ''
            for word in clean_line(line).split():
                if V.get(word) is None:
                    word = 'UNK'
                if prev:  # skip the incomplete line-initial pair
                    yield prev, word
                prev = word
        print()

def get_trigrams(filename, V):
    """Yield (w1, w2, w3) trigrams over the corpus; OOV words become 'UNK'."""
    with lzma.open(filename, mode='rt', encoding='utf-8') as file:
        line_no = 1
        print('Trigrams')
        for line in file:
            print(f'\rProgress: {(line_no / 432022 * 100):2f}%', end='')
            w1 = ''
            w2 = ''
            for w3 in clean_line(line).split():
                if V.get(w3) is None:
                    w3 = 'UNK'
                # Only emit once a full three-word window is available.
                if w1:
                    yield w1, w2, w3
                w1, w2 = w2, w3
            line_no += 1
        print()

# Build the vocabulary: keep the WORD_LIMIT most frequent words and
# collapse the count of everything else into a single 'UNK' pseudo-word.
WORD_LIMIT = 3000
V = Counter(get_words('train/in.tsv.xz'))
V_common_dict = dict(V.most_common(WORD_LIMIT))
V_common_dict['UNK'] = sum(
    freq for word, freq in V.items() if word not in V_common_dict)

# Cache the vocabulary to disk, then reload it (lets later cells skip
# the expensive counting pass).
with open('V.pickle', 'wb') as handle:
    pickle.dump(V_common_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)

with open('V.pickle', 'rb') as handle:
    V_common_dict = pickle.load(handle)

# Corpus size used as the unigram denominator.
total = sum(V_common_dict.values())
Words
Progress: 100.000000%
# Count bigrams over the vocabulary-restricted corpus and cache them.
V2 = Counter(get_bigrams('train/in.tsv.xz', V_common_dict))
V2_dict = dict(V2)
with open('V2.pickle', 'wb') as handle:
    pickle.dump(V2_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)

with open('V2.pickle', 'rb') as handle:
    V2_dict = pickle.load(handle)
Bigrams
Progress: 100.000000%
# Count trigrams over the vocabulary-restricted corpus and cache them.
V3 = Counter(get_trigrams('train/in.tsv.xz', V_common_dict))
V3_dict = dict(V3)
with open('V3.pickle', 'wb') as handle:
    pickle.dump(V3_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)

with open('V3.pickle', 'rb') as handle:
    V3_dict = pickle.load(handle)
Trigrams
Progress: 100.000000%

def calculate_probability(first_word, second_word=None, third_word=None):
    """Return the MLE uni-, bi-, or trigram probability, by arity.

    With one word: P(w1).  With two: P(w2 | w1).  With three:
    P(w3 | w1, w2).  Any event absent from the count tables yields 0.
    """
    try:
        if second_word is None:
            return V_common_dict[first_word] / total
        bigram_key = (first_word, second_word)
        if third_word is None:
            return V2_dict[bigram_key] / V_common_dict[first_word]
        trigram_key = (first_word, second_word, third_word)
        return V3_dict[trigram_key] / V2_dict[bigram_key]
    except KeyError:
        # Unseen n-gram anywhere in the lookup chain.
        return 0
def smoothed(trigrams):
    """Score a trigram by linear interpolation of tri-, bi- and unigram MLEs."""
    w1, w2, w3 = trigrams
    p_tri = calculate_probability(w1, w2, w3)
    p_bi = calculate_probability(w2, w3)
    p_uni = calculate_probability(w3)
    # Interpolation weights sum to 1.0.
    return 0.6 * p_tri + 0.25 * p_bi + 0.15 * p_uni


def candidates(left_context, right_context):
    """Rank vocabulary words as fillers for the gap between the contexts.

    Parameters
    ----------
    left_context : sequence of 2 words immediately before the gap.
    right_context : sequence of 2 words immediately after the gap.

    Returns the challenge output line ``'word:prob word:prob ... :prob'``
    with the top-5 smoothed candidates; the 'UNK' candidate (or, if none,
    the weakest candidate) is renamed to ``''``.
    """
    first, second = left_context
    fourth, fifth = right_context
    scores = {}
    for word in V_common_dict:
        # Product of the three trigram windows that contain `word`.
        scores[word] = (smoothed((first, second, word))
                        * smoothed((second, word, fourth))
                        * smoothed((word, fourth, fifth)))
    best = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:5]
    # Hoisted out of the comprehension: was recomputed for every element.
    mass = sum(p for _, p in best)
    if mass == 0:
        # All candidates scored 0: fall back to a uniform distribution
        # instead of raising ZeroDivisionError.
        norm = [(w, 1.0 / len(best)) for w, _ in best]
    else:
        norm = [(w, p / mass) for w, p in best]
    # Initialized once, before the loop (was reset on every iteration and
    # unbound if the list were empty).
    unk = None
    for index, (word, prob) in enumerate(norm):
        if word == 'UNK':
            unk = norm.pop(index)
            norm.append(('', prob))
            break
    if unk is None:
        norm[-1] = ('', norm[-1][1])
    return ' '.join(f'{w}:{p}' for w, p in norm)
def create_outputs(folder_name):
    """Write gap-filling predictions to ``folder_name/out.tsv``.

    Reads ``folder_name/in.tsv.xz``, extracts a two-word context on each
    side of the gap (OOV words mapped to 'UNK'), and writes one
    `candidates(...)` line per input row.
    """
    print(f'Creating outputs in {folder_name}')
    in_path = f'{folder_name}/in.tsv.xz'
    out_path = f'{folder_name}/out.tsv'
    with lzma.open(in_path, mode='rt', encoding='utf-8') as fid:
        with open(out_path, 'w', encoding='utf-8') as f:
            for line in fid:
                fields = line.split('\t')
                prefix = fields[6].replace(r'\n', ' ').split()
                suffix = fields[7].replace(r'\n', ' ').split()
                left = [w if V_common_dict.get(w) else 'UNK' for w in prefix[-2:]]
                right = [w if V_common_dict.get(w) else 'UNK' for w in suffix[:2]]
                f.write(candidates(left, right) + '\n')
# Generate predictions for both evaluation splits.
create_outputs('dev-0')
create_outputs('test-A')
Creating outputs in dev-0
Creating outputs in test-A