From 0c7623b87167044e0106edf7c725e03077cd9941 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Sat, 22 Feb 2020 11:44:58 +0000 Subject: [PATCH] transcript in threadpool --- src/reco.py | 85 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 65 insertions(+), 20 deletions(-) diff --git a/src/reco.py b/src/reco.py index 0e9ec109..6b6d6734 100644 --- a/src/reco.py +++ b/src/reco.py @@ -9,32 +9,52 @@ from google.protobuf.json_format import MessageToJson,MessageToDict from storageUpload import getMongoCollection from bson.objectid import ObjectId import datetime +import time +import concurrent.futures +import re def main(args): - uri = "gs://archspeechreco/wave/5df3e63d4c0402698d7837f3.wav" - reco = recognize(uri) - recoDict = MessageToDict(reco) - #print(json.dumps(transcript,indent=4,ensure_ascii=False)) - - words = recoDict["results"][-1]["alternatives"][0]["words"] - transcript = "".join( [ trans["alternatives"][0]["transcript"] for trans in recoDict["results"][:-1] ] ) - mongoUri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco" dbName = "archSpeechReco" colName = "moviesMeta" - col = getMongoCollection(colName,dbName,mongoUri) + global col + col = getMongoCollection(colName,dbName,mongoUri) + batch_size = int(args.batch_size) + waves = getWavList(col,batch_size) + uris = [ w['gcsWawLocation'] for w in waves ] + + start = time.perf_counter() + with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor: + executor.map(run_reco, uris) + stop = time.perf_counter() + + print(f'Finished in {round(stop-start, 2)} seconds') + + +def run_reco(uri): + reco = recognize(uri) + recoDict = MessageToDict(reco) + + if (len(recoDict) != 0): + words = recoDict["results"][-1]["alternatives"][0]["words"] + transcript = "".join( [ trans["alternatives"][0]["transcript"] for trans in recoDict["results"][:-1] ] ) + elif (len(recoDict) == 0): + words = {} + transcript = "film niemy" + now = datetime.datetime.now() try: col.update_one( - {"_id": 
ObjectId("5df3e63d4c0402698d7837f3")}, + {"_id": ObjectId(uri.split('/')[4].split('.')[0])}, {"$set":{"gcTextReco.transcript":transcript, "gcTextReco.words":words, "gcTextReco.transcripted":now.strftime("%Y-%m-%d %H:%M:%S")}} ) except Exception as e: print(e) else: - print("mongo update OK") + print(f"mongo update OK {uri.split('/')[4].split('.')[0]}") + def recognize(storage_uri): """ @@ -78,18 +98,43 @@ def recognize(storage_uri): audio = {"uri": storage_uri} operation = client.long_running_recognize(config, audio) - + print(f'{storage_uri} has been sent to reco') print(u"Waiting for operation to complete...") response = operation.result() - #for result in response.results: - # # First alternative is the most probable result - # alternative = result.alternatives[0] - # print(u"Transcript: {}".format(alternative.transcript)) return response + +def getMongoCollection(colName,dbName,uri): + client = MongoClient(uri,maxPoolSize=512) + db = client[dbName] + col = db[colName] + + return col + + +def getWavList(col,limit=32): + pipeline = [] + #match phase, filter documents without gcTextReco field - voice not recognized + pipeline.append({"$match": {"$and":[ + {"gcTextReco": {"$exists": False}}, + {"gcsWav": {"$exists": True}}, + {"description.details.Format dźwięku": {"$ne": "brak"}} + ]} + } + ) + #project phase, show only the full GCS URI: bucket prefix + gcsWav.location + pipeline.append({"$project": { + "gcsWawLocation": { "$concat": [ "gs://archspeechreco/","$gcsWav.location" ] } + } + }) + #fetch only N documents + pipeline.append({"$limit":limit}) + return col.aggregate(pipeline) + + if __name__ == '__main__': - parser = argparse.ArgumentParser(description='Google Cloud speech2text API client') - parser.add_argument("--format", default='mp4', help="format to fetch and upload, [mp4, wav]") - args = parser.parse_args() - main(args) + parser = argparse.ArgumentParser(description='Google Cloud speech2text API client') + parser.add_argument("--batch_size", default=512, help="how many 
waves in the batch") + args = parser.parse_args() + main(args)