transcript in threadpool
This commit is contained in:
parent
478bb53b5b
commit
0c7623b871
85
src/reco.py
85
src/reco.py
@ -9,32 +9,52 @@ from google.protobuf.json_format import MessageToJson,MessageToDict
|
||||
from storageUpload import getMongoCollection
|
||||
from bson.objectid import ObjectId
|
||||
import datetime
|
||||
import time
|
||||
import concurrent.futures
|
||||
import re
|
||||
|
||||
|
||||
def main(args):
|
||||
uri = "gs://archspeechreco/wave/5df3e63d4c0402698d7837f3.wav"
|
||||
reco = recognize(uri)
|
||||
recoDict = MessageToDict(reco)
|
||||
#print(json.dumps(transcript,indent=4,ensure_ascii=False))
|
||||
|
||||
words = recoDict["results"][-1]["alternatives"][0]["words"]
|
||||
transcript = "".join( [ trans["alternatives"][0]["transcript"] for trans in recoDict["results"][:-1] ] )
|
||||
|
||||
mongoUri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco"
|
||||
dbName = "archSpeechReco"
|
||||
colName = "moviesMeta"
|
||||
col = getMongoCollection(colName,dbName,mongoUri)
|
||||
global col
|
||||
col = getMongoCollection(colName,dbName,mongoUri)
|
||||
batch_size = int(args.batch_size)
|
||||
waves = getWavList(col,batch_size)
|
||||
uris = [ w['gcsWawLocation'] for w in waves ]
|
||||
|
||||
start = time.perf_counter()
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor:
|
||||
executor.map(run_reco, uris)
|
||||
stop = time.perf_counter()
|
||||
|
||||
print(f'Finished in {round(stop-start, 2)} seconds')
|
||||
|
||||
|
||||
def run_reco(uri):
|
||||
reco = recognize(uri)
|
||||
recoDict = MessageToDict(reco)
|
||||
|
||||
if (len(recoDict) != 0):
|
||||
words = recoDict["results"][-1]["alternatives"][0]["words"]
|
||||
transcript = "".join( [ trans["alternatives"][0]["transcript"] for trans in recoDict["results"][:-1] ] )
|
||||
elif (len(recoDict) == 0):
|
||||
words = {}
|
||||
transcript = "film niemy"
|
||||
|
||||
now = datetime.datetime.now()
|
||||
try:
|
||||
col.update_one(
|
||||
{"_id": ObjectId("5df3e63d4c0402698d7837f3")},
|
||||
{"_id": ObjectId(uri.split('/')[4].split('.')[0])},
|
||||
{"$set":{"gcTextReco.transcript":transcript,
|
||||
"gcTextReco.words":words,
|
||||
"gcTextReco.transcripted":now.strftime("%Y-%m-%d %H:%M:%S")}}
|
||||
)
|
||||
except Exception as e: print(e)
|
||||
else:
|
||||
print("mongo update OK")
|
||||
print(f"mongo update OK {uri.split('/')[4].split('.')[0]}")
|
||||
|
||||
|
||||
def recognize(storage_uri):
|
||||
"""
|
||||
@ -78,18 +98,43 @@ def recognize(storage_uri):
|
||||
audio = {"uri": storage_uri}
|
||||
|
||||
operation = client.long_running_recognize(config, audio)
|
||||
|
||||
print(f'{storage_uri} has been sent to reco')
|
||||
print(u"Waiting for operation to complete...")
|
||||
response = operation.result()
|
||||
|
||||
#for result in response.results:
|
||||
# # First alternative is the most probable result
|
||||
# alternative = result.alternatives[0]
|
||||
# print(u"Transcript: {}".format(alternative.transcript))
|
||||
return response
|
||||
|
||||
|
||||
def getMongoCollection(colName,dbName,uri):
|
||||
client = MongoClient(uri,maxPoolSize=512)
|
||||
db = client[dbName]
|
||||
col = db[colName]
|
||||
|
||||
return col
|
||||
|
||||
|
||||
def getWavList(col,limit=32):
|
||||
pipeline = []
|
||||
#match phase, filetr documents withour gcTextReco field - voice not recognized
|
||||
pipeline.append({"$match": {"$and":[
|
||||
{"gcTextReco": {"$exists": False}},
|
||||
{"gcsWav": {"$exists": True}},
|
||||
{"description.details.Format dźwięku": {"$ne": "brak"}}
|
||||
]}
|
||||
}
|
||||
)
|
||||
#project phase, show only bucket name: gcsWav.location
|
||||
pipeline.append({"$project": {
|
||||
"gcsWawLocation": { "$concat": [ "gs://archspeechreco/","$gcsWav.location" ] }
|
||||
}
|
||||
})
|
||||
#fetch only N documents
|
||||
pipeline.append({"$limit":limit})
|
||||
return col.aggregate(pipeline)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='Google Cloud speech2text API client')
|
||||
parser.add_argument("--format", default='mp4', help="format to fetch and upload, [mp4, wav]")
|
||||
args = parser.parse_args()
|
||||
main(args)
|
||||
parser = argparse.ArgumentParser(description='Google Cloud speech2text API client')
|
||||
parser.add_argument("--batch_size", default=512, help="how many waves in the batch")
|
||||
args = parser.parse_args()
|
||||
main(args)
|
||||
|
Loading…
Reference in New Issue
Block a user