prawilny refaktor początek

This commit is contained in:
Wojciech Smolak 2020-06-06 00:56:55 +02:00
parent 849e2cf21c
commit f996eb58ed
7 changed files with 153 additions and 138 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
.idea
__pycache__ __pycache__

0
src/__init__.py Normal file
View File

0
src/mongo/__init__.py Normal file
View File

9
src/mongo/helpers.py Normal file
View File

@ -0,0 +1,9 @@
from pymongo import MongoClient
def get_mongo_collection(col_name, db_name, uri):
client = MongoClient(uri)
db = client[db_name]
col = db[col_name]
return col

View File

@ -1,17 +1,14 @@
#from google.cloud import speech_v1
from google.cloud import speech_v1p1beta1 from google.cloud import speech_v1p1beta1
from google.cloud.speech_v1p1beta1 import enums from google.cloud.speech_v1p1beta1 import enums
from google.cloud.speech_v1p1beta1 import types from google.cloud.speech_v1p1beta1 import types
from pymongo import MongoClient from pymongo import MongoClient
import json
import argparse import argparse
from google.protobuf.json_format import MessageToJson,MessageToDict from google.protobuf.json_format import MessageToDict
from storageUpload import getMongoCollection from src.mongo.helpers import get_mongo_collection
from bson.objectid import ObjectId from bson.objectid import ObjectId
import datetime import datetime
import time import time
import concurrent.futures import concurrent.futures
import re
def main(args): def main(args):
@ -19,26 +16,26 @@ def main(args):
dbName = "archSpeechReco" dbName = "archSpeechReco"
colName = "moviesMeta" colName = "moviesMeta"
global col global col
col = getMongoCollection(colName,dbName,mongoUri) col = get_mongo_collection(colName, dbName, mongoUri)
batch_size = int(args.batch_size) batch_size = int(args.batch_size)
waves = getWavList(col,batch_size) waves = getWavList(col, batch_size)
uris = [ w['gcsWawLocation'] for w in waves ] uris = [w['gcsWawLocation'] for w in waves]
start = time.perf_counter() start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=64) as executor:
executor.map(run_reco, uris) executor.map(run_reco, uris)
stop = time.perf_counter() stop = time.perf_counter()
print(f'Finished in {round(stop-start, 2)} seconds') print(f'Finished in {round(stop - start, 2)} seconds')
def run_reco(uri): def run_reco(uri):
reco = recognize(uri) reco = recognize(uri)
recoDict = MessageToDict(reco) recoDict = MessageToDict(reco)
if (len(recoDict) != 0): if (len(recoDict) != 0):
words = recoDict["results"][-1]["alternatives"][0]["words"] words = recoDict["results"][-1]["alternatives"][0]["words"]
transcript = "".join( [ trans["alternatives"][0]["transcript"] for trans in recoDict["results"][:-1] ] ) transcript = "".join([trans["alternatives"][0]["transcript"] for trans in recoDict["results"][:-1]])
elif (len(recoDict) == 0): elif (len(recoDict) == 0):
words = {} words = {}
transcript = "film niemy" transcript = "film niemy"
@ -46,12 +43,13 @@ def run_reco(uri):
now = datetime.datetime.now() now = datetime.datetime.now()
try: try:
col.update_one( col.update_one(
{"_id": ObjectId(uri.split('/')[4].split('.')[0])}, {"_id": ObjectId(uri.split('/')[4].split('.')[0])},
{"$set":{"gcTextReco.transcript":transcript, {"$set": {"gcTextReco.transcript": transcript,
"gcTextReco.words":words, "gcTextReco.words": words,
"gcTextReco.transcripted":now.strftime("%Y-%m-%d %H:%M:%S")}} "gcTextReco.transcripted": now.strftime("%Y-%m-%d %H:%M:%S")}}
) )
except Exception as e: print(e) except Exception as e:
print(e)
else: else:
print(f"mongo update OK {uri.split('/')[4].split('.')[0]}") print(f"mongo update OK {uri.split('/')[4].split('.')[0]}")
@ -65,7 +63,7 @@ def recognize(storage_uri):
storage_uri URI for audio file in Cloud Storage, e.g. gs://[BUCKET]/[FILE] storage_uri URI for audio file in Cloud Storage, e.g. gs://[BUCKET]/[FILE]
""" """
#client = speech_v1.SpeechClient() # client = speech_v1.SpeechClient()
client = speech_v1p1beta1.SpeechClient() client = speech_v1p1beta1.SpeechClient()
# storage_uri = 'gs://cloud-samples-data/speech/brooklyn_bridge.raw' # storage_uri = 'gs://cloud-samples-data/speech/brooklyn_bridge.raw'
@ -79,21 +77,21 @@ def recognize(storage_uri):
# This field is optional for FLAC and WAV audio formats. # This field is optional for FLAC and WAV audio formats.
encoding = enums.RecognitionConfig.AudioEncoding.LINEAR16 encoding = enums.RecognitionConfig.AudioEncoding.LINEAR16
enable_speaker_diarization = True enable_speaker_diarization = True
#config = { # config = {
#"sample_rate_hertz": sample_rate_hertz, # "sample_rate_hertz": sample_rate_hertz,
# "language_code": language_code, # "language_code": language_code,
# "encoding": encoding, # "encoding": encoding,
# "enableSpeakerDiarization": enable_speaker_diarization # "enableSpeakerDiarization": enable_speaker_diarization
# #
d_config = types.SpeakerDiarizationConfig( d_config = types.SpeakerDiarizationConfig(
enable_speaker_diarization=True enable_speaker_diarization=True
) )
config = types.RecognitionConfig( config = types.RecognitionConfig(
encoding = enums.RecognitionConfig.AudioEncoding.LINEAR16, encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz = 44100, sample_rate_hertz=44100,
language_code = "pl-PL", language_code="pl-PL",
diarization_config=d_config diarization_config=d_config
) )
audio = {"uri": storage_uri} audio = {"uri": storage_uri}
@ -105,31 +103,31 @@ def recognize(storage_uri):
return response return response
def getMongoCollection(colName,dbName,uri): def getMongoCollection(colName, dbName, uri):
client = MongoClient(uri,maxPoolSize=512) client = MongoClient(uri, maxPoolSize=512)
db = client[dbName] db = client[dbName]
col = db[colName] col = db[colName]
return col return col
def getWavList(col,limit=32): def getWavList(col, limit=32):
pipeline = [] pipeline = []
#match phase, filetr documents withour gcTextReco field - voice not recognized # match phase, filetr documents withour gcTextReco field - voice not recognized
pipeline.append({"$match": {"$and":[ pipeline.append({"$match": {"$and": [
{"gcTextReco": {"$exists": False}}, {"gcTextReco": {"$exists": False}},
{"gcsWav": {"$exists": True}}, {"gcsWav": {"$exists": True}},
{"description.details.Format dźwięku": {"$ne": "brak"}} {"description.details.Format dźwięku": {"$ne": "brak"}}
]} ]}
} }
) )
#project phase, show only bucket name: gcsWav.location # project phase, show only bucket name: gcsWav.location
pipeline.append({"$project": { pipeline.append({"$project": {
"gcsWawLocation": { "$concat": [ "gs://archspeechreco/","$gcsWav.location" ] } "gcsWawLocation": {"$concat": ["gs://archspeechreco/", "$gcsWav.location"]}
} }
}) })
#fetch only N documents # fetch only N documents
pipeline.append({"$limit":limit}) pipeline.append({"$limit": limit})
return col.aggregate(pipeline) return col.aggregate(pipeline)

