#!/usr/bin/python3
# -*- coding: utf-8 -*-
2017-06-26 13:45:10 +02:00
import errno
import json
import logging
import os
import shutil
import subprocess
import sys
import time

import psycopg2
import urllib3

from importlib.machinery import SourceFileLoader
def postJson(address, data):
    """POST *data* as a JSON body to *address* and return the decoded JSON reply."""
    pool = urllib3.PoolManager()
    payload = json.dumps(data).encode('utf-8')
    reply = pool.request(
        'POST',
        address,
        headers={'Content-Type': 'application/json'},
        body=payload,
    )
    return json.loads(reply.data.decode('utf-8'))
def add_examples(address, examplesData):
    """Send a batch of examples to the Concordia server.

    Raises Exception with the server-provided message when the server
    reports status 'error'.
    """
    result = postJson(address, examplesData)
    if result['status'] == 'error':
        raise Exception(result['message'])
def file_len(fname):
    """Return the number of lines in the file *fname*.

    BUG FIX: the original enumerated the file and returned ``i + 1``; on an
    empty file the loop body never ran, so ``i`` was unbound and the function
    raised UnboundLocalError. An empty file now yields 0.
    """
    count = 0
    with open(fname) as f:
        # enumerate from 1 so `count` is the number of lines seen so far
        for count, _line in enumerate(f, start=1):
            pass
    return count
2017-06-26 13:45:10 +02:00
def pid_exists(pid):
    """Check whether *pid* exists in the current process table.

    UNIX only. Raises ValueError for PID 0, because signalling PID 0
    targets every process in the caller's process group ("man 2 kill"),
    and on some systems 0 is a valid PID we cannot portably detect.
    """
    if pid == 0:
        raise ValueError('invalid PID 0')
    if pid < 0:
        return False
    try:
        # signal 0 performs error checking without delivering a signal
        os.kill(pid, 0)
    except OSError as err:
        if err.errno == errno.ESRCH:
            # ESRCH: no such process
            return False
        if err.errno == errno.EPERM:
            # EPERM: the process exists but we may not signal it
            return True
        # "man 2 kill" lists only EINVAL, EPERM, ESRCH as possible errors
        raise
    return True
def updateRequestStatus(connection_string, request_id, new_status):
    """Persist *new_status* on the request row identified by *request_id*."""
    connection = psycopg2.connect(connection_string)
    cursor = connection.cursor()
    cursor.execute(
        "UPDATE request SET status=%s WHERE id=%s",
        (new_status, request_id),
    )
    connection.commit()
    cursor.close()
    connection.close()
def getPendingRequest(connection_string):
    """Fetch the oldest request with status 0, joined with its language codes.

    Returns a tuple (id, source_file_path, target_file_path, name,
    src_lang_id, src_code, trg_lang_id, trg_code, status, type, tm_id),
    or None when no request is pending.
    """
    connection = psycopg2.connect(connection_string)
    cursor = connection.cursor()
    cursor.execute("""select request.id, request.source_file_path, request.target_file_path, request.name, src_lang.id as src_lang_id, src_lang.code as src_code, trg_lang.id as trg_lang_id, trg_lang.code as trg_code, request.status, request.type, request.tm_id from request inner join language as src_lang on src_lang.id = request.source_lang_id inner join language as trg_lang on trg_lang.id = request.target_lang_id where request.status = 0 order by request.created limit 1""")
    row = cursor.fetchone()
    cursor.close()
    connection.close()
    return row
def retrieve_and_append(connection_string, tm_id, src_file_path, trg_file_path):
    """Append every unit of TM *tm_id* to the given corpus files.

    Source segments are appended to *src_file_path* and target segments to
    *trg_file_path*, one per line, in matching order.

    FIX: the tm_id is now passed as a bound query parameter; the original
    interpolated it with ``"%d" % tm_id``, bypassing the driver's escaping.
    The connection is also closed even if reading or writing fails.
    """
    conn = psycopg2.connect(connection_string)
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT source_segment, target_segment FROM unit WHERE tm_id=%s",
            (tm_id,),
        )
        with open(src_file_path, 'a') as src_file, open(trg_file_path, 'a') as trg_file:
            for source_segment, target_segment in cur:
                src_file.write(source_segment + '\n')
                trg_file.write(target_segment + '\n')
        cur.close()
    finally:
        conn.close()
def clearTm(connection_string, tm_id):
    """Delete all translation units belonging to TM *tm_id*.

    FIX: the tm_id is now passed as a bound query parameter; the original
    interpolated it with ``"%d" % tm_id``, bypassing the driver's escaping.
    The connection is also closed even if the delete fails.
    """
    conn = psycopg2.connect(connection_string)
    try:
        cur = conn.cursor()
        cur.execute("DELETE FROM unit WHERE tm_id=%s", (tm_id,))
        conn.commit()
        cur.close()
    finally:
        conn.close()
def getLangsFromTM(connection_string, tm_id):
    """Return the (source_code, target_code) language codes of TM *tm_id*.

    Returns None when no TM with that id exists.

    FIX: the tm_id is now passed as a bound query parameter; the original
    interpolated it with ``"%d" % tm_id``, bypassing the driver's escaping.
    The connection is also closed even if the query fails.
    """
    conn = psycopg2.connect(connection_string)
    try:
        cur = conn.cursor()
        cur.execute(
            "select src_lang.code, trg_lang.code from tm "
            "inner join language as src_lang on tm.source_lang_id = src_lang.id "
            "inner join language as trg_lang on tm.target_lang_id = trg_lang.id "
            "where tm.id = %s",
            (tm_id,),
        )
        langs = cur.fetchone()
        cur.close()
        return langs
    finally:
        conn.close()
# ----------- main -------------------------------------------------------------

