From 3f3a136fa2fee215ba6dc49e609470e0c162de4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Jaworski?= Date: Mon, 26 Jun 2017 13:45:10 +0200 Subject: [PATCH] import requests working --- .gitignore | 1 + INSTALL.txt | 4 + cat/concordia_gate.php_pattern | 3 - cat/css/concordia_cat.css | 1 - cat/tm_manager.php_pattern | 15 +- db/concordia_server.sql | 2 + import-requests/handle_requests.py | 257 +++++++++++++++++++++-------- mgiza-aligner/Makefile | 4 +- 8 files changed, 212 insertions(+), 75 deletions(-) diff --git a/.gitignore b/.gitignore index efdcffc..47bc588 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,4 @@ mgiza-aligner/mgiza/mgizapp/src/mkcls/CMakeFiles/ mgiza-aligner/mgiza/mgizapp/src/mkcls/Makefile mgiza-aligner/mgiza/mgizapp/src/mkcls/cmake_install.cmake __pycache__ +import-requests/request_handler.log diff --git a/INSTALL.txt b/INSTALL.txt index 37f1dd2..49f6848 100644 --- a/INSTALL.txt +++ b/INSTALL.txt @@ -88,3 +88,7 @@ mgiza-aligner: - cd mgiza, mgizapp - sudo apt-get install libboost-thread-dev - follow instructions in INSTALL + +import-requests: +crontab -e +* * * * * /path/to/handle_requests.py diff --git a/cat/concordia_gate.php_pattern b/cat/concordia_gate.php_pattern index 16018a0..d3ccbf3 100644 --- a/cat/concordia_gate.php_pattern +++ b/cat/concordia_gate.php_pattern @@ -2,9 +2,6 @@ $url = 'http://@concordia_host@:@concordia_port@'; $intervalsArray = array(); -foreach ($_POST["intervals"] as $interval) { - array_push($intervalsArray, [intval($interval[0]), intval($interval[1])]); -} $data = array("operation" => $_POST["operation"],"tmId" => intval($_POST["tmId"]),"pattern" => $_POST["pattern"],"intervals" => $intervalsArray); // use key 'http' even if you send the request to https://... diff --git a/cat/css/concordia_cat.css b/cat/css/concordia_cat.css index 375c99d..2cc6442 100644 --- a/cat/css/concordia_cat.css +++ b/cat/css/concordia_cat.css @@ -320,7 +320,6 @@ th a.desc:after { } table tr td { line-height:200%; - max-width:200px; padding: 6px; text-align: left; vertical-align: top; diff --git a/cat/tm_manager.php_pattern b/cat/tm_manager.php_pattern index 02c0e44..6715318 100644 --- a/cat/tm_manager.php_pattern +++ b/cat/tm_manager.php_pattern @@ -74,7 +74,14 @@ if ($_SERVER['REQUEST_METHOD'] == 'POST' ) { $errorMessage = addRequest($url, $_POST, $_FILES); } - +$statusesDictionary = array( + 0 => "new", + 1 => "processing - word alignment", + 2 => "processing - adding to Concordia", + 3 => "processing - regenerating index", + 4 => "complete", + 5 => "error" +); $tmsData = postJson($url, array("operation" =>"getTmsInfo")); $requestsData = postJson($url, array("operation" =>"getRequestsInfo")); @@ -149,9 +156,9 @@ $languagesData = postJson($url, array("operation" =>"getLanguages")); ?> - +
- @@ -226,7 +233,7 @@ $languagesData = postJson($url, array("operation" =>"getLanguages")); } ?> - +
+ status ?>status] ?> type == 0) { diff --git a/db/concordia_server.sql b/db/concordia_server.sql index c0aad11..88f4b43 100644 --- a/db/concordia_server.sql +++ b/db/concordia_server.sql @@ -48,3 +48,5 @@ CREATE TABLE alignment ( ); CREATE INDEX ON alignment(unit_id, source_token_pos); + +CREATE INDEX ON unit(tm_id); diff --git a/import-requests/handle_requests.py b/import-requests/handle_requests.py index a94de6a..41a6713 100755 --- a/import-requests/handle_requests.py +++ b/import-requests/handle_requests.py @@ -1,12 +1,10 @@ #!/usr/bin/python3 # -*- coding: utf-8 -*- -import psycopg2, os, shutil, subprocess, urllib3, time, json +import psycopg2, os, sys, shutil, subprocess, urllib3, time, json, logging, errno from importlib.machinery import SourceFileLoader -BUFFER_SIZE = 500 - def postJson(address, data): http = urllib3.PoolManager() response = http.request('POST', address, headers={'Content-Type': 'application/json'},body=json.dumps(data).encode('utf-8')) @@ -26,90 +24,219 @@ def file_len(fname): return i + 1 -mgiza_path = os.path.dirname(os.path.realpath(__file__))+'/../mgiza-aligner' +def pid_exists(pid): + """Check whether pid exists in the current process table. + UNIX only. + """ + if pid < 0: + return False + if pid == 0: + # According to "man 2 kill" PID 0 refers to every process + # in the process group of the calling process. + # On certain systems 0 is a valid PID but we have no way + # to know that in a portable fashion. + raise ValueError('invalid PID 0') + try: + os.kill(pid, 0) + except OSError as err: + if err.errno == errno.ESRCH: + # ESRCH == No such process + return False + elif err.errno == errno.EPERM: + # EPERM clearly means there's a process to deny access to + return True + else: + # According to "man 2 kill" possible error values are + # (EINVAL, EPERM, ESRCH) + raise + else: + return True -conn = psycopg2.connect("dbname='concordia_server' user='concordia' host='localhost' port='6543' password='concordia'") -cur = conn.cursor() -cur.execute("""select request.id, request.source_file_path, request.target_file_path, request.name, src_lang.id as src_lang_id, src_lang.code as src_code, trg_lang.id as trg_lang_id, trg_lang.code as trg_code, request.status, request.type, request.tm_id from request inner join language as src_lang on src_lang.id = request.source_lang_id inner join language as trg_lang on trg_lang.id = request.target_lang_id order by request.created limit 1""") +def updateRequestStatus(connection_string, request_id, new_status): + conn = psycopg2.connect(connection_string) + cur = conn.cursor() + cur.execute("UPDATE request SET status=%s WHERE id=%s", (new_status, request_id)) + conn.commit() + cur.close() + conn.close() + +def getPendingRequest(connection_string): + conn = psycopg2.connect(connection_string) + cur = conn.cursor() + cur.execute("""select request.id, request.source_file_path, request.target_file_path, request.name, src_lang.id as src_lang_id, src_lang.code as src_code, trg_lang.id as trg_lang_id, trg_lang.code as trg_code, request.status, request.type, request.tm_id from request inner join language as src_lang on src_lang.id = request.source_lang_id inner join language as trg_lang on trg_lang.id = request.target_lang_id where request.status = 0 order by request.created limit 1""") + + request = cur.fetchone() + cur.close() + conn.close() + return request + +def retrieve_and_append(connection_string, tm_id, src_file_path, trg_file_path): + conn = psycopg2.connect(connection_string) + cur = conn.cursor() + cur.execute("SELECT source_segment, target_segment FROM unit WHERE tm_id=%d" % tm_id) + with open(src_file_path, 'a') as src_file, open(trg_file_path, 'a') as trg_file: + for row in cur: + src_file.write(row[0]+'\n') + trg_file.write(row[1]+'\n') + + cur.close() + conn.close() + +def clearTm(connection_string, tm_id): + conn = psycopg2.connect(connection_string) + cur = conn.cursor() + cur.execute("DELETE FROM unit WHERE tm_id=%d" %tm_id) + conn.commit() + cur.close() + conn.close() + +def getLangsFromTM(connection_string, tm_id): + conn = psycopg2.connect(connection_string) + cur = conn.cursor() + cur.execute("select src_lang.code, trg_lang.code from tm inner join language as src_lang on tm.source_lang_id = src_lang.id inner join language as trg_lang on tm.target_lang_id = trg_lang.id where tm.id = %d" % tm_id) + langs = cur.fetchone() + cur.close() + conn.close() + return langs + +# ----------- main ------------------------------------------------------------- + +BUFFER_SIZE = 500 + +src_dir = os.path.dirname(os.path.realpath(__file__)) + +logger = logging.getLogger('request_handler') +hdlr = logging.FileHandler(src_dir+'/request_handler.log') +formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') +hdlr.setFormatter(formatter) +logger.addHandler(hdlr) +logger.setLevel(logging.INFO) -request = cur.fetchone() -request_id, src_file_path, trg_file_path, tm_name, src_lang_id, src_lang_code, trg_lang_id, trg_lang_code, status, tm_type, tm_id = request -request_corpus_path = mgiza_path+'/corpora/request_'+str(request_id) -os.makedirs(request_corpus_path) -shutil.copy(src_file_path, request_corpus_path+'/src.txt') -shutil.copy(trg_file_path, request_corpus_path+'/trg.txt') +pidfile_path = src_dir+'/request_handler.pid' +if os.path.isfile(pidfile_path): + with open(pidfile_path) as pidfile: + pid = int(pidfile.readline().strip()) + logger.info('Found pid file with pid: %d' % pid) + if pid_exists(pid): + logger.info('The process with pid=%d is actually running, exiting' % pid) + sys.exit(0) + else: + logger.warning('The process with pid=%d is not running anymore. Ignoring the pid file.' % pid) -subprocess.run(["make","SRC_LANG="+src_lang_code, "TRG_LANG="+trg_lang_code, "CORPUS_NAME=request_"+str(request_id)], cwd=mgiza_path) +with open(pidfile_path, 'w') as pidfile: + pidfile.write(str(os.getpid())) -cur.close() -conn.close() +mgiza_path = src_dir+'/../mgiza-aligner' -host = SourceFileLoader("host", os.path.dirname(os.path.realpath(__file__))+'/../tests/host.py').load_module() -address = 'http://'+host.concordia_host -if len(host.concordia_port) > 0: - address += ':'+host.concordia_port +connection_string = "dbname='concordia_server' user='concordia' host='localhost' port='6543' password='concordia'" + +try: + request = getPendingRequest(connection_string) + if request is not None: + request_id, src_file_path, trg_file_path, tm_name, src_lang_id, src_lang_code, trg_lang_id, trg_lang_code, status, tm_type, tm_id = request + + if tm_type == 1: + src_lang_code, trg_lang_code = getLangsFromTM(connection_string, tm_id) + + logger.info("Working on request: %d. Starting the mgiza aligner." % request_id) + updateRequestStatus(connection_string, request_id, 1) + + request_corpus_path = mgiza_path+'/corpora/request_'+str(request_id) + os.makedirs(request_corpus_path) + + shutil.copy(src_file_path, request_corpus_path+'/src.txt') + shutil.copy(trg_file_path, request_corpus_path+'/trg.txt') + + if tm_type == 1: + # extending existing TM. We must retrieve existing units from the db. + logger.info("This is an extend request. Appending units from the tm %d to the src and trg files."%tm_id) + retrieve_and_append(connection_string, tm_id, request_corpus_path+'/src.txt', request_corpus_path+'/trg.txt') + + subprocess.run(["make","SRC_LANG="+src_lang_code, "TRG_LANG="+trg_lang_code, "CORPUS_NAME=request_"+str(request_id)], cwd=mgiza_path, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + logger.info("Mgiza alignment complete, starting adding the TM to Concordia.") + updateRequestStatus(connection_string, request_id, 2) + + host = SourceFileLoader("host", os.path.dirname(os.path.realpath(__file__))+'/../tests/host.py').load_module() + address = 'http://'+host.concordia_host + if len(host.concordia_port) > 0: + address += ':'+host.concordia_port -sourceFile = request_corpus_path+'/src_final.txt' -targetFile = request_corpus_path+'/trg_final.txt' -alignmentsFile = request_corpus_path+'/aligned_final.txt' + sourceFile = request_corpus_path+'/src_final.txt' + targetFile = request_corpus_path+'/trg_final.txt' + alignmentsFile = request_corpus_path+'/aligned_final.txt' -if (file_len(sourceFile) != file_len(targetFile)): - raise Exception("source and target files are not of the same length!") + if (file_len(sourceFile) != file_len(targetFile)): + raise Exception("source and target files are not of the same length!") -if (file_len(alignmentsFile) != 3*file_len(sourceFile)): - raise Exception("alignments file is not exactly 3 times longer than source and target") + if (file_len(alignmentsFile) != 3*file_len(sourceFile)): + raise Exception("alignments file is not exactly 3 times longer than source and target") -data = { - 'operation': 'addTm', - 'sourceLangId':src_lang_id, - 'targetLangId':trg_lang_id, - 'name':tm_name, - 'tmLemmatized':True -} + if tm_type == 0: + # create new TM + data = { + 'operation': 'addTm', + 'sourceLangId':src_lang_id, + 'targetLangId':trg_lang_id, + 'name':tm_name, + 'tmLemmatized':True + } -response = postJson(address, data) + response = postJson(address, data) -tmId = int(response['newTmId']) -print("Added new tm: %d" % tmId) + tmId = int(response['newTmId']) + logger.info("Added new tm: %d" % tmId) + else: + clearTm(connection_string, tm_id) + tmId = tm_id -data = { - 'operation': 'addAlignedLemmatizedSentences', - 'tmId':tmId -} + data = { + 'operation': 'addAlignedLemmatizedSentences', + 'tmId':tmId + } -examples = [] -with open(sourceFile) as sf, open(targetFile) as tf, open(alignmentsFile) as af: - for sourceLine in sf: - sourceSentence = sourceLine.strip() - targetSentence = tf.readline().strip() + examples = [] + with open(sourceFile) as sf, open(targetFile) as tf, open(alignmentsFile) as af: + for sourceLine in sf: + sourceSentence = sourceLine.strip() + targetSentence = tf.readline().strip() - # skip to lines of the alignments file, these are lemmatized and we need the raw sentences from the source and target files. - af.readline() - af.readline() + # skip to lines of the alignments file, these are lemmatized and we need the raw sentences from the source and target files. + af.readline() + af.readline() - alignmentString = af.readline().strip() + alignmentString = af.readline().strip() - examples.append([sourceSentence, targetSentence, alignmentString]) + examples.append([sourceSentence, targetSentence, alignmentString]) - if len(examples) >= BUFFER_SIZE: + if len(examples) >= BUFFER_SIZE: + data['examples'] = examples + add_examples(address, data) + examples = [] + + if len(examples) > 0: data['examples'] = examples add_examples(address, data) - examples = [] -if len(examples) > 0: - data['examples'] = examples - add_examples(address, data) + updateRequestStatus(connection_string, request_id, 3) + logger.info("Generating index...") + start = time.time() + data = { + 'operation': 'refreshIndex', + 'tmId' : tmId + } -print("Generating index...") -start = time.time() -data = { - 'operation': 'refreshIndex', - 'tmId' : tmId -} + response = postJson(address, data) -response = postJson(address, data) + end = time.time() + logger.info("Index regeneration complete. The operation took %.4f s" % (end - start)) -end = time.time() -print("Index regeneration complete. The operation took %.4f s" % (end - start)) +except Exception as e: + logger.error("Error occured: "+str(e)) + updateRequestStatus(connection_string, request_id, 5) +else: + updateRequestStatus(connection_string, request_id, 4) +finally: + os.remove(pidfile_path) diff --git a/mgiza-aligner/Makefile b/mgiza-aligner/Makefile index f247dda..6630c6c 100644 --- a/mgiza-aligner/Makefile +++ b/mgiza-aligner/Makefile @@ -66,11 +66,11 @@ corpora/$(CORPUS_NAME)/src_clean.lem: corpora/$(CORPUS_NAME)/src_clean.tok corpora/$(CORPUS_NAME)/trg.tok: corpora/$(CORPUS_NAME)/trg.txt - concordia-sentence-tokenizer -c ../concordia.cfg < $< > $@ + /usr/local/bin/concordia-sentence-tokenizer -c ../concordia.cfg < $< > $@ corpora/$(CORPUS_NAME)/src.tok: corpora/$(CORPUS_NAME)/src.txt - concordia-sentence-tokenizer -c ../concordia.cfg < $< > $@ + /usr/local/bin/concordia-sentence-tokenizer -c ../concordia.cfg < $< > $@ corpora/$(CORPUS_NAME)/src_clean.txt corpora/$(CORPUS_NAME)/trg_clean.txt corpora/$(CORPUS_NAME)/src_clean.tok corpora/$(CORPUS_NAME)/trg_clean.tok: corpora/$(CORPUS_NAME)/src.txt corpora/$(CORPUS_NAME)/trg.txt corpora/$(CORPUS_NAME)/src.tok corpora/$(CORPUS_NAME)/trg.tok