archSpeechReco/src/storageUpload.py

163 lines
5.0 KiB
Python

import argparse
import datetime
import os
import sys
import urllib
import urllib.request
from subprocess import run,DEVNULL

from bson.objectid import ObjectId
from google.cloud import storage
from pymongo import MongoClient
def main(args):
    """Upload the movies that are not yet in GCS, in the requested format.

    Args:
        args: parsed command-line namespace; only ``args.format`` is read
            and it must be ``'mp4'`` or ``'wav'``.

    Raises:
        ValueError: if ``args.format`` is not a supported format.
    """
    fileFormat = args.format
    # Reject an unknown format before touching MongoDB; previously such a
    # value fell through both branches and the script silently did nothing.
    if fileFormat not in ('mp4', 'wav'):
        raise ValueError(
            "unsupported format {!r}, expected 'mp4' or 'wav'".format(fileFormat))

    # NOTE(review): the credentials are hard-coded in this URI; consider
    # moving them to configuration outside the source tree.
    uri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco"
    dbName = "archSpeechReco"
    colName = "moviesMeta"
    bucket = 'archspeechreco'
    col = getMongoCollection(colName,dbName,uri)

    if fileFormat == 'mp4':
        uploadMp4(col,bucket)
    else:
        uploadWave(col,bucket)
def uploadMp4(col,bucket):
    """Download every movie without a ``gcsMp4`` entry and upload it as mp4.

    Args:
        col: MongoDB collection holding the movie metadata.
        bucket: name of the destination GCS bucket.

    Each movie is downloaded to ``<_id>.mp4`` in the working directory,
    uploaded under ``mp4/<_id>.mp4`` and then removed locally. A movie whose
    download did not produce a file is skipped.
    """
    for doc in getUploadList(col):
        fileName = ObjectId(doc['_id'])
        localPath = "{}.mp4".format(fileName)

        getVid(doc['url'], fileName)
        # getVid reports download errors itself and does not raise, so the
        # presence of the file is the only signal that there is something
        # to upload.
        if not os.path.exists(localPath):
            print("{} was not downloaded, skipping upload".format(localPath))
            continue

        upload_blob(bucket, localPath, "mp4/{}".format(localPath),col,"Mp4")

        try:
            os.remove(localPath)
        except OSError:
            print("{}.mp4 has NOT been removed".format(fileName))
        else:
            print("{}.mp4 has been removed".format(fileName))
def uploadWave(col,bucket):
    """Download every movie without a ``gcsWav`` entry and upload its audio.

    Args:
        col: MongoDB collection holding the movie metadata.
        bucket: name of the destination GCS bucket.

    Each movie is downloaded to ``<_id>.mp4``, converted to ``<_id>.wav``
    with ffmpeg, uploaded under ``wave/<_id>.wav`` and the wav is then
    removed locally. A movie is skipped when either the download or the
    conversion did not produce its file.
    """
    for doc in getWavUploadList(col):
        fileName = ObjectId(doc['_id'])
        mp4Path = "{}.mp4".format(fileName)
        wavPath = "{}.wav".format(fileName)

        getVid(doc['url'], fileName)
        # getVid and getWave report their own errors without raising, so the
        # produced files are checked before moving on to the next step.
        if not os.path.exists(mp4Path):
            print("{} was not downloaded, skipping".format(mp4Path))
            continue

        getWave(mp4Path)
        if not os.path.exists(wavPath):
            print("{} was not created, skipping upload".format(wavPath))
            continue

        upload_blob(bucket, wavPath, "wave/{}".format(wavPath),col,"Wav")

        try:
            os.remove(wavPath)
        except OSError:
            print("{}.wav has NOT been removed".format(fileName))
        else:
            print("{}.wav has been removed".format(fileName))
def upload_blob(bucket_name, source_file_name, destination_blob_name,col,fileFormat):
    """Upload a local file to a GCS bucket and record the upload in MongoDB.

    Args:
        bucket_name: name of the destination GCS bucket.
        source_file_name: path of the local file to upload; the part of its
            base name before the first dot is used as the ``_id`` of the
            document to update.
        destination_blob_name: object name inside the bucket.
        col: collection whose ``update_one`` is called to record the upload.
        fileFormat: suffix of the ``gcs<fileFormat>`` field set on the
            document (the callers in this file pass ``"Mp4"`` or ``"Wav"``).

    Errors raised by the upload itself or by the database update are
    reported on stdout and not propagated, so a batch caller can continue
    with the next file. Errors while creating the storage client or
    fetching the bucket are not caught here.
    """
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    try:
        blob.upload_from_filename(source_file_name)
    except Exception as err:
        # Broad on purpose: this is the per-file boundary of a batch job.
        # The document must not be marked as uploaded, so stop here.
        print("gcs upload failed: {}".format(err))
        return
    print('File {} uploaded to {}.'.format(
        source_file_name,
        destination_blob_name))

    # Use the base name so a source path that contains a directory still
    # yields the document id.
    docId = os.path.basename(source_file_name).split('.')[0]
    uploadDate = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    try:
        col.update_one(
            {"_id": ObjectId(docId)},
            {"$set": {
                "gcs{}".format(fileFormat): {
                    "location": destination_blob_name,
                    "uploadDate": uploadDate,
                }
            }}
        )
    except Exception as err:
        print("mongo update failed: {}".format(err))
    else:
        print("mongo update OK")