# Number of sentence examples sent to Concordia per HTTP request.
BUFFER_SIZE = 500

src_dir = os.path.dirname(os.path.realpath(__file__))

logger = logging.getLogger('request_handler')
hdlr = logging.FileHandler(src_dir + '/request_handler.log')
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.INFO)

# Single-instance guard: exit if a previous handler process is still alive.
pidfile_path = src_dir + '/request_handler.pid'
if os.path.isfile(pidfile_path):
    with open(pidfile_path) as pidfile:
        pid = int(pidfile.readline().strip())
    logger.info('Found pid file with pid: %d' % pid)
    if pid_exists(pid):
        logger.info('The process with pid=%d is actually running, exiting' % pid)
        sys.exit(0)
    else:
        logger.warning('The process with pid=%d is not running anymore. Ignoring the pid file.' % pid)

with open(pidfile_path, 'w') as pidfile:
    pidfile.write(str(os.getpid()))

mgiza_path = src_dir + '/../mgiza-aligner'

connection_string = "dbname='concordia_server' user='concordia' host='localhost' port='6543' password='concordia'"

# BUG FIX: request_id is pre-initialized so the except/else clauses below can
# tell whether a request was actually fetched. The original referenced
# request_id unconditionally and raised NameError whenever no request was
# pending (or when the failure happened before the request row was unpacked).
request_id = None
try:
    request = getPendingRequest(connection_string)
    if request is not None:
        request_id, src_file_path, trg_file_path, tm_name, src_lang_id, src_lang_code, trg_lang_id, trg_lang_code, status, tm_type, tm_id = request

        if tm_type == 1:
            # extend request: take the language codes from the existing TM
            src_lang_code, trg_lang_code = getLangsFromTM(connection_string, tm_id)

        logger.info("Working on request: %d. Starting the mgiza aligner." % request_id)
        updateRequestStatus(connection_string, request_id, 1)

        request_corpus_path = mgiza_path + '/corpora/request_' + str(request_id)
        os.makedirs(request_corpus_path)

        shutil.copy(src_file_path, request_corpus_path + '/src.txt')
        shutil.copy(trg_file_path, request_corpus_path + '/trg.txt')

        if tm_type == 1:
            # extending existing TM. We must retrieve existing units from the db.
            logger.info("This is an extend request. Appending units from the tm %d to the src and trg files." % tm_id)
            retrieve_and_append(connection_string, tm_id, request_corpus_path + '/src.txt', request_corpus_path + '/trg.txt')

        # run the mgiza alignment make target; check=True raises on failure,
        # which is handled by the except clause below (request marked failed)
        subprocess.run(["make", "SRC_LANG=" + src_lang_code, "TRG_LANG=" + trg_lang_code, "CORPUS_NAME=request_" + str(request_id)], cwd=mgiza_path, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        logger.info("Mgiza alignment complete, starting adding the TM to Concordia.")
        updateRequestStatus(connection_string, request_id, 2)

        # the Concordia server address lives in ../tests/host.py
        host = SourceFileLoader("host", os.path.dirname(os.path.realpath(__file__)) + '/../tests/host.py').load_module()
        address = 'http://' + host.concordia_host
        if len(host.concordia_port) > 0:
            address += ':' + host.concordia_port

        sourceFile = request_corpus_path + '/src_final.txt'
        targetFile = request_corpus_path + '/trg_final.txt'
        alignmentsFile = request_corpus_path + '/aligned_final.txt'
        if file_len(sourceFile) != file_len(targetFile):
            raise Exception("source and target files are not of the same length!")
        if file_len(alignmentsFile) != 3 * file_len(sourceFile):
            raise Exception("alignments file is not exactly 3 times longer than source and target")

        if tm_type == 0:
            # create new TM
            data = {
                'operation': 'addTm',
                'sourceLangId': src_lang_id,
                'targetLangId': trg_lang_id,
                'name': tm_name,
                'tmLemmatized': True
            }
            response = postJson(address, data)
            tmId = int(response['newTmId'])
            logger.info("Added new tm: %d" % tmId)
        else:
            # extend request: reuse the existing TM id after dropping its units
            clearTm(connection_string, tm_id)
            tmId = tm_id

        data = {
            'operation': 'addAlignedLemmatizedSentences',
            'tmId': tmId
        }
        examples = []
        with open(sourceFile) as sf, open(targetFile) as tf, open(alignmentsFile) as af:
            for sourceLine in sf:
                sourceSentence = sourceLine.strip()
                targetSentence = tf.readline().strip()
                # skip two lines of the alignments file, these are lemmatized and
                # we need the raw sentences from the source and target files.
                af.readline()
                af.readline()
                alignmentString = af.readline().strip()
                examples.append([sourceSentence, targetSentence, alignmentString])
                # flush in batches to keep the request payload bounded
                if len(examples) >= BUFFER_SIZE:
                    data['examples'] = examples
                    add_examples(address, data)
                    examples = []
        if len(examples) > 0:
            data['examples'] = examples
            add_examples(address, data)

        updateRequestStatus(connection_string, request_id, 3)
        logger.info("Generating index...")
        start = time.time()
        data = {
            'operation': 'refreshIndex',
            'tmId': tmId
        }
        response = postJson(address, data)
        end = time.time()
        logger.info("Index regeneration complete. The operation took %.4f s" % (end - start))
except Exception as e:
    logger.error("Error occured: " + str(e))
    # only mark the request failed if one was actually fetched
    if request_id is not None:
        updateRequestStatus(connection_string, request_id, 5)
else:
    # only mark success when a request was actually processed
    if request_id is not None:
        updateRequestStatus(connection_string, request_id, 4)
finally:
    # always release the single-instance lock
    os.remove(pidfile_path)