diff --git a/.gitignore b/.gitignore index bee8a64b..ee05595b 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ +.idea __pycache__ diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/mongo/__init__.py b/src/mongo/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/mongo/helpers.py b/src/mongo/helpers.py new file mode 100644 index 00000000..19db4c8f --- /dev/null +++ b/src/mongo/helpers.py @@ -0,0 +1,9 @@ +from pymongo import MongoClient + + +def get_mongo_collection(col_name, db_name, uri): + client = MongoClient(uri) + db = client[db_name] + col = db[col_name] + + return col diff --git a/src/reco.py b/src/reco.py index 6b6d6734..ddd0a803 100644 --- a/src/reco.py +++ b/src/reco.py @@ -1,17 +1,14 @@ -#from google.cloud import speech_v1 from google.cloud import speech_v1p1beta1 from google.cloud.speech_v1p1beta1 import enums from google.cloud.speech_v1p1beta1 import types from pymongo import MongoClient -import json import argparse -from google.protobuf.json_format import MessageToJson,MessageToDict -from storageUpload import getMongoCollection +from google.protobuf.json_format import MessageToDict +from src.mongo.helpers import get_mongo_collection from bson.objectid import ObjectId import datetime import time import concurrent.futures -import re def main(args): @@ -19,26 +16,26 @@ def main(args): dbName = "archSpeechReco" colName = "moviesMeta" global col - col = getMongoCollection(colName,dbName,mongoUri) + col = get_mongo_collection(colName, dbName, mongoUri) batch_size = int(args.batch_size) - waves = getWavList(col,batch_size) - uris = [ w['gcsWawLocation'] for w in waves ] + waves = getWavList(col, batch_size) + uris = [w['gcsWawLocation'] for w in waves] start = time.perf_counter() with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor: executor.map(run_reco, uris) stop = time.perf_counter() - print(f'Finished in {round(stop-start, 2)} seconds') + print(f'Finished in {round(stop - start, 2)} seconds') def run_reco(uri): reco = recognize(uri) recoDict = MessageToDict(reco) - + if (len(recoDict) != 0): words = recoDict["results"][-1]["alternatives"][0]["words"] - transcript = "".join( [ trans["alternatives"][0]["transcript"] for trans in recoDict["results"][:-1] ] ) + transcript = "".join([trans["alternatives"][0]["transcript"] for trans in recoDict["results"][:-1]]) elif (len(recoDict) == 0): words = {} transcript = "film niemy" @@ -46,12 +43,13 @@ def run_reco(uri): now = datetime.datetime.now() try: col.update_one( - {"_id": ObjectId(uri.split('/')[4].split('.')[0])}, - {"$set":{"gcTextReco.transcript":transcript, - "gcTextReco.words":words, - "gcTextReco.transcripted":now.strftime("%Y-%m-%d %H:%M:%S")}} + {"_id": ObjectId(uri.split('/')[4].split('.')[0])}, + {"$set": {"gcTextReco.transcript": transcript, + "gcTextReco.words": words, + "gcTextReco.transcripted": now.strftime("%Y-%m-%d %H:%M:%S")}} ) - except Exception as e: print(e) + except Exception as e: + print(e) else: print(f"mongo update OK {uri.split('/')[4].split('.')[0]}") @@ -65,7 +63,7 @@ def recognize(storage_uri): storage_uri URI for audio file in Cloud Storage, e.g. gs://[BUCKET]/[FILE] """ - #client = speech_v1.SpeechClient() + # client = speech_v1.SpeechClient() client = speech_v1p1beta1.SpeechClient() # storage_uri = 'gs://cloud-samples-data/speech/brooklyn_bridge.raw' @@ -79,21 +77,21 @@ def recognize(storage_uri): # This field is optional for FLAC and WAV audio formats. encoding = enums.RecognitionConfig.AudioEncoding.LINEAR16 enable_speaker_diarization = True - #config = { - #"sample_rate_hertz": sample_rate_hertz, + # config = { + # "sample_rate_hertz": sample_rate_hertz, # "language_code": language_code, # "encoding": encoding, # "enableSpeakerDiarization": enable_speaker_diarization # d_config = types.SpeakerDiarizationConfig( - enable_speaker_diarization=True - ) + enable_speaker_diarization=True + ) config = types.RecognitionConfig( - encoding = enums.RecognitionConfig.AudioEncoding.LINEAR16, - sample_rate_hertz = 44100, - language_code = "pl-PL", - diarization_config=d_config - ) + encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16, + sample_rate_hertz=44100, + language_code="pl-PL", + diarization_config=d_config + ) audio = {"uri": storage_uri} @@ -105,31 +103,31 @@ def recognize(storage_uri): return response -def getMongoCollection(colName,dbName,uri): - client = MongoClient(uri,maxPoolSize=512) +def getMongoCollection(colName, dbName, uri): + client = MongoClient(uri, maxPoolSize=512) db = client[dbName] col = db[colName] - + return col -def getWavList(col,limit=32): +def getWavList(col, limit=32): pipeline = [] - #match phase, filetr documents withour gcTextReco field - voice not recognized - pipeline.append({"$match": {"$and":[ - {"gcTextReco": {"$exists": False}}, - {"gcsWav": {"$exists": True}}, - {"description.details.Format dźwięku": {"$ne": "brak"}} - ]} - } - ) - #project phase, show only bucket name: gcsWav.location + # match phase, filetr documents withour gcTextReco field - voice not recognized + pipeline.append({"$match": {"$and": [ + {"gcTextReco": {"$exists": False}}, + {"gcsWav": {"$exists": True}}, + {"description.details.Format dźwięku": {"$ne": "brak"}} + ]} + } + ) + # project phase, show only bucket name: gcsWav.location pipeline.append({"$project": { - "gcsWawLocation": { "$concat": [ "gs://archspeechreco/","$gcsWav.location" ] } - } - }) - #fetch only N documents - pipeline.append({"$limit":limit}) + "gcsWawLocation": {"$concat": ["gs://archspeechreco/", "$gcsWav.location"]} + } + }) + # fetch only N documents + pipeline.append({"$limit": limit}) return col.aggregate(pipeline) diff --git a/src/storageUpload.py b/src/storageUpload.py index 6f7048d7..74def98b 100644 --- a/src/storageUpload.py +++ b/src/storageUpload.py @@ -1,12 +1,14 @@ from google.cloud import storage -import sys import urllib -from pymongo import MongoClient +from src.mongo.helpers import get_mongo_collection from bson.objectid import ObjectId import os import datetime -from subprocess import run,DEVNULL +from subprocess import run, DEVNULL, CalledProcessError import argparse +from urllib.error import URLError, HTTPError, ContentTooShortError +import logging + def main(args): uri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco" @@ -14,51 +16,52 @@ def main(args): colName = "moviesMeta" bucket = 'archspeechreco' - col = getMongoCollection(colName,dbName,uri) + col = get_mongo_collection(colName, dbName, uri) fileFormat = args.format if (fileFormat == 'mp4'): - uploadMp4(col,bucket) - elif (fileFormat == 'wav'): - uploadWave(col,bucket) + upload_mp4(col, bucket) + elif (fileFormat == 'wav'): + upload_wave(col, bucket) -def uploadMp4(col,bucket): - toUpload = getUploadList(col) - for i in toUpload: - fileName = ObjectId(i['_id']) - getVid( i['url'], ObjectId( i['_id'] ) ) - upload_blob(bucket, "{}.mp4".format(fileName), "mp4/{}.mp4".format(fileName),col,"Mp4") +def upload_mp4(col, bucket): + to_upload = get_upload_list(col) + + for i in to_upload: + file_name = ObjectId(i['_id']) + get_vid(i['url'], ObjectId(i['_id'])) + upload_blob(bucket, "{}.mp4".format(file_name), "mp4/{}.mp4".format(file_name), col, "Mp4") try: - os.remove("{}.mp4".format(fileName)) + os.remove("{}.mp4".format(file_name)) except: - print("{}.mp4 has NOT been removed".format(fileName)) + print("{}.mp4 has NOT been removed".format(file_name)) else: - print("{}.mp4 has been removed".format(fileName)) + print("{}.mp4 has been removed".format(file_name)) -def uploadWave(col,bucket): - toUpload = getWavUploadList(col) +def upload_wave(col, bucket): + to_upload = get_wav_upload_list(col) - for i in toUpload: - fileName = ObjectId(i['_id']) - getVid( i['url'], ObjectId( i['_id'] ) ) - getWave("{}.mp4".format(fileName)) - upload_blob(bucket, "{}.wav".format(fileName), "wave/{}.wav".format(fileName),col,"Wav") + for i in to_upload: + file_name = ObjectId(i['_id']) + get_vid(i['url'], ObjectId(i['_id'])) + get_wave("{}.mp4".format(file_name)) + upload_blob(bucket, "{}.wav".format(file_name), "wave/{}.wav".format(file_name), col, "Wav") try: - os.remove("{}.wav".format(fileName)) + os.remove("{}.wav".format(file_name)) except: - print("{}.wav has NOT been removed".format(fileName)) + print("{}.wav has NOT been removed".format(file_name)) else: - print("{}.wav has been removed".format(fileName)) + print("{}.wav has been removed".format(file_name)) -def upload_blob(bucket_name, source_file_name, destination_blob_name,col,fileFormat): +def upload_blob(bucket_name, source_file_name, destination_blob_name, col, file_format): """Uploads a file to the bucket.""" storage_client = storage.Client() bucket = storage_client.get_bucket(bucket_name) blob = bucket.blob(destination_blob_name) - + try: blob.upload_from_filename(source_file_name) except: @@ -66,90 +69,87 @@ def upload_blob(bucket_name, source_file_name, destination_blob_name,col,fileFor else: print('File {}.{} uploaded to {}.'.format( source_file_name, - fileFormat, + file_format, destination_blob_name)) now = datetime.datetime.now() try: col.update_one( - {"_id": ObjectId(source_file_name.split('.')[0])}, - {"$set":{ - "gcs{}".format(fileFormat):{ - "location":destination_blob_name, - "uploadDate":now.strftime("%Y-%m-%d %H:%M:%S") - } - } - } - ) + {"_id": ObjectId(source_file_name.split('.')[0])}, + {"$set": { + "gcs{}".format(file_format): { + "location": destination_blob_name, + "uploadDate": now.strftime("%Y-%m-%d %H:%M:%S") + } + } + } + ) except: print("mongo update failed") else: print("mongo update OK") -def getMongoCollection(colName,dbName,uri): - client = MongoClient(uri) - db = client[dbName] - col = db[colName] - - return col +def get_upload_list(col): + pipeline = [{"$match": { + "gcsMp4": {"$exists": False} + } + }, {"$project": { + "url": {"$concat": ["http://repozytorium.fn.org.pl/", {"$arrayElemAt": ["$mp4", 0]}]} + } + }] + # $match phase, filetr documents withour gcs field - movies not uploaded to gcs + # project phase, show only url and _id keys + # skip first N documents + # pipeline.append({"$skip":362}) + # fetch only N documents + # pipeline.append({"$limit":20}) - -def getUploadList(col): - pipeline = [] - #$match phase, filetr documents withour gcs field - movies not uploaded to gcs - pipeline.append({"$match": { - "gcsMp4": {"$exists": False} - } - }) - #project phase, show only url and _id keys - pipeline.append({"$project": { - "url": { "$concat": [ "http://repozytorium.fn.org.pl/",{"$arrayElemAt": [ "$mp4",0 ]}] } - } - }) - #skip first N documents - #pipeline.append({"$skip":362}) - #fetch only N documents - #pipeline.append({"$limit":20}) - return col.aggregate(pipeline) -def getWavUploadList(col): - pipeline = [] - #$match phase, filetr documents withour gcs field - movies not uploaded to gcs - pipeline.append({"$match": { - "gcsWav": {"$exists": False} - } - }) - #project phase, show only url and _id keys - pipeline.append({"$project": { - "url": { "$concat": [ "http://repozytorium.fn.org.pl/",{"$arrayElemAt": [ "$mp4",0 ]}] } - } - }) - #skip first N documents - #pipeline.append({"$skip":362}) - #fetch only N documents - #pipeline.append({"$limit":500}) - return col.aggregate(pipeline) +def get_wav_upload_list(col): + pipeline = [{"$match": { + "gcsWav": {"$exists": False} + } + }, {"$project": { + "url": {"$concat": ["http://repozytorium.fn.org.pl/", {"$arrayElemAt": ["$mp4", 0]}]} + } + }] + # $match phase, filter documents without gcs field - movies not uploaded to gcs + # project phase, show only url and _id keys + # skip first N documents + # pipeline.append({"$skip":362}) + # fetch only N documents + # pipeline.append({"$limit":500}) + + return col.aggregate(pipeline) -def getVid(url,out): +def get_vid(url, out): try: urllib.request.urlretrieve(url, "{}.mp4".format(out)) - except: - print("wrong URL, can't download") - - -def getWave(filename): - try: - run(['ffmpeg','-i', filename, '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '1', filename.replace("mp4","wav")],stdout=DEVNULL) - except: - print("problem with ffmpeg") + except URLError as e: + print("can't download, {}".format(e.reason)) + except HTTPError as e: + print("reason:{}, Http code: {}", format(e.reason, e.code)) + except ContentTooShortError: + print("content too short error") else: + print("file {}.mp4 has been downloaded from {}".format(out, url)) + + +def get_wave(filename): + try: + run(['ffmpeg', '-i', filename, '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '1', + filename.replace("mp4", "wav")], stdout=DEVNULL, check=True) + except CalledProcessError as e: + print("problem with ffmpeg, {} exited with {} code".format(e.cmd, e.returncode)) + else: + print("file {} has been decoded to waw format".format(filename)) try: os.remove(filename) - except: - print("{} has NOT been removed".format(filename)) + except OSError as e: + print("{} has NOT been removed, {}".format(filename, e.strerror)) else: print("{} has been removed".format(filename)) @@ -159,4 +159,3 @@ if __name__ == '__main__': parser.add_argument("--format", default='mp4', help="format to fetch and upload, [mp4, wav]") args = parser.parse_args() main(args) - diff --git a/src/temp b/src/temp index 2ade2f2c..f2a38c9c 100644 --- a/src/temp +++ b/src/temp @@ -2,14 +2,14 @@ mongoUri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco" dbName = "archSpeechReco" colName = "moviesMeta" -def getMongoCollection(colName,dbName,uri): +def get_mongo_collection(colName,dbName,uri): client = MongoClient(uri,maxPoolSize=512) db = client[dbName] col = db[colName] return col -col = getMongoCollection(colName,dbName,mongoUri) +col = get_mongo_collection(colName,dbName,mongoUri) col.aggregate(pipeline) @@ -42,3 +42,11 @@ var pipeline = [{"$match":{"gcTextReco": {"$exists": true}}}, {" var pipeline = [{"$match":{"gcTextReco": {"$exists": true}}}, {"$project": {"_id":0, "durationStart": { "$concat": ["$description.date", "T00:00:00Z"] }, "wid":{ "$concat": ["Filmoteka_", {"$toString": "$_id"} ] }, "creator": { $ifNull: [ "$description.details.Produkcja", "null" ]}, "originalDate": "$description.date", "contents": ["$gcTextReco.transcript_fix"], "url":"$url", "title":"$title", "durationEnd": { "$concat": ["$description.date", "T23:59:59Z"] }}}, {$out: "export3"} ] db.moviesMeta.aggregate(pipeline) + +var pipeline = [ {$match: {$and: [ {"hash": /[abcd]0$/}, {"gcsWav.location": {"$exists": 1}}, {"gcTextReco.transcript_fix": {"$not": /^$/}} ] }}, + {$project: {"_id":0, "hash":1, "plik":{ "$substr": ["$gcsWav.location", 5, -1]}, "opis": "$description.desc", "transkrypcja": "$gcTextReco.transcript_fix"}}, + {$out: "sample100"} + ] + + +5df3e63d4c0402698d7844e3 \ No newline at end of file