From a7625d6ea4f4712d7830dbd5be945b282b90dbfe Mon Sep 17 00:00:00 2001 From: Wojciech Smolak Date: Fri, 21 Aug 2020 22:49:52 +0200 Subject: [PATCH] spacje --- src/reco.py | 72 +++++++++++++++++++++++++++++------------------------ 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/src/reco.py b/src/reco.py index 9eab3981..31a4fe47 100644 --- a/src/reco.py +++ b/src/reco.py @@ -8,35 +8,42 @@ from mongo.helpers import get_mongo_collection import datetime import time import concurrent.futures +import logging def main(args): - mongoUri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco" - dbName = "archSpeechReco" - colName = "moviesMeta" + loglevel = args.loglevel + numeric_level = getattr(logging, loglevel.upper(), 10) + logging.basicConfig(filename='reco.log', format='%(asctime)s [%(levelname)s] - %(message)s', level=numeric_level) + + mongo_uri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco" + db_name = "archSpeechReco" + col_name = "moviesMeta" global col - col = get_mongo_collection(colName, dbName, mongoUri) + col = get_mongo_collection(col_name, db_name, mongo_uri) batch_size = int(args.batch_size) - waves = getWavList(col, batch_size) + source = args.source + channels = args.channels + waves = get_wav_list(col, batch_size, source) uris = [w for w in waves] - print(uris) + uris = list(map(lambda x: dict(x, **{'channels': channels}), uris)) start = time.perf_counter() with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor: executor.map(run_reco, uris) stop = time.perf_counter() - print(f'Finished in {round(stop - start, 2)} seconds') + logging.info(f'Finished in {round(stop - start, 2)} seconds') def run_reco(uri): - reco = recognize(uri['gcsWawLocation']) - recoDict = MessageToDict(reco) + reco = recognize(uri['gcsWawLocation'], uri['channels']) + reco_dict = MessageToDict(reco) - if len(recoDict) != 0: - words = recoDict["results"][-1]["alternatives"][0]["words"] - transcript = " ".join([trans["alternatives"][0]["transcript"] for trans in recoDict["results"][:-1]]) - elif len(recoDict) == 0: + if len(reco_dict) != 0: + words = reco_dict["results"][-1]["alternatives"][0]["words"] + transcript = " ".join([trans["alternatives"][0]["transcript"] for trans in reco_dict["results"][:-1]]) + elif len(reco_dict) == 0: words = {} transcript = "film niemy" @@ -49,12 +56,13 @@ def run_reco(uri): "gcTextReco.transcripted": now.strftime("%Y-%m-%d %H:%M:%S")}} ) except Exception as e: - print(e) + logging.error(e) else: - print(f"mongo update OK {uri.split('/')[4].split('.')[0]}") + logging.info(f"mongo update OK {uri['_id']}") + logging.debug(f"transcript for {uri['gcsWawLocation']}: {transcript}") -def recognize(storage_uri): +def recognize(storage_uri, channels): """ Transcribe long audio file from Cloud Storage using asynchronous speech recognition @@ -73,50 +81,50 @@ def recognize(storage_uri): sample_rate_hertz=44100, language_code="pl-PL", diarization_config=d_config, - audio_channel_count=2 + audio_channel_count=channels ) audio = {"uri": storage_uri} operation = client.long_running_recognize(config, audio) - print(f'{storage_uri} has been sent to reco') - print(u"Waiting for operation to complete...") + logging.info(f'{storage_uri} has been sent to reco') + logging.info(u"Waiting for operation to complete...") response = operation.result() return response -def getMongoCollection(colName, dbName, uri): +def get_mongo_collection(col_name, db_name, uri): client = MongoClient(uri, maxPoolSize=512) - db = client[dbName] - col = db[colName] + db = client[db_name] + col = db[col_name] return col -def getWavList(col, limit=32): - pipeline = [] - # match phase, filter documents without gcTextReco field - voice not recognized - pipeline.append({"$match": {"$and": [ +def get_wav_list(col, source, limit=32): + pipeline = [{"$match": {"$and": [ + {"source": source}, {"gcTextReco": {"$exists": False}}, {"gcsWav": {"$exists": True}}, {"description.details.Format dźwięku": {"$ne": "brak"}} ]} - } - ) - # project phase, show only bucket name: gcsWav.location - pipeline.append({"$project": { + }, {"$project": { "_id": 1, "gcsWawLocation": {"$concat": ["gs://archspeechreco/", "$gcsWav.location"]} } - }) + }, {"$limit": limit}] + # match phase, filter documents without gcTextReco field - voice not recognized + # project phase, show only bucket name: gcsWav.location # fetch only N documents - pipeline.append({"$limit": limit}) return col.aggregate(pipeline) if __name__ == '__main__': parser = argparse.ArgumentParser(description='Google Cloud speech2text API client') parser.add_argument("--batch_size", default=512, help="how many waves in the batch") + parser.add_argument("--source", help="source of media [dtv, filmoteka, kronikiprl, sonda]") + parser.add_argument("--channels", help="quantity of audio channels") + parser.add_argument("--loglevel", help="log level: DEBUG INFO WARNING ERROR") args = parser.parse_args() main(args)