spacje
This commit is contained in:
parent
fc5486561a
commit
a7625d6ea4
72
src/reco.py
72
src/reco.py
@ -8,35 +8,42 @@ from mongo.helpers import get_mongo_collection
|
||||
import datetime
|
||||
import time
|
||||
import concurrent.futures
|
||||
import logging
|
||||
|
||||
|
||||
def main(args):
|
||||
mongoUri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco"
|
||||
dbName = "archSpeechReco"
|
||||
colName = "moviesMeta"
|
||||
loglevel = args.loglevel
|
||||
numeric_level = getattr(logging, loglevel.upper(), 10)
|
||||
logging.basicConfig(filename='reco.log', format='%(asctime)s [%(levelname)s] - %(message)s', level=numeric_level)
|
||||
|
||||
mongo_uri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco"
|
||||
db_name = "archSpeechReco"
|
||||
col_name = "moviesMeta"
|
||||
global col
|
||||
col = get_mongo_collection(colName, dbName, mongoUri)
|
||||
col = get_mongo_collection(col_name, db_name, mongo_uri)
|
||||
batch_size = int(args.batch_size)
|
||||
waves = getWavList(col, batch_size)
|
||||
source = args.source
|
||||
channels = args.channels
|
||||
waves = get_wav_list(col, batch_size, source)
|
||||
uris = [w for w in waves]
|
||||
print(uris)
|
||||
uris = list(map(lambda x: dict(x, **{'channels': channels}), uris))
|
||||
|
||||
start = time.perf_counter()
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor:
|
||||
executor.map(run_reco, uris)
|
||||
stop = time.perf_counter()
|
||||
|
||||
print(f'Finished in {round(stop - start, 2)} seconds')
|
||||
logging.info(f'Finished in {round(stop - start, 2)} seconds')
|
||||
|
||||
|
||||
def run_reco(uri):
|
||||
reco = recognize(uri['gcsWawLocation'])
|
||||
recoDict = MessageToDict(reco)
|
||||
reco = recognize(uri['gcsWawLocation'], uri['channels'])
|
||||
reco_dict = MessageToDict(reco)
|
||||
|
||||
if len(recoDict) != 0:
|
||||
words = recoDict["results"][-1]["alternatives"][0]["words"]
|
||||
transcript = " ".join([trans["alternatives"][0]["transcript"] for trans in recoDict["results"][:-1]])
|
||||
elif len(recoDict) == 0:
|
||||
if len(reco_dict) != 0:
|
||||
words = reco_dict["results"][-1]["alternatives"][0]["words"]
|
||||
transcript = " ".join([trans["alternatives"][0]["transcript"] for trans in reco_dict["results"][:-1]])
|
||||
elif len(reco_dict) == 0:
|
||||
words = {}
|
||||
transcript = "film niemy"
|
||||
|
||||
@ -49,12 +56,13 @@ def run_reco(uri):
|
||||
"gcTextReco.transcripted": now.strftime("%Y-%m-%d %H:%M:%S")}}
|
||||
)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
logging.error(e)
|
||||
else:
|
||||
print(f"mongo update OK {uri.split('/')[4].split('.')[0]}")
|
||||
logging.info(f"mongo update OK {uri['_id']}")
|
||||
logging.debug(f"transcript for {uri['gcsWawLocation']}: {transcript}")
|
||||
|
||||
|
||||
def recognize(storage_uri):
|
||||
def recognize(storage_uri, channels):
|
||||
"""
|
||||
Transcribe long audio file from Cloud Storage using asynchronous speech
|
||||
recognition
|
||||
@ -73,50 +81,50 @@ def recognize(storage_uri):
|
||||
sample_rate_hertz=44100,
|
||||
language_code="pl-PL",
|
||||
diarization_config=d_config,
|
||||
audio_channel_count=2
|
||||
audio_channel_count=channels
|
||||
)
|
||||
|
||||
audio = {"uri": storage_uri}
|
||||
|
||||
operation = client.long_running_recognize(config, audio)
|
||||
print(f'{storage_uri} has been sent to reco')
|
||||
print(u"Waiting for operation to complete...")
|
||||
logging.info(f'{storage_uri} has been sent to reco')
|
||||
logging.info(u"Waiting for operation to complete...")
|
||||
response = operation.result()
|
||||
|
||||
return response
|
||||
|
||||
|
||||
def getMongoCollection(colName, dbName, uri):
|
||||
def get_mongo_collection(col_name, db_name, uri):
|
||||
client = MongoClient(uri, maxPoolSize=512)
|
||||
db = client[dbName]
|
||||
col = db[colName]
|
||||
db = client[db_name]
|
||||
col = db[col_name]
|
||||
|
||||
return col
|
||||
|
||||
|
||||
def getWavList(col, limit=32):
|
||||
pipeline = []
|
||||
# match phase, filter documents without gcTextReco field - voice not recognized
|
||||
pipeline.append({"$match": {"$and": [
|
||||
def get_wav_list(col, source, limit=32):
|
||||
pipeline = [{"$match": {"$and": [
|
||||
{"source": source},
|
||||
{"gcTextReco": {"$exists": False}},
|
||||
{"gcsWav": {"$exists": True}},
|
||||
{"description.details.Format dźwięku": {"$ne": "brak"}}
|
||||
]}
|
||||
}
|
||||
)
|
||||
# project phase, show only bucket name: gcsWav.location
|
||||
pipeline.append({"$project": {
|
||||
}, {"$project": {
|
||||
"_id": 1,
|
||||
"gcsWawLocation": {"$concat": ["gs://archspeechreco/", "$gcsWav.location"]}
|
||||
}
|
||||
})
|
||||
}, {"$limit": limit}]
|
||||
# match phase, filter documents without gcTextReco field - voice not recognized
|
||||
# project phase, show only bucket name: gcsWav.location
|
||||
# fetch only N documents
|
||||
pipeline.append({"$limit": limit})
|
||||
return col.aggregate(pipeline)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description='Google Cloud speech2text API client')
|
||||
parser.add_argument("--batch_size", default=512, help="how many waves in the batch")
|
||||
parser.add_argument("--source", help="source of media [dtv, filmoteka, kronikiprl, sonda]")
|
||||
parser.add_argument("--channels", help="quantity of audio channels")
|
||||
parser.add_argument("--loglevel", help="log level: DEBUG INFO WARNING ERROR")
|
||||
args = parser.parse_args()
|
||||
main(args)
|
||||
|
Loading…
Reference in New Issue
Block a user