View File

@ -1,12 +1,14 @@
from google.cloud import storage from google.cloud import storage
import sys
import urllib import urllib
from pymongo import MongoClient from src.mongo.helpers import get_mongo_collection
from bson.objectid import ObjectId from bson.objectid import ObjectId
import os import os
import datetime import datetime
from subprocess import run,DEVNULL from subprocess import run, DEVNULL, CalledProcessError
import argparse import argparse
from urllib.error import URLError, HTTPError, ContentTooShortError
import logging
def main(args): def main(args):
uri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco" uri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco"
@ -14,51 +16,52 @@ def main(args):
colName = "moviesMeta" colName = "moviesMeta"
bucket = 'archspeechreco' bucket = 'archspeechreco'
col = getMongoCollection(colName,dbName,uri) col = get_mongo_collection(colName, dbName, uri)
fileFormat = args.format fileFormat = args.format
if (fileFormat == 'mp4'): if (fileFormat == 'mp4'):
uploadMp4(col,bucket) upload_mp4(col, bucket)
elif (fileFormat == 'wav'): elif (fileFormat == 'wav'):
uploadWave(col,bucket) upload_wave(col, bucket)
def uploadMp4(col,bucket):
toUpload = getUploadList(col)
for i in toUpload: def upload_mp4(col, bucket):
fileName = ObjectId(i['_id']) to_upload = get_upload_list(col)
getVid( i['url'], ObjectId( i['_id'] ) )
upload_blob(bucket, "{}.mp4".format(fileName), "mp4/{}.mp4".format(fileName),col,"Mp4") for i in to_upload:
file_name = ObjectId(i['_id'])
get_vid(i['url'], ObjectId(i['_id']))
upload_blob(bucket, "{}.mp4".format(file_name), "mp4/{}.mp4".format(file_name), col, "Mp4")
try: try:
os.remove("{}.mp4".format(fileName)) os.remove("{}.mp4".format(file_name))
except: except:
print("{}.mp4 has NOT been removed".format(fileName)) print("{}.mp4 has NOT been removed".format(file_name))
else: else:
print("{}.mp4 has been removed".format(fileName)) print("{}.mp4 has been removed".format(file_name))
def uploadWave(col,bucket): def upload_wave(col, bucket):
toUpload = getWavUploadList(col) to_upload = get_wav_upload_list(col)
for i in toUpload: for i in to_upload:
fileName = ObjectId(i['_id']) file_name = ObjectId(i['_id'])
getVid( i['url'], ObjectId( i['_id'] ) ) get_vid(i['url'], ObjectId(i['_id']))
getWave("{}.mp4".format(fileName)) get_wave("{}.mp4".format(file_name))
upload_blob(bucket, "{}.wav".format(fileName), "wave/{}.wav".format(fileName),col,"Wav") upload_blob(bucket, "{}.wav".format(file_name), "wave/{}.wav".format(file_name), col, "Wav")
try: try:
os.remove("{}.wav".format(fileName)) os.remove("{}.wav".format(file_name))
except: except:
print("{}.wav has NOT been removed".format(fileName)) print("{}.wav has NOT been removed".format(file_name))
else: else:
print("{}.wav has been removed".format(fileName)) print("{}.wav has been removed".format(file_name))
def upload_blob(bucket_name, source_file_name, destination_blob_name,col,fileFormat): def upload_blob(bucket_name, source_file_name, destination_blob_name, col, file_format):
"""Uploads a file to the bucket.""" """Uploads a file to the bucket."""
storage_client = storage.Client() storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name) bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(destination_blob_name) blob = bucket.blob(destination_blob_name)
try: try:
blob.upload_from_filename(source_file_name) blob.upload_from_filename(source_file_name)
except: except:
@ -66,90 +69,87 @@ def upload_blob(bucket_name, source_file_name, destination_blob_name,col,fileFor
else: else:
print('File {}.{} uploaded to {}.'.format( print('File {}.{} uploaded to {}.'.format(
source_file_name, source_file_name,
fileFormat, file_format,
destination_blob_name)) destination_blob_name))
now = datetime.datetime.now() now = datetime.datetime.now()
try: try:
col.update_one( col.update_one(
{"_id": ObjectId(source_file_name.split('.')[0])}, {"_id": ObjectId(source_file_name.split('.')[0])},
{"$set":{ {"$set": {
"gcs{}".format(fileFormat):{ "gcs{}".format(file_format): {
"location":destination_blob_name, "location": destination_blob_name,
"uploadDate":now.strftime("%Y-%m-%d %H:%M:%S") "uploadDate": now.strftime("%Y-%m-%d %H:%M:%S")
} }
} }
} }
) )
except: except:
print("mongo update failed") print("mongo update failed")
else: else:
print("mongo update OK") print("mongo update OK")
def getMongoCollection(colName,dbName,uri): def get_upload_list(col):
client = MongoClient(uri) pipeline = [{"$match": {
db = client[dbName] "gcsMp4": {"$exists": False}
col = db[colName] }
}, {"$project": {
return col "url": {"$concat": ["http://repozytorium.fn.org.pl/", {"$arrayElemAt": ["$mp4", 0]}]}
}
}]
# $match phase, filetr documents withour gcs field - movies not uploaded to gcs
# project phase, show only url and _id keys
# skip first N documents
# pipeline.append({"$skip":362})
# fetch only N documents
# pipeline.append({"$limit":20})
def getUploadList(col):
pipeline = []
#$match phase, filetr documents withour gcs field - movies not uploaded to gcs
pipeline.append({"$match": {
"gcsMp4": {"$exists": False}
}
})
#project phase, show only url and _id keys
pipeline.append({"$project": {
"url": { "$concat": [ "http://repozytorium.fn.org.pl/",{"$arrayElemAt": [ "$mp4",0 ]}] }
}
})
#skip first N documents
#pipeline.append({"$skip":362})
#fetch only N documents
#pipeline.append({"$limit":20})
return col.aggregate(pipeline) return col.aggregate(pipeline)
def getWavUploadList(col):
pipeline = []
#$match phase, filetr documents withour gcs field - movies not uploaded to gcs
pipeline.append({"$match": {
"gcsWav": {"$exists": False}
}
})
#project phase, show only url and _id keys
pipeline.append({"$project": {
"url": { "$concat": [ "http://repozytorium.fn.org.pl/",{"$arrayElemAt": [ "$mp4",0 ]}] }
}
})
#skip first N documents
#pipeline.append({"$skip":362})
#fetch only N documents
#pipeline.append({"$limit":500})
return col.aggregate(pipeline) def get_wav_upload_list(col):
pipeline = [{"$match": {
"gcsWav": {"$exists": False}
}
}, {"$project": {
"url": {"$concat": ["http://repozytorium.fn.org.pl/", {"$arrayElemAt": ["$mp4", 0]}]}
}
}]
# $match phase, filter documents without gcs field - movies not uploaded to gcs
# project phase, show only url and _id keys
# skip first N documents
# pipeline.append({"$skip":362})
# fetch only N documents
# pipeline.append({"$limit":500})
return col.aggregate(pipeline)
def getVid(url,out): def get_vid(url, out):
try: try:
urllib.request.urlretrieve(url, "{}.mp4".format(out)) urllib.request.urlretrieve(url, "{}.mp4".format(out))
except: except URLError as e:
print("wrong URL, can't download") print("can't download, {}".format(e.reason))
except HTTPError as e:
print("reason:{}, Http code: {}", format(e.reason, e.code))
def getWave(filename): except ContentTooShortError:
try: print("content too short error")
run(['ffmpeg','-i', filename, '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '1', filename.replace("mp4","wav")],stdout=DEVNULL)
except:
print("problem with ffmpeg")
else: else:
print("file {}.mp4 has been downloaded from {}".format(out, url))
def get_wave(filename):
try:
run(['ffmpeg', '-i', filename, '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '1',
filename.replace("mp4", "wav")], stdout=DEVNULL, check=True)
except CalledProcessError as e:
print("problem with ffmpeg, {} exited with {} code".format(e.cmd, e.returncode))
else:
print("file {} has been decoded to waw format".format(filename))
try: try:
os.remove(filename) os.remove(filename)
except: except OSError as e:
print("{} has NOT been removed".format(filename)) print("{} has NOT been removed, {}".format(filename, e.strerror))
else: else:
print("{} has been removed".format(filename)) print("{} has been removed".format(filename))
@ -159,4 +159,3 @@ if __name__ == '__main__':
parser.add_argument("--format", default='mp4', help="format to fetch and upload, [mp4, wav]") parser.add_argument("--format", default='mp4', help="format to fetch and upload, [mp4, wav]")
args = parser.parse_args() args = parser.parse_args()
main(args) main(args)

View File

@ -2,14 +2,14 @@ mongoUri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco"
dbName = "archSpeechReco" dbName = "archSpeechReco"
colName = "moviesMeta" colName = "moviesMeta"
def getMongoCollection(colName,dbName,uri): def get_mongo_collection(colName,dbName,uri):
client = MongoClient(uri,maxPoolSize=512) client = MongoClient(uri,maxPoolSize=512)
db = client[dbName] db = client[dbName]
col = db[colName] col = db[colName]
return col return col
col = getMongoCollection(colName,dbName,mongoUri) col = get_mongo_collection(colName,dbName,mongoUri)
col.aggregate(pipeline) col.aggregate(pipeline)
@ -42,3 +42,11 @@ var pipeline = [{"$match":{"gcTextReco": {"$exists": true}}}, {"
var pipeline = [{"$match":{"gcTextReco": {"$exists": true}}}, {"$project": {"_id":0, "durationStart": { "$concat": ["$description.date", "T00:00:00Z"] }, "wid":{ "$concat": ["Filmoteka_", {"$toString": "$_id"} ] }, "creator": { $ifNull: [ "$description.details.Produkcja", "null" ]}, "originalDate": "$description.date", "contents": ["$gcTextReco.transcript_fix"], "url":"$url", "title":"$title", "durationEnd": { "$concat": ["$description.date", "T23:59:59Z"] }}}, {$out: "export3"} ] var pipeline = [{"$match":{"gcTextReco": {"$exists": true}}}, {"$project": {"_id":0, "durationStart": { "$concat": ["$description.date", "T00:00:00Z"] }, "wid":{ "$concat": ["Filmoteka_", {"$toString": "$_id"} ] }, "creator": { $ifNull: [ "$description.details.Produkcja", "null" ]}, "originalDate": "$description.date", "contents": ["$gcTextReco.transcript_fix"], "url":"$url", "title":"$title", "durationEnd": { "$concat": ["$description.date", "T23:59:59Z"] }}}, {$out: "export3"} ]
db.moviesMeta.aggregate(pipeline) db.moviesMeta.aggregate(pipeline)
var pipeline = [ {$match: {$and: [ {"hash": /[abcd]0$/}, {"gcsWav.location": {"$exists": 1}}, {"gcTextReco.transcript_fix": {"$not": /^$/}} ] }},
{$project: {"_id":0, "hash":1, "plik":{ "$substr": ["$gcsWav.location", 5, -1]}, "opis": "$description.desc", "transkrypcja": "$gcTextReco.transcript_fix"}},
{$out: "sample100"}
]
5df3e63d4c0402698d7844e3