"""Fetch archive movies and upload them (as MP4 or mono WAV) to Google Cloud Storage.

For every document in the ``moviesMeta`` collection that has no ``gcsMp4`` /
``gcsWav`` field yet, the movie is downloaded from the repository, optionally
transcoded to WAV with ffmpeg, uploaded to the ``archspeechreco`` bucket, and
the document is stamped with the blob location and upload date.
"""
from google.cloud import storage
import urllib
import urllib.request  # `import urllib` alone does not load the `request` submodule
from mongo.helpers import get_mongo_collection
from bson.objectid import ObjectId
import os
import datetime
from subprocess import run, DEVNULL, CalledProcessError
import argparse
from urllib.error import URLError, HTTPError, ContentTooShortError
import logging

# Base URL that the documents' relative ``mp4`` paths are resolved against.
REPOSITORY_URL = "http://repozytorium.fn.org.pl/"


def main(args):
    """Upload all pending movies in the format selected on the command line.

    Args:
        args: parsed command-line namespace; ``args.format`` must be
            ``'mp4'`` or ``'wav'``.
    """
    file_format = args.format
    if file_format not in ('mp4', 'wav'):
        print("unsupported format: {}".format(file_format))
        return
    # NOTE(review): credentials are committed to source; prefer setting the
    # MONGO_URI environment variable. The literal is kept as a fallback so
    # existing deployments keep working unchanged.
    uri = os.environ.get(
        "MONGO_URI",
        "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco")
    db_name = "archSpeechReco"
    col_name = "moviesMeta"
    bucket = 'archspeechreco'
    col = get_mongo_collection(col_name, db_name, uri)
    if file_format == 'mp4':
        upload_mp4(col, bucket)
    else:
        upload_wave(col, bucket)


def _remove_file(path):
    """Best-effort delete of a local file, reporting the outcome on stdout."""
    try:
        os.remove(path)
    except OSError as e:
        print("{} has NOT been removed, {}".format(path, e.strerror))
    else:
        print("{} has been removed".format(path))


def upload_mp4(col, bucket):
    """Download every movie lacking ``gcsMp4`` and upload it as ``mp4/<id>.mp4``.

    The local copy is deleted after each upload attempt; documents whose
    download fails are skipped (and picked up again on the next run).
    """
    for doc in get_upload_list(col):
        file_name = ObjectId(doc['_id'])
        local_mp4 = "{}.mp4".format(file_name)
        url = doc.get('url')
        if not url:
            # $concat yields null when the document has no mp4 path.
            print("{} has no mp4 path, skipping".format(file_name))
            continue
        if not get_vid(url, file_name):
            continue
        upload_blob(bucket, local_mp4, "mp4/" + local_mp4, col, "Mp4")
        _remove_file(local_mp4)


def upload_wave(col, bucket):
    """Download every movie lacking ``gcsWav``, transcode it, upload ``wave/<id>.wav``.

    Documents whose download or transcode fails are skipped (and picked up
    again on the next run); local artifacts are cleaned up either way.
    """
    for doc in get_wav_upload_list(col):
        file_name = ObjectId(doc['_id'])
        local_mp4 = "{}.mp4".format(file_name)
        local_wav = "{}.wav".format(file_name)
        url = doc.get('url')
        if not url:
            # $concat yields null when the document has no mp4 path.
            print("{} has no mp4 path, skipping".format(file_name))
            continue
        if not get_vid(url, file_name):
            continue
        if not get_wave(local_mp4):
            # Never upload a missing/partial WAV; drop both local artifacts.
            for leftover in (local_mp4, local_wav):
                if os.path.exists(leftover):
                    _remove_file(leftover)
            continue
        upload_blob(bucket, local_wav, "wave/" + local_wav, col, "Wav")
        _remove_file(local_wav)


