#!/usr/bin/python3 # -*- coding: utf-8 -*- import psycopg2, os, sys, shutil, subprocess, urllib3, time, json, logging, errno from importlib.machinery import SourceFileLoader def postJson(address, data): http = urllib3.PoolManager() response = http.request('POST', address, headers={'Content-Type': 'application/json'},body=json.dumps(data).encode('utf-8')) return json.loads(response.data.decode('utf-8')) def add_examples(address, examplesData): response = postJson(address, examplesData) if response['status'] == 'error': raise Exception(response['message']) def file_len(fname): with open(fname) as f: for i, l in enumerate(f): pass return i + 1 def pid_exists(pid): """Check whether pid exists in the current process table. UNIX only. """ if pid < 0: return False if pid == 0: # According to "man 2 kill" PID 0 refers to every process # in the process group of the calling process. # On certain systems 0 is a valid PID but we have no way # to know that in a portable fashion. raise ValueError('invalid PID 0') try: os.kill(pid, 0) except OSError as err: if err.errno == errno.ESRCH: # ESRCH == No such process return False elif err.errno == errno.EPERM: # EPERM clearly means there's a process to deny access to return True else: # According to "man 2 kill" possible error values are # (EINVAL, EPERM, ESRCH) raise else: return True def updateRequestStatus(connection_string, request_id, new_status): conn = psycopg2.connect(connection_string) cur = conn.cursor() cur.execute("UPDATE request SET status=%s WHERE id=%s", (new_status, request_id)) conn.commit() cur.close() conn.close() def getPendingRequest(connection_string): conn = psycopg2.connect(connection_string) cur = conn.cursor() cur.execute("""select request.id, request.source_file_path, request.target_file_path, request.name, src_lang.id as src_lang_id, src_lang.code as src_code, trg_lang.id as trg_lang_id, trg_lang.code as trg_code, request.status, request.type, request.tm_id from request inner join language as src_lang on src_lang.id = request.source_lang_id inner join language as trg_lang on trg_lang.id = request.target_lang_id where request.status = 0 order by request.created limit 1""") request = cur.fetchone() cur.close() conn.close() return request def retrieve_and_append(connection_string, tm_id, src_file_path, trg_file_path): conn = psycopg2.connect(connection_string) cur = conn.cursor() cur.execute("SELECT source_segment, target_segment FROM unit WHERE tm_id=%d" % tm_id) with open(src_file_path, 'a') as src_file, open(trg_file_path, 'a') as trg_file: for row in cur: src_file.write(row[0]+'\n') trg_file.write(row[1]+'\n') cur.close() conn.close() def clearTm(connection_string, tm_id): conn = psycopg2.connect(connection_string) cur = conn.cursor() cur.execute("DELETE FROM unit WHERE tm_id=%d" %tm_id) conn.commit() cur.close() conn.close() def getLangsFromTM(connection_string, tm_id): conn = psycopg2.connect(connection_string) cur = conn.cursor() cur.execute("select src_lang.code, trg_lang.code from tm inner join language as src_lang on tm.source_lang_id = src_lang.id inner join language as trg_lang on tm.target_lang_id = trg_lang.id where tm.id = %d" % tm_id) langs = cur.fetchone() cur.close() conn.close() return langs # ----------- main ------------------------------------------------------------- BUFFER_SIZE = 500 src_dir = os.path.dirname(os.path.realpath(__file__)) logger = logging.getLogger('request_handler') hdlr = logging.FileHandler(src_dir+'/request_handler.log') formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') hdlr.setFormatter(formatter) logger.addHandler(hdlr) logger.setLevel(logging.INFO) pidfile_path = src_dir+'/request_handler.pid' if os.path.isfile(pidfile_path): with open(pidfile_path) as pidfile: pid = int(pidfile.readline().strip()) logger.info('Found pid file with pid: %d' % pid) if pid_exists(pid): logger.info('The process with pid=%d is actually running, exiting' % pid) sys.exit(0) else: logger.warning('The process with pid=%d is not running anymore. Ignoring the pid file.' % pid) with open(pidfile_path, 'w') as pidfile: pidfile.write(str(os.getpid())) mgiza_path = src_dir+'/../mgiza-aligner' connection_string = "dbname='concordia_server' user='concordia' host='localhost' port='6543' password='concordia'" try: request = getPendingRequest(connection_string) if request is not None: request_id, src_file_path, trg_file_path, tm_name, src_lang_id, src_lang_code, trg_lang_id, trg_lang_code, status, tm_type, tm_id = request if tm_type == 1: src_lang_code, trg_lang_code = getLangsFromTM(connection_string, tm_id) logger.info("Working on request: %d. Starting the mgiza aligner." % request_id) updateRequestStatus(connection_string, request_id, 1) request_corpus_path = mgiza_path+'/corpora/request_'+str(request_id) os.makedirs(request_corpus_path) shutil.copy(src_file_path, request_corpus_path+'/src.txt') shutil.copy(trg_file_path, request_corpus_path+'/trg.txt') if tm_type == 1: # extending existing TM. We must retrieve existing units from the db. logger.info("This is an extend request. Appending units from the tm %d to the src and trg files."%tm_id) retrieve_and_append(connection_string, tm_id, request_corpus_path+'/src.txt', request_corpus_path+'/trg.txt') subprocess.run(["make","SRC_LANG="+src_lang_code, "TRG_LANG="+trg_lang_code, "CORPUS_NAME=request_"+str(request_id)], cwd=mgiza_path, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) logger.info("Mgiza alignment complete, starting adding the TM to Concordia.") updateRequestStatus(connection_string, request_id, 2) host = SourceFileLoader("host", os.path.dirname(os.path.realpath(__file__))+'/../tests/host.py').load_module() address = 'http://'+host.concordia_host if len(host.concordia_port) > 0: address += ':'+host.concordia_port sourceFile = request_corpus_path+'/src_final.txt' targetFile = request_corpus_path+'/trg_final.txt' alignmentsFile = request_corpus_path+'/aligned_final.txt' if (file_len(sourceFile) != file_len(targetFile)): raise Exception("source and target files are not of the same length!") if (file_len(alignmentsFile) != 3*file_len(sourceFile)): raise Exception("alignments file is not exactly 3 times longer than source and target") if tm_type == 0: # create new TM data = { 'operation': 'addTm', 'sourceLangId':src_lang_id, 'targetLangId':trg_lang_id, 'name':tm_name, 'tmLemmatized':True } response = postJson(address, data) tmId = int(response['newTmId']) logger.info("Added new tm: %d" % tmId) else: clearTm(connection_string, tm_id) tmId = tm_id data = { 'operation': 'addAlignedLemmatizedSentences', 'tmId':tmId } examples = [] with open(sourceFile) as sf, open(targetFile) as tf, open(alignmentsFile) as af: for sourceLine in sf: sourceSentence = sourceLine.strip() targetSentence = tf.readline().strip() # skip to lines of the alignments file, these are lemmatized and we need the raw sentences from the source and target files. af.readline() af.readline() alignmentString = af.readline().strip() examples.append([sourceSentence, targetSentence, alignmentString]) if len(examples) >= BUFFER_SIZE: data['examples'] = examples add_examples(address, data) examples = [] if len(examples) > 0: data['examples'] = examples add_examples(address, data) updateRequestStatus(connection_string, request_id, 3) logger.info("Generating index...") start = time.time() data = { 'operation': 'refreshIndex', 'tmId' : tmId } response = postJson(address, data) end = time.time() logger.info("Index regeneration complete. The operation took %.4f s" % (end - start)) except Exception as e: logger.error("Error occured: "+str(e)) updateRequestStatus(connection_string, request_id, 5) else: updateRequestStatus(connection_string, request_id, 4) finally: os.remove(pidfile_path)