mgr/preprare_corpus.py
2021-06-21 20:29:44 +02:00

100 lines
3.3 KiB
Python

import csv
import pandas as pd
import europarl
from sklearn.model_selection import train_test_split
# todo: add a parameter for probability of injection
def inject_translations(corpus, dictionary):
    """Insert dictionary translations after matching phrases in each sentence.

    Every sentence is first normalized with ``strip_lower`` (lower-cased,
    punctuation removed). A dictionary key matches a sentence when it occurs
    as a whole, space-delimited phrase. If several keys match, the one chosen
    by ``choose_translation`` is used, and its translation is inserted into
    the sentence by ``add_translation``. At most one translation is injected
    per sentence.

    Dictionary keys are compared verbatim against the normalized text, so a
    key containing upper-case letters or stripped punctuation cannot match.

    Args:
        corpus: iterable of sentence strings.
        dictionary: mapping of source phrase -> translation.

    Returns:
        A list with one normalized sentence per input sentence, with a
        translation injected where a key matched.
    """
    # The padded form of each key does not depend on the sentence, so build
    # it once instead of once per (sentence, key) pair.
    wrapped_keys = [(key, space_wrap(key)) for key in dictionary]

    injected_sentences = []
    injected_count = 0
    for idx, sentence in enumerate(strip_lower(corpus)):
        # Pad the sentence so a key at its very start or end is still
        # surrounded by spaces and matches like a mid-sentence occurrence.
        padded_sentence = space_wrap(sentence)
        # todo: approximate matching
        candidates = [key for key, wrapped in wrapped_keys
                      if wrapped in padded_sentence]
        if candidates:
            chosen_key = choose_translation(candidates)
            injected_sentences.append(
                add_translation(sentence, chosen_key, dictionary[chosen_key]))
            # Count actual injections (one per sentence), not every candidate
            # key that happened to match, so the summary below is accurate.
            injected_count += 1
        else:
            injected_sentences.append(sentence)
        # Coarse progress indicator for large corpora.
        if idx % 50000 == 0:
            print(idx)
    print(f'injected {injected_count} words.')
    return injected_sentences
def strip_lower(corpus):
    """Return a list of the sentences in *corpus*, normalized.

    Each sentence is lower-cased and then passed through ``strip`` to remove
    punctuation. The input order is preserved.
    """
    normalized = []
    for raw_sentence in corpus:
        lowered = raw_sentence.lower()
        normalized.append(strip(lowered))
    return normalized
# Translation table that deletes every punctuation/symbol character listed
# below. Built once at import time so strip() is a single pass per sentence
# instead of one str.replace() call per character.
_STRIP_TABLE = str.maketrans('', '', '`~!@#$%^&*()-_=+[{]}\\|;:\'\",<.>/?')


def strip(sentence):
    """Return *sentence* with ASCII punctuation and symbol characters removed.

    The removed characters are deleted outright (not replaced by a space), so
    ``"a-b"`` becomes ``"ab"``. Letters, digits, whitespace and non-ASCII
    characters are left untouched.
    """
    return sentence.translate(_STRIP_TABLE)
def add_translation(sen, key, value):
    """Return *sen* with *value* inserted after the first whole-word *key*.

    The key is located as a space-delimited phrase, so a key that also
    happens to be a substring of an earlier, longer word is not mistaken for
    a match (e.g. ``"cat"`` inside ``"concatenate"``). The translation is
    inserted directly after the key, separated from it by a single space.

    Args:
        sen: the sentence to modify.
        key: the phrase to look for.
        value: the translation to insert after the phrase.

    Returns:
        The sentence with the translation inserted, or *sen* unchanged when
        *key* does not occur in it as a whole phrase.
    """
    # Pad both sides so a key at the very start or end of the sentence is
    # still surrounded by spaces and is found like a mid-sentence occurrence.
    padded = ' ' + sen + ' '
    start = padded.find(' ' + key + ' ')
    if start == -1:
        return sen
    # `start` indexes the space before the key in `padded`; because of the
    # one-character left pad, that is the key's own start index in `sen`.
    end = start + len(key)
    return sen[:end] + ' ' + value + sen[end:]