def getMongoCollection(colName,dbName,uri):
    """Return collection ``colName`` of database ``dbName`` from a client created for ``uri``."""
    return MongoClient(uri)[dbName][colName]
def getUploadList(col):
    """Aggregate the movies that have not been uploaded to GCS as mp4.

    Args:
        col: collection object providing ``aggregate``.

    Returns:
        Whatever ``col.aggregate`` returns for the pipeline; each document
        carries its ``_id`` and a ``url`` built from the first entry of its
        ``mp4`` array.
    """
    # $match stage: keep documents without the gcsMp4 field, i.e. movies
    # not yet uploaded to GCS.
    notUploaded = {"$match": {"gcsMp4": {"$exists": False}}}

    # $project stage: show only the url and _id keys.
    firstMp4 = {"$arrayElemAt": ["$mp4", 0]}
    withUrl = {"$project": {
        "url": {"$concat": ["http://repozytorium.fn.org.pl/", firstMp4]}
    }}

    # Paging stages such as {"$skip": N} or {"$limit": N} can be added to
    # the list below when only part of the backlog should be processed.
    return col.aggregate([notUploaded, withUrl])
def getWavUploadList(col):
    """Aggregate the movies that have not been uploaded to GCS as wav.

    Args:
        col: collection object providing ``aggregate``.

    Returns:
        Whatever ``col.aggregate`` returns for the pipeline; each document
        carries its ``_id`` and a ``url`` built from the first entry of its
        ``mp4`` array.
    """
    # $match stage: keep documents without the gcsWav field, i.e. movies
    # whose audio is not yet uploaded to GCS.
    notUploaded = {"$match": {"gcsWav": {"$exists": False}}}

    # $project stage: show only the url and _id keys.
    firstMp4 = {"$arrayElemAt": ["$mp4", 0]}
    withUrl = {"$project": {
        "url": {"$concat": ["http://repozytorium.fn.org.pl/", firstMp4]}
    }}

    # Paging stages such as {"$skip": N} or {"$limit": N} can be added to
    # the list below when only part of the backlog should be processed.
    return col.aggregate([notUploaded, withUrl])
def getVid(url,out):
    """Download ``url`` into the local file ``<out>.mp4``.

    Args:
        url: address of the movie to fetch.
        out: target file name without the extension (the callers in this
            file pass the document id).

    A failed download is reported on stdout and not raised, so a batch
    caller can continue with the next movie.
    """
    target = "{}.mp4".format(out)
    try:
        # Needs ``urllib.request`` imported explicitly at file level; a plain
        # ``import urllib`` does not guarantee the submodule is loaded.
        urllib.request.urlretrieve(url, target)
    except Exception as err:
        # Broad on purpose: network, HTTP and malformed-URL errors are all
        # treated as "this movie cannot be fetched". The cause is printed so
        # that unrelated problems are not mistaken for a bad URL.
        print("wrong URL, can't download: {}".format(err))
def getWave(filename):
    """Extract the audio of ``filename`` into a wav file using ffmpeg.

    The output is written next to the source with the extension replaced by
    ``.wav`` and is encoded as 16-bit PCM, 44100 Hz, one channel. After
    ffmpeg has run, the source file is removed.

    Args:
        filename: path of the movie file to convert.

    Problems are reported on stdout and not raised. When ffmpeg cannot be
    started at all, the source file is left in place.
    """
    # Swap only the extension; a plain str.replace would also rewrite any
    # other "mp4" occurring in the path.
    wavName = os.path.splitext(filename)[0] + ".wav"
    command = ['ffmpeg','-i', filename, '-vn', '-acodec', 'pcm_s16le',
               '-ar', '44100', '-ac', '1', wavName]
    try:
        result = run(command, stdout=DEVNULL)
    except (OSError, ValueError):
        # ffmpeg could not be launched (for example it is not installed).
        print("problem with ffmpeg")
        return

    # run() does not raise on a non-zero exit status, so a failed conversion
    # has to be detected explicitly.
    if result.returncode != 0:
        print("problem with ffmpeg")

    try:
        os.remove(filename)
    except OSError:
        print("{} has NOT been removed".format(filename))
    else:
        print("{} has been removed".format(filename))
if __name__ == '__main__':
    # Command-line entry point: parse the requested format and run the upload.
    parser = argparse.ArgumentParser(description='GCS uploader')
    # ``choices`` makes argparse reject an unsupported format with a usage
    # error instead of letting the script run and do nothing.
    parser.add_argument(
        "--format",
        default='mp4',
        choices=['mp4', 'wav'],
        help="format to fetch and upload, [mp4, wav]")
    args = parser.parse_args()
    main(args)