"""Batch speech-to-text transcription of archive audio with Google Cloud Speech.

Selects not-yet-transcribed WAV files from the ``moviesMeta`` MongoDB
collection, sends them to the Google Cloud Speech ``v1p1beta1``
long-running recognition API in parallel threads, and writes the transcript
and word list back into each document's ``gcTextReco`` field.
"""
import argparse
import concurrent.futures
import datetime
import os
import threading
import time

from bson.objectid import ObjectId
from google.cloud import speech_v1p1beta1
from google.cloud.speech_v1p1beta1 import enums
from google.cloud.speech_v1p1beta1 import types
from google.protobuf.json_format import MessageToDict
from pymongo import MongoClient

from src.mongo.helpers import get_mongo_collection

# NOTE(review): credentials should not live in source control. The MONGO_URI
# environment variable overrides this fallback; consider removing the fallback.
DEFAULT_MONGO_URI = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco"
DB_NAME = "archSpeechReco"
COLLECTION_NAME = "moviesMeta"

# Recognition settings sent with every request.
SAMPLE_RATE_HERTZ = 44100  # sample rate of the audio data, in Hertz
LANGUAGE_CODE = "pl-PL"  # language of the supplied audio
# Set explicitly; the API treats encoding as optional for FLAC and WAV input.
ENCODING = enums.RecognitionConfig.AudioEncoding.LINEAR16
ENABLE_SPEAKER_DIARIZATION = True

DEFAULT_MAX_WORKERS = 64

# Mongo collection used by run_reco(); assigned by main().
col = None

# One Speech client shared by all worker threads. It is created lazily; the
# lock keeps concurrent first calls from each building their own client.
_speech_client = None
_speech_client_lock = threading.Lock()


def main(args):
    """Transcribe one batch of pending WAV files and store the results.

    Args:
        args: parsed CLI namespace. ``args.batch_size`` is the number of
            documents to process. ``args.max_workers`` (optional, default 64)
            is the number of concurrent recognition threads.
    """
    mongo_uri = os.environ.get("MONGO_URI", DEFAULT_MONGO_URI)
    global col
    col = get_mongo_collection(COLLECTION_NAME, DB_NAME, mongo_uri)

    batch_size = int(args.batch_size)
    max_workers = int(getattr(args, "max_workers", DEFAULT_MAX_WORKERS))

    # Materialise the cursor up front. Recognition can take a long time and
    # the URIs are all that is needed afterwards.
    waves = getWavList(col, batch_size)
    uris = [wave["gcsWawLocation"] for wave in waves]

    start = time.perf_counter()
    failed = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(run_reco, uri): uri for uri in uris}
        # Inspect every future. The original Executor.map() result was never
        # iterated, so worker exceptions were silently dropped.
        for future in concurrent.futures.as_completed(futures):
            exc = future.exception()
            if exc is not None:
                failed += 1
                print(f"{futures[future]} failed: {exc!r}")
    stop = time.perf_counter()
    if failed:
        print(f"{failed} of {len(uris)} recognitions failed")
    print(f'Finished in {round(stop - start, 2)} seconds')


def _document_id_from_uri(uri):
    """Return the Mongo document id string encoded in a WAV URI.

    Takes path component 4 of ``uri.split('/')`` and strips everything from
    the first ``.`` onwards. This assumes URIs shaped like
    ``gs://<bucket>/<dir>/<id>.wav`` (TODO confirm against gcsWav.location).
    """
    return uri.split('/')[4].split('.')[0]