def choose_translation(translations):
    """Return the candidate phrase made of the most space-separated words.

    When several candidates have the same word count, the one that appears
    first in *translations* wins.

    Raises:
        IndexError: if *translations* is empty.
    """
    if not translations:
        raise IndexError('choose_translation() needs at least one candidate')
    # max() returns the first maximal element, which matches picking the head
    # of a stable descending sort without sorting the whole list.
    return max(translations, key=lambda phrase: len(phrase.split(' ')))
def space_wrap(word):
    """Return *word* with one space added in front of it and one after it."""
    return ''.join((' ', word, ' '))
# --- Load the parallel corpus -------------------------------------------
language_code = 'pl'
# NOTE(review): `europarl` is a project-local module; presumably this fetches
# and unpacks the Europarl en-pl corpus if it is not present yet -- confirm.
europarl.maybe_download_and_extract(language_code=language_code)
# data_src is written to the *.en files below and data_dest to the *.pl files.
data_src = europarl.load_data(english=True, language_code=language_code)
data_dest = europarl.load_data(english=False,
                               language_code=language_code)

# --- Split the dictionary into train/test parts -------------------------
test_size = 0.25
df_dict = pd.read_csv('kompendium.tsv', sep='\t', header=None, index_col=0)
# todo: divide dictionary with hashing
dtr, dts = train_test_split(df_dict, test_size=test_size, shuffle=True, random_state=42)
print('dictionary len: ', len(df_dict))
print('train dictionary len: ', len(dtr))
print('test dictionary len: ', len(dts))
pd.DataFrame(dtr).to_csv('data/dictionary_train.csv', header=False)
pd.DataFrame(dts).to_csv('data/dictionary_test.csv', header=False)


def _read_dictionary(path):
    """Load a two-column CSV file into a ``{first column: second column}`` dict.

    The file is opened as UTF-8 (the encoding pandas writes by default) with
    ``newline=''`` as the csv module requires, and is closed before returning.
    A row that does not have exactly two fields raises ``ValueError``.
    """
    with open(path, 'r', newline='', encoding='utf-8') as handle:
        return {k: v for k, v in csv.reader(handle)}


# Read the dictionaries back from the files just written so that keys and
# values are plain strings.
dictionary_train = _read_dictionary('data/dictionary_train.csv')
dictionary_test = _read_dictionary('data/dictionary_test.csv')

# --- Split the corpus, keeping source/target sentences aligned ----------
# todo: divide data with hashing
data_src_train, data_src_test, data_dest_train, data_dest_test = \
    train_test_split(data_src, data_dest, test_size=test_size, random_state=42)
print('data len: ', len(data_src))
print('train len: ', len(data_src_train))
print('test len: ', len(data_src_test))

# --- Inject translations into the source side only ----------------------
# The train corpus only sees train-dictionary entries and the test corpus
# only test-dictionary entries.
data_src_train = inject_translations(data_src_train, dictionary_train)
data_src_test = inject_translations(data_src_test, dictionary_test)

# --- Write the resulting corpora ----------------------------------------
# NOTE(review): to_csv applies CSV quoting, so target sentences containing
# commas or quotes are written wrapped in double quotes -- confirm that the
# downstream consumer expects this.
pd.DataFrame(data_src_train).to_csv('data/orig/train.en', header=False, index=False)
pd.DataFrame(data_src_test).to_csv('data/orig/test.en', header=False, index=False)
pd.DataFrame(data_dest_train).to_csv('data/orig/train.pl', header=False, index=False)
pd.DataFrame(data_dest_test).to_csv('data/orig/test.pl', header=False, index=False)