# archSpeechReco/src/reco.py
#
# 141 lines
# 4.6 KiB
# Python

#from google.cloud import speech_v1
from google.cloud import speech_v1p1beta1
from google.cloud.speech_v1p1beta1 import enums
from google.cloud.speech_v1p1beta1 import types
from pymongo import MongoClient
import json
import argparse
from google.protobuf.json_format import MessageToJson,MessageToDict
from storageUpload import getMongoCollection
from bson.objectid import ObjectId
import datetime
import time
import concurrent.futures
import re
def main(args):
    """Transcribe a batch of not-yet-recognized WAV files in parallel.

    Connects to the ``moviesMeta`` collection, fetches up to
    ``args.batch_size`` documents selected by ``getWavList`` and runs
    ``run_reco`` on each GCS URI in a thread pool. Failures of individual
    files are reported instead of being silently discarded.

    Args:
        args: parsed CLI namespace; ``args.batch_size`` must be convertible
            to ``int``.
    """
    import os  # local import: only needed for the optional URI override

    # NOTE(review): credentials are still hard-coded as the fallback; prefer
    # supplying them through the ARCH_SPEECH_RECO_MONGO_URI environment variable.
    mongoUri = os.environ.get(
        "ARCH_SPEECH_RECO_MONGO_URI",
        "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco",
    )
    dbName = "archSpeechReco"
    colName = "moviesMeta"
    # run_reco reads the collection through this module-level global.
    global col
    col = getMongoCollection(colName, dbName, mongoUri)
    batch_size = int(args.batch_size)
    waves = getWavList(col, batch_size)
    # $concat yields null when gcsWav.location is missing; skip such documents
    # instead of handing None to run_reco.
    uris = [w['gcsWawLocation'] for w in waves if w.get('gcsWawLocation')]
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor:
        futures = {executor.submit(run_reco, uri): uri for uri in uris}
        # Executor.map only re-raises worker exceptions when its results are
        # consumed; collect each future explicitly so failures are visible.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                print(f"reco failed for {futures[future]}: {error!r}")
    stop = time.perf_counter()
    print(f'Finished in {round(stop-start, 2)} seconds')
def run_reco(uri):
    """Recognize speech in one GCS audio file and store the result in Mongo.

    Calls ``recognize(uri)``, converts the response to a dict and writes
    ``gcTextReco.transcript``, ``gcTextReco.words`` and
    ``gcTextReco.transcripted`` (local time, ``%Y-%m-%d %H:%M:%S``) onto the
    document in the global ``col`` whose ``_id`` is taken from the URI.
    When the response contains no results, the transcript is stored as
    ``"film niemy"`` (Polish: "silent film").

    Mongo errors are printed, not raised; errors from ``recognize`` propagate
    to the caller.

    Args:
        uri: GCS URI, expected to look like ``gs://<bucket>/<dir>/<id>.<ext>``.
    """
    reco = recognize(uri)
    recoDict = MessageToDict(reco)
    results = recoDict.get("results") or []
    if results:
        # Assumes (speaker diarization enabled in recognize) that the last
        # result carries the full speaker-tagged word list and the earlier
        # results carry the sequential transcript pieces -- confirm against
        # the Speech API diarization docs.
        last_alternatives = results[-1].get("alternatives") or [{}]
        words = last_alternatives[0].get("words", [])
        transcript = "".join(
            (result.get("alternatives") or [{}])[0].get("transcript", "")
            for result in results[:-1]
        )
    else:
        # NOTE(review): kept as {} (not []) to stay compatible with documents
        # already stored for silent films.
        words = {}
        transcript = "film niemy"
    now = datetime.datetime.now()
    try:
        # Document id = file name without extension of the 5th '/'-component.
        doc_id = uri.split('/')[4].split('.')[0]
        col.update_one(
            {"_id": ObjectId(doc_id)},
            {"$set": {"gcTextReco.transcript": transcript,
                      "gcTextReco.words": words,
                      "gcTextReco.transcripted": now.strftime("%Y-%m-%d %H:%M:%S")}}
        )
    except Exception as e:
        print(f"mongo update failed for {uri}: {e}")
    else:
        print(f"mongo update OK {doc_id}")
def recognize(storage_uri, sample_rate_hertz=44100, language_code="pl-PL",
              encoding=None, enable_speaker_diarization=True):
    """Transcribe a long audio file from Cloud Storage (asynchronous API).

    Sends a ``long_running_recognize`` request through the v1p1beta1 client
    and blocks until the operation finishes.

    Args:
        storage_uri: URI of the audio file in Cloud Storage,
            e.g. ``gs://[BUCKET]/[FILE]``.
        sample_rate_hertz: sample rate of the audio in Hz (default 44100).
        language_code: language of the audio (default ``"pl-PL"``).
        encoding: ``RecognitionConfig.AudioEncoding`` value; ``None`` means
            ``LINEAR16`` (optional for FLAC/WAV per the API docs).
        enable_speaker_diarization: request speaker tags on words.

    Returns:
        The result of the long-running operation (the recognition response).
    """
    if encoding is None:
        # Resolved here rather than in the signature so the enum is only
        # looked up at call time.
        encoding = enums.RecognitionConfig.AudioEncoding.LINEAR16
    client = speech_v1p1beta1.SpeechClient()
    diarization_config = types.SpeakerDiarizationConfig(
        enable_speaker_diarization=enable_speaker_diarization,
    )
    config = types.RecognitionConfig(
        encoding=encoding,
        sample_rate_hertz=sample_rate_hertz,
        language_code=language_code,
        diarization_config=diarization_config,
    )
    audio = {"uri": storage_uri}
    operation = client.long_running_recognize(config, audio)
    print(f'{storage_uri} has been sent to reco')
    print(u"Waiting for operation to complete...")
    return operation.result()
def getMongoCollection(colName,dbName,uri):
    """Return collection ``colName`` of database ``dbName`` at Mongo ``uri``.

    A new ``MongoClient`` (connection pool of up to 512) is created on every
    call.

    NOTE: this definition shadows the ``getMongoCollection`` imported from
    ``storageUpload`` at the top of the module.
    """
    database = MongoClient(uri, maxPoolSize=512)[dbName]
    return database[colName]
def getWavList(col,limit=32):
    """Run an aggregation selecting audio files that still need recognition.

    Stages:
      1. keep documents with no ``gcTextReco`` field (speech not recognized
         yet), with a ``gcsWav`` field, and whose sound format detail is not
         ``"brak"`` (Polish: "none");
      2. project only ``gcsWawLocation``, the full ``gs://archspeechreco/``
         URI built from ``gcsWav.location``;
      3. limit the output to ``limit`` documents.

    Returns:
        Whatever ``col.aggregate`` returns for that pipeline.
    """
    not_recognized = {"gcTextReco": {"$exists": False}}
    has_wav = {"gcsWav": {"$exists": True}}
    has_sound = {"description.details.Format dźwięku": {"$ne": "brak"}}
    gcs_uri = {"$concat": ["gs://archspeechreco/", "$gcsWav.location"]}
    return col.aggregate([
        {"$match": {"$and": [not_recognized, has_wav, has_sound]}},
        {"$project": {"gcsWawLocation": gcs_uri}},
        {"$limit": limit},
    ])
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Google Cloud speech2text API client')
    # type=int: reject non-numeric values with a usage error instead of a
    # ValueError traceback later in main().
    parser.add_argument("--batch_size", type=int, default=512, help="how many waves in the batch")
    args = parser.parse_args()
    main(args)