transcript in threadpool #2

Merged
s333949 merged 1 commits from reco into master 2020-02-22 12:48:19 +01:00

View File

@ -9,32 +9,52 @@ from google.protobuf.json_format import MessageToJson,MessageToDict
from storageUpload import getMongoCollection from storageUpload import getMongoCollection
from bson.objectid import ObjectId from bson.objectid import ObjectId
import datetime import datetime
import time
import concurrent.futures
import re
def main(args): def main(args):
uri = "gs://archspeechreco/wave/5df3e63d4c0402698d7837f3.wav"
reco = recognize(uri)
recoDict = MessageToDict(reco)
#print(json.dumps(transcript,indent=4,ensure_ascii=False))
words = recoDict["results"][-1]["alternatives"][0]["words"]
transcript = "".join( [ trans["alternatives"][0]["transcript"] for trans in recoDict["results"][:-1] ] )
mongoUri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco" mongoUri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco"
dbName = "archSpeechReco" dbName = "archSpeechReco"
colName = "moviesMeta" colName = "moviesMeta"
col = getMongoCollection(colName,dbName,mongoUri) global col
col = getMongoCollection(colName,dbName,mongoUri)
batch_size = int(args.batch_size)
waves = getWavList(col,batch_size)
uris = [ w['gcsWawLocation'] for w in waves ]
start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor:
executor.map(run_reco, uris)
stop = time.perf_counter()
print(f'Finished in {round(stop-start, 2)} seconds')
def run_reco(uri):
reco = recognize(uri)
recoDict = MessageToDict(reco)
if (len(recoDict) != 0):
words = recoDict["results"][-1]["alternatives"][0]["words"]
transcript = "".join( [ trans["alternatives"][0]["transcript"] for trans in recoDict["results"][:-1] ] )
elif (len(recoDict) == 0):
words = {}
transcript = "film niemy"
now = datetime.datetime.now() now = datetime.datetime.now()
try: try:
col.update_one( col.update_one(
{"_id": ObjectId("5df3e63d4c0402698d7837f3")}, {"_id": ObjectId(uri.split('/')[4].split('.')[0])},
{"$set":{"gcTextReco.transcript":transcript, {"$set":{"gcTextReco.transcript":transcript,
"gcTextReco.words":words, "gcTextReco.words":words,
"gcTextReco.transcripted":now.strftime("%Y-%m-%d %H:%M:%S")}} "gcTextReco.transcripted":now.strftime("%Y-%m-%d %H:%M:%S")}}
) )
except Exception as e: print(e) except Exception as e: print(e)
else: else:
print("mongo update OK") print(f"mongo update OK {uri.split('/')[4].split('.')[0]}")
def recognize(storage_uri): def recognize(storage_uri):
""" """
@ -78,18 +98,43 @@ def recognize(storage_uri):
audio = {"uri": storage_uri} audio = {"uri": storage_uri}
operation = client.long_running_recognize(config, audio) operation = client.long_running_recognize(config, audio)
print(f'{storage_uri} has been sent to reco')
print(u"Waiting for operation to complete...") print(u"Waiting for operation to complete...")
response = operation.result() response = operation.result()
#for result in response.results:
# # First alternative is the most probable result
# alternative = result.alternatives[0]
# print(u"Transcript: {}".format(alternative.transcript))
return response return response
def getMongoCollection(colName,dbName,uri):
client = MongoClient(uri,maxPoolSize=512)
db = client[dbName]
col = db[colName]
return col
def getWavList(col,limit=32):
pipeline = []
#match phase, filetr documents withour gcTextReco field - voice not recognized
pipeline.append({"$match": {"$and":[
{"gcTextReco": {"$exists": False}},
{"gcsWav": {"$exists": True}},
{"description.details.Format dźwięku": {"$ne": "brak"}}
]}
}
)
#project phase, show only bucket name: gcsWav.location
pipeline.append({"$project": {
"gcsWawLocation": { "$concat": [ "gs://archspeechreco/","$gcsWav.location" ] }
}
})
#fetch only N documents
pipeline.append({"$limit":limit})
return col.aggregate(pipeline)
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Google Cloud speech2text API client') parser = argparse.ArgumentParser(description='Google Cloud speech2text API client')
parser.add_argument("--format", default='mp4', help="format to fetch and upload, [mp4, wav]") parser.add_argument("--batch_size", default=512, help="how many waves in the batch")
args = parser.parse_args() args = parser.parse_args()
main(args) main(args)