def upload_blob(bucket_name, source_file_name, destination_blob_name, col, file_format):
    """Upload a local file to GCS and record the upload on its Mongo document.

    Args:
        bucket_name: name of the target GCS bucket.
        source_file_name: local path whose base name is ``<ObjectId>.<ext>``;
            the stem identifies the document to update.
        destination_blob_name: object name inside the bucket.
        col: collection holding the movie documents.
        file_format: suffix of the field set on success, i.e.
            ``gcs<file_format>`` (``"Mp4"`` or ``"Wav"`` in this script).

    Returns:
        True if the upload and the Mongo update both succeeded, else False.
    """
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    try:
        blob.upload_from_filename(source_file_name)
    except Exception as e:  # batch boundary: report and move on to the next file
        print("gcs upload failed, {}".format(e))
        return False
    print('File {} uploaded to {}.'.format(source_file_name, destination_blob_name))

    # Only stamp the document after a successful upload: a stamped document is
    # excluded from later runs by the $match stage of the upload lists.
    doc_id = ObjectId(os.path.splitext(os.path.basename(source_file_name))[0])
    now = datetime.datetime.now()
    try:
        col.update_one(
            {"_id": doc_id},
            {"$set": {
                "gcs{}".format(file_format): {
                    "location": destination_blob_name,
                    "uploadDate": now.strftime("%Y-%m-%d %H:%M:%S"),
                }
            }},
        )
    except Exception as e:  # batch boundary: report and move on
        print("mongo update failed, {}".format(e))
        return False
    print("mongo update OK")
    return True


def _pending_uploads(col, field, skip=None, limit=None):
    """Return an aggregation cursor of ``{_id, url}`` for documents lacking ``field``.

    ``url`` is ``REPOSITORY_URL`` joined with the document's first ``mp4``
    entry, or null when there is none.

    Args:
        col: collection holding the movie documents.
        field: upload marker field, e.g. ``"gcsMp4"``.
        skip: if truthy, skip this many matching documents.
        limit: if truthy, return at most this many documents.
    """
    pipeline = [
        # Only documents not yet uploaded in this format.
        {"$match": {field: {"$exists": False}}},
        # Keep just _id (included by default) and the absolute download URL.
        {"$project": {
            "url": {"$concat": [REPOSITORY_URL, {"$arrayElemAt": ["$mp4", 0]}]}
        }},
    ]
    if skip:
        pipeline.append({"$skip": skip})
    if limit:
        pipeline.append({"$limit": limit})
    return col.aggregate(pipeline)


def get_upload_list(col, skip=None, limit=None):
    """Return documents that have not been uploaded as MP4 yet (``gcsMp4`` unset)."""
    return _pending_uploads(col, "gcsMp4", skip, limit)


def get_wav_upload_list(col, skip=None, limit=None):
    """Return documents that have not been uploaded as WAV yet (``gcsWav`` unset)."""
    return _pending_uploads(col, "gcsWav", skip, limit)


def get_vid(url, out):
    """Download ``url`` to ``<out>.mp4`` in the working directory.

    Returns:
        True if the file was fully downloaded, False otherwise. On failure any
        partially written file is removed so it can never be uploaded.
    """
    target = "{}.mp4".format(out)
    try:
        urllib.request.urlretrieve(url, target)
    # HTTPError and ContentTooShortError subclass URLError, so they must be
    # caught before it or their handlers are unreachable.
    except HTTPError as e:
        print("reason:{}, Http code: {}".format(e.reason, e.code))
    except ContentTooShortError:
        print("content too short error")
    except URLError as e:
        print("can't download, {}".format(e.reason))
    except OSError as e:
        # Connection resets / timeouts during the transfer are plain OSErrors.
        print("can't download, {}".format(e))
    else:
        print("file {} has been downloaded from {}".format(target, url))
        return True
    if os.path.exists(target):
        _remove_file(target)
    return False


def get_wave(filename):
    """Transcode an MP4 to a mono, 44.1 kHz, 16-bit PCM WAV next to it.

    The output path is ``filename`` with its extension replaced by ``.wav``.
    On success the source MP4 is deleted.

    Returns:
        True if ffmpeg exited successfully, False otherwise.
    """
    wav_name = os.path.splitext(filename)[0] + ".wav"
    try:
        # -y: overwrite a leftover WAV from an earlier run instead of blocking
        # on ffmpeg's interactive "Overwrite? [y/N]" prompt.
        run(['ffmpeg', '-y', '-i', filename, '-vn', '-acodec', 'pcm_s16le',
             '-ar', '44100', '-ac', '1', wav_name],
            stdout=DEVNULL, check=True)
    except CalledProcessError as e:
        print("problem with ffmpeg, {} exited with {} code".format(e.cmd, e.returncode))
        return False
    print("file {} has been decoded to wav format".format(filename))
    _remove_file(filename)
    return True


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='GCS uploader')
    parser.add_argument("--format", default='mp4', choices=['mp4', 'wav'],
                        help="format to fetch and upload, [mp4, wav]")
    args = parser.parse_args()
    main(args)