243 lines
8.9 KiB
Python
Executable File
243 lines
8.9 KiB
Python
Executable File
#!/usr/bin/python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import psycopg2, os, sys, shutil, subprocess, urllib3, time, json, logging, errno
|
|
|
|
from importlib.machinery import SourceFileLoader
|
|
|
|
def postJson(address, data):
|
|
http = urllib3.PoolManager()
|
|
response = http.request('POST', address, headers={'Content-Type': 'application/json'},body=json.dumps(data).encode('utf-8'))
|
|
|
|
return json.loads(response.data.decode('utf-8'))
|
|
|
|
|
|
def add_examples(address, examplesData):
|
|
response = postJson(address, examplesData)
|
|
if response['status'] == 'error':
|
|
raise Exception(response['message'])
|
|
|
|
def file_len(fname):
|
|
with open(fname) as f:
|
|
for i, l in enumerate(f):
|
|
pass
|
|
return i + 1
|
|
|
|
|
|
def pid_exists(pid):
|
|
"""Check whether pid exists in the current process table.
|
|
UNIX only.
|
|
"""
|
|
if pid < 0:
|
|
return False
|
|
if pid == 0:
|
|
# According to "man 2 kill" PID 0 refers to every process
|
|
# in the process group of the calling process.
|
|
# On certain systems 0 is a valid PID but we have no way
|
|
# to know that in a portable fashion.
|
|
raise ValueError('invalid PID 0')
|
|
try:
|
|
os.kill(pid, 0)
|
|
except OSError as err:
|
|
if err.errno == errno.ESRCH:
|
|
# ESRCH == No such process
|
|
return False
|
|
elif err.errno == errno.EPERM:
|
|
# EPERM clearly means there's a process to deny access to
|
|
return True
|
|
else:
|
|
# According to "man 2 kill" possible error values are
|
|
# (EINVAL, EPERM, ESRCH)
|
|
raise
|
|
else:
|
|
return True
|
|
|
|
def updateRequestStatus(connection_string, request_id, new_status):
|
|
conn = psycopg2.connect(connection_string)
|
|
cur = conn.cursor()
|
|
cur.execute("UPDATE request SET status=%s WHERE id=%s", (new_status, request_id))
|
|
conn.commit()
|
|
cur.close()
|
|
conn.close()
|
|
|
|
def getPendingRequest(connection_string):
|
|
conn = psycopg2.connect(connection_string)
|
|
cur = conn.cursor()
|
|
cur.execute("""select request.id, request.source_file_path, request.target_file_path, request.name, src_lang.id as src_lang_id, src_lang.code as src_code, trg_lang.id as trg_lang_id, trg_lang.code as trg_code, request.status, request.type, request.tm_id from request inner join language as src_lang on src_lang.id = request.source_lang_id inner join language as trg_lang on trg_lang.id = request.target_lang_id where request.status = 0 order by request.created limit 1""")
|
|
|
|
request = cur.fetchone()
|
|
cur.close()
|
|
conn.close()
|
|
return request
|
|
|
|
def retrieve_and_append(connection_string, tm_id, src_file_path, trg_file_path):
|
|
conn = psycopg2.connect(connection_string)
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT source_segment, target_segment FROM unit WHERE tm_id=%d" % tm_id)
|
|
with open(src_file_path, 'a') as src_file, open(trg_file_path, 'a') as trg_file:
|
|
for row in cur:
|
|
src_file.write(row[0]+'\n')
|
|
trg_file.write(row[1]+'\n')
|
|
|
|
cur.close()
|
|
conn.close()
|
|
|
|
def clearTm(connection_string, tm_id):
|
|
conn = psycopg2.connect(connection_string)
|
|
cur = conn.cursor()
|
|
cur.execute("DELETE FROM unit WHERE tm_id=%d" %tm_id)
|
|
conn.commit()
|
|
cur.close()
|
|
conn.close()
|
|
|
|
def getLangsFromTM(connection_string, tm_id):
|
|
conn = psycopg2.connect(connection_string)
|
|
cur = conn.cursor()
|
|
cur.execute("select src_lang.code, trg_lang.code from tm inner join language as src_lang on tm.source_lang_id = src_lang.id inner join language as trg_lang on tm.target_lang_id = trg_lang.id where tm.id = %d" % tm_id)
|
|
langs = cur.fetchone()
|
|
cur.close()
|
|
conn.close()
|
|
return langs
|
|
|
|
# ----------- main -------------------------------------------------------------
|
|
|
|
BUFFER_SIZE = 500
|
|
|
|
src_dir = os.path.dirname(os.path.realpath(__file__))
|
|
|
|
logger = logging.getLogger('request_handler')
|
|
hdlr = logging.FileHandler(src_dir+'/request_handler.log')
|
|
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
|
|
hdlr.setFormatter(formatter)
|
|
logger.addHandler(hdlr)
|
|
logger.setLevel(logging.INFO)
|
|
|
|
|
|
pidfile_path = src_dir+'/request_handler.pid'
|
|
if os.path.isfile(pidfile_path):
|
|
with open(pidfile_path) as pidfile:
|
|
pid = int(pidfile.readline().strip())
|
|
logger.info('Found pid file with pid: %d' % pid)
|
|
if pid_exists(pid):
|
|
logger.info('The process with pid=%d is actually running, exiting' % pid)
|
|
sys.exit(0)
|
|
else:
|
|
logger.warning('The process with pid=%d is not running anymore. Ignoring the pid file.' % pid)
|
|
|
|
with open(pidfile_path, 'w') as pidfile:
|
|
pidfile.write(str(os.getpid()))
|
|
|
|
mgiza_path = src_dir+'/../mgiza-aligner'
|
|
|
|
connection_string = "dbname='concordia_server' user='concordia' host='localhost' port='6543' password='concordia'"
|
|
|
|
try:
|
|
request = getPendingRequest(connection_string)
|
|
if request is not None:
|
|
request_id, src_file_path, trg_file_path, tm_name, src_lang_id, src_lang_code, trg_lang_id, trg_lang_code, status, tm_type, tm_id = request
|
|
|
|
if tm_type == 1:
|
|
src_lang_code, trg_lang_code = getLangsFromTM(connection_string, tm_id)
|
|
|
|
logger.info("Working on request: %d. Starting the mgiza aligner." % request_id)
|
|
updateRequestStatus(connection_string, request_id, 1)
|
|
|
|
request_corpus_path = mgiza_path+'/corpora/request_'+str(request_id)
|
|
os.makedirs(request_corpus_path)
|
|
|
|
shutil.copy(src_file_path, request_corpus_path+'/src.txt')
|
|
shutil.copy(trg_file_path, request_corpus_path+'/trg.txt')
|
|
|
|
if tm_type == 1:
|
|
# extending existing TM. We must retrieve existing units from the db.
|
|
logger.info("This is an extend request. Appending units from the tm %d to the src and trg files."%tm_id)
|
|
retrieve_and_append(connection_string, tm_id, request_corpus_path+'/src.txt', request_corpus_path+'/trg.txt')
|
|
|
|
subprocess.run(["make","SRC_LANG="+src_lang_code, "TRG_LANG="+trg_lang_code, "CORPUS_NAME=request_"+str(request_id)], cwd=mgiza_path, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
|
|
logger.info("Mgiza alignment complete, starting adding the TM to Concordia.")
|
|
updateRequestStatus(connection_string, request_id, 2)
|
|
|
|
host = SourceFileLoader("host", os.path.dirname(os.path.realpath(__file__))+'/../tests/host.py').load_module()
|
|
address = 'http://'+host.concordia_host
|
|
if len(host.concordia_port) > 0:
|
|
address += ':'+host.concordia_port
|
|
|
|
|
|
sourceFile = request_corpus_path+'/src_final.txt'
|
|
targetFile = request_corpus_path+'/trg_final.txt'
|
|
alignmentsFile = request_corpus_path+'/aligned_final.txt'
|
|
|
|
if (file_len(sourceFile) != file_len(targetFile)):
|
|
raise Exception("source and target files are not of the same length!")
|
|
|
|
if (file_len(alignmentsFile) != 3*file_len(sourceFile)):
|
|
raise Exception("alignments file is not exactly 3 times longer than source and target")
|
|
|
|
if tm_type == 0:
|
|
# create new TM
|
|
data = {
|
|
'operation': 'addTm',
|
|
'sourceLangId':src_lang_id,
|
|
'targetLangId':trg_lang_id,
|
|
'name':tm_name,
|
|
'tmLemmatized':True
|
|
}
|
|
|
|
response = postJson(address, data)
|
|
|
|
tmId = int(response['newTmId'])
|
|
logger.info("Added new tm: %d" % tmId)
|
|
else:
|
|
clearTm(connection_string, tm_id)
|
|
tmId = tm_id
|
|
|
|
data = {
|
|
'operation': 'addAlignedLemmatizedSentences',
|
|
'tmId':tmId
|
|
}
|
|
|
|
examples = []
|
|
with open(sourceFile) as sf, open(targetFile) as tf, open(alignmentsFile) as af:
|
|
for sourceLine in sf:
|
|
sourceSentence = sourceLine.strip()
|
|
targetSentence = tf.readline().strip()
|
|
|
|
# skip to lines of the alignments file, these are lemmatized and we need the raw sentences from the source and target files.
|
|
af.readline()
|
|
af.readline()
|
|
|
|
alignmentString = af.readline().strip()
|
|
|
|
examples.append([sourceSentence, targetSentence, alignmentString])
|
|
|
|
if len(examples) >= BUFFER_SIZE:
|
|
data['examples'] = examples
|
|
add_examples(address, data)
|
|
examples = []
|
|
|
|
if len(examples) > 0:
|
|
data['examples'] = examples
|
|
add_examples(address, data)
|
|
|
|
updateRequestStatus(connection_string, request_id, 3)
|
|
logger.info("Generating index...")
|
|
start = time.time()
|
|
data = {
|
|
'operation': 'refreshIndex',
|
|
'tmId' : tmId
|
|
}
|
|
|
|
response = postJson(address, data)
|
|
|
|
end = time.time()
|
|
logger.info("Index regeneration complete. The operation took %.4f s" % (end - start))
|
|
|
|
except Exception as e:
|
|
logger.error("Error occured: "+str(e))
|
|
updateRequestStatus(connection_string, request_id, 5)
|
|
else:
|
|
updateRequestStatus(connection_string, request_id, 4)
|
|
finally:
|
|
os.remove(pidfile_path)
|