def run_reco(uri):
    """Recognise speech in one WAV file and save the result to ``col``.

    If the API returns no results, the document is marked with the
    transcript "film niemy" ("silent film") and an empty word list. Mongo
    update errors are printed, not raised. Recognition errors propagate to
    the caller; main() reports them.

    Args:
        uri: ``gs://`` URI of the WAV file. Its file name is the document's
            ObjectId.
    """
    reco = recognize(uri)
    reco_dict = MessageToDict(reco)
    results = reco_dict.get("results") or []
    if results:
        # With speaker diarization, the last result carries the word list
        # (with speaker tags). The earlier results hold the sequential
        # transcript segments.
        words = results[-1]["alternatives"][0]["words"]
        # NOTE(review): segments are joined without a separator. Confirm
        # that per-result transcripts carry their own leading whitespace.
        transcript = "".join(
            result["alternatives"][0]["transcript"] for result in results[:-1]
        )
    else:
        words = {}
        transcript = "film niemy"

    now = datetime.datetime.now()
    try:
        # The id is parsed inside the try so a malformed URI is reported
        # the same way as a failed update.
        doc_id = _document_id_from_uri(uri)
        col.update_one(
            {"_id": ObjectId(doc_id)},
            {"$set": {"gcTextReco.transcript": transcript,
                      "gcTextReco.words": words,
                      "gcTextReco.transcripted": now.strftime("%Y-%m-%d %H:%M:%S")}}
        )
    except Exception as e:
        print(e)
    else:
        print(f"mongo update OK {doc_id}")


def _get_speech_client():
    """Return the process-wide SpeechClient, creating it on first use."""
    global _speech_client
    with _speech_client_lock:
        if _speech_client is None:
            _speech_client = speech_v1p1beta1.SpeechClient()
        return _speech_client


def recognize(storage_uri):
    """Transcribe a long audio file from Cloud Storage with asynchronous recognition.

    The request is Polish LINEAR16 audio at 44.1 kHz with speaker
    diarization enabled. The call blocks until the long-running operation
    completes.

    Args:
        storage_uri: URI of the audio file in Cloud Storage, e.g.
            ``gs://[BUCKET]/[FILE]``.

    Returns:
        The recognition response from ``operation.result()``.
    """
    client = _get_speech_client()
    diarization_config = types.SpeakerDiarizationConfig(
        enable_speaker_diarization=ENABLE_SPEAKER_DIARIZATION,
    )
    config = types.RecognitionConfig(
        encoding=ENCODING,
        sample_rate_hertz=SAMPLE_RATE_HERTZ,
        language_code=LANGUAGE_CODE,
        diarization_config=diarization_config,
    )
    audio = {"uri": storage_uri}

    operation = client.long_running_recognize(config, audio)
    print(f'{storage_uri} has been sent to reco')
    print("Waiting for operation to complete...")
    return operation.result()


def getMongoCollection(colName, dbName, uri):
    """Open a MongoClient on ``uri`` and return collection ``colName`` of ``dbName``.

    The client allows up to 512 pooled connections. main() uses
    ``get_mongo_collection`` from ``src.mongo.helpers`` instead; this
    function is kept for existing callers.
    """
    client = MongoClient(uri, maxPoolSize=512)
    db = client[dbName]
    col = db[colName]
    return col


def getWavList(col, limit=32):
    """Return a cursor over up to ``limit`` WAV files still awaiting recognition.

    Each yielded document contains ``_id`` and ``gcsWawLocation``: the
    ``gs://archspeechreco/`` bucket prefix joined to ``gcsWav.location``.

    Raises:
        ValueError: if ``limit`` is less than 1 (MongoDB requires a positive
            ``$limit``).
    """
    if limit < 1:
        raise ValueError(f"limit must be a positive integer, got {limit!r}")

    pipeline = []
    # Match stage: keep documents that have no gcTextReco yet (speech not
    # recognised) and do have a gcsWav entry. Exclude those whose sound
    # format ("Format dźwięku") is "brak" ("none").
    pipeline.append({"$match": {"$and": [
        {"gcTextReco": {"$exists": False}},
        {"gcsWav": {"$exists": True}},
        {"description.details.Format dźwięku": {"$ne": "brak"}}
    ]}})
    # Project stage: build the full gs:// URI of the WAV file.
    pipeline.append({"$project": {
        "gcsWawLocation": {"$concat": ["gs://archspeechreco/", "$gcsWav.location"]}
    }})
    # Fetch only `limit` documents.
    pipeline.append({"$limit": limit})
    return col.aggregate(pipeline)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Google Cloud speech2text API client')
    parser.add_argument("--batch_size", type=int, default=512,
                        help="how many waves in the batch")
    parser.add_argument("--max_workers", type=int, default=DEFAULT_MAX_WORKERS,
                        help="how many recognitions run concurrently")
    args = parser.parse_args()
    main(args)