162 lines
5.1 KiB
Python
162 lines
5.1 KiB
Python
from google.cloud import storage
|
|
import urllib
|
|
from mongo.helpers import get_mongo_collection
|
|
from bson.objectid import ObjectId
|
|
import os
|
|
import datetime
|
|
from subprocess import run, DEVNULL, CalledProcessError
|
|
import argparse
|
|
from urllib.error import URLError, HTTPError, ContentTooShortError
|
|
import logging
|
|
|
|
|
|
def main(args):
    """Download pending movies in the requested format and upload them to GCS.

    Args:
        args: parsed command-line namespace; only ``args.format`` is read.
            Supported values are 'mp4' and 'wav'.

    Raises:
        ValueError: if ``args.format`` is not a supported format.
    """
    # The connection string can be supplied via the environment so the
    # password does not have to live in the source. The literal is kept only
    # as a backward-compatible fallback.
    # TODO(review): drop the embedded credentials once all deployments set
    # ARCH_SPEECH_RECO_MONGO_URI.
    uri = os.environ.get(
        "ARCH_SPEECH_RECO_MONGO_URI",
        "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco",
    )
    db_name = "archSpeechReco"
    col_name = "moviesMeta"
    bucket = 'archspeechreco'

    uploaders = {'mp4': upload_mp4, 'wav': upload_wave}
    file_format = args.format
    if file_format not in uploaders:
        # Fail loudly instead of silently doing nothing.
        raise ValueError(
            "unsupported format {!r}; expected one of {}".format(
                file_format, sorted(uploaders)))

    col = get_mongo_collection(col_name, db_name, uri)
    uploaders[file_format](col, bucket)
|
|
|
|
|
|
def upload_mp4(col, bucket):
    """Download every movie lacking a ``gcsMp4`` record and upload it to GCS.

    Each movie is saved locally as ``<_id>.mp4``, uploaded to
    ``mp4/<_id>.mp4`` in ``bucket``, and the local copy is then deleted.
    Documents without a usable URL, or whose download did not produce a
    local file, are skipped so one bad record cannot abort the whole batch.

    Args:
        col: MongoDB collection holding the movie metadata.
        bucket: name of the destination GCS bucket.
    """
    for doc in get_upload_list(col):
        file_name = str(ObjectId(doc['_id']))
        local_name = "{}.mp4".format(file_name)

        url = doc.get('url')
        if not url:
            # The pipeline's $concat yields null when the document has no
            # "mp4" entry; urlretrieve would crash on it.
            print("{} has no mp4 url, skipping".format(file_name))
            continue

        get_vid(url, file_name)
        if not os.path.exists(local_name):
            # get_vid does not raise on download errors, so check for the file
            # before trying to upload it.
            print("{} is missing, skipping upload".format(local_name))
            continue

        upload_blob(bucket, local_name, "mp4/{}".format(local_name), col, "Mp4")
        try:
            os.remove(local_name)
        except OSError:
            print("{} has NOT been removed".format(local_name))
        else:
            print("{} has been removed".format(local_name))
|
|
|
|
|
|
def upload_wave(col, bucket):
    """Download every movie lacking a ``gcsWav`` record, convert it to wav and upload it.

    For each pending document the movie is downloaded to ``<_id>.mp4``,
    converted by ``get_wave`` to ``<_id>.wav``, uploaded to ``wave/<_id>.wav``
    in ``bucket``, and the local wav is then deleted. Documents without a
    usable URL, or whose download/conversion did not produce the expected
    local file, are skipped so one bad record cannot abort the whole batch.

    Args:
        col: MongoDB collection holding the movie metadata.
        bucket: name of the destination GCS bucket.
    """
    for doc in get_wav_upload_list(col):
        file_name = str(ObjectId(doc['_id']))
        mp4_name = "{}.mp4".format(file_name)
        wav_name = "{}.wav".format(file_name)

        url = doc.get('url')
        if not url:
            # The pipeline's $concat yields null when the document has no
            # "mp4" entry; urlretrieve would crash on it.
            print("{} has no mp4 url, skipping".format(file_name))
            continue

        get_vid(url, file_name)
        if not os.path.exists(mp4_name):
            # get_vid does not raise on download errors.
            print("{} is missing, skipping conversion".format(mp4_name))
            continue

        get_wave(mp4_name)
        if not os.path.exists(wav_name):
            # get_wave does not raise when ffmpeg fails.
            print("{} is missing, skipping upload".format(wav_name))
            continue

        upload_blob(bucket, wav_name, "wave/{}".format(wav_name), col, "Wav")
        try:
            os.remove(wav_name)
        except OSError:
            print("{} has NOT been removed".format(wav_name))
        else:
            print("{} has been removed".format(wav_name))
|
|
|
|
|
|
def upload_blob(bucket_name, source_file_name, destination_blob_name, col, file_format):
    """Upload a local file to GCS and record the upload in MongoDB.

    After a successful upload, the document whose ``_id`` equals the file's
    base name (without directory and extension) gets a ``gcs<file_format>``
    sub-document holding the blob location and the local upload time.

    Args:
        bucket_name: name of the destination GCS bucket.
        source_file_name: local path of the file; its base name without
            extension must be a valid ObjectId string.
        destination_blob_name: object name inside the bucket.
        col: MongoDB collection to update.
        file_format: suffix of the field to set, e.g. "Mp4" -> "gcsMp4".

    Returns:
        True if both the upload and the Mongo update succeeded, else False.
        Failures are printed rather than raised so a batch can continue.
    """
    # Client/bucket errors (bad credentials, missing bucket) still propagate:
    # they would affect every file, so failing fast is preferable.
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    try:
        blob.upload_from_filename(source_file_name)
    except Exception as e:  # per-file boundary: report and move on
        print("gcs upload failed: {}".format(e))
        return False
    print('File {} uploaded to {}.'.format(source_file_name, destination_blob_name))

    upload_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    try:
        # basename + splitext tolerates directories and dots in the path,
        # unlike split('.')[0].
        doc_id = ObjectId(os.path.splitext(os.path.basename(source_file_name))[0])
        col.update_one(
            {"_id": doc_id},
            {"$set": {
                "gcs{}".format(file_format): {
                    "location": destination_blob_name,
                    "uploadDate": upload_date,
                }
            }},
        )
    except Exception as e:  # per-file boundary: report and move on
        print("mongo update failed: {}".format(e))
        return False
    print("mongo update OK")
    return True
|
|
|
|
|
|
def get_upload_list(col, skip=None, limit=None):
    """Return the movies that have not been uploaded to GCS as mp4 yet.

    The aggregation pipeline:
      * ``$match`` keeps documents without a ``gcsMp4`` field;
      * ``$project`` keeps ``_id`` plus a ``url`` built from the repository
        base address and the first element of the ``mp4`` array (the url is
        null when that array is missing);
      * optional ``$skip`` / ``$limit`` stages process the backlog in slices.

    Args:
        col: MongoDB collection holding the movie metadata.
        skip: number of leading documents to skip; falsy means none.
        limit: maximum number of documents to return; falsy means no limit.

    Returns:
        The result of ``col.aggregate(pipeline)``.
    """
    pipeline = [
        {"$match": {"gcsMp4": {"$exists": False}}},
        {"$project": {
            "url": {"$concat": ["http://repozytorium.fn.org.pl/",
                                {"$arrayElemAt": ["$mp4", 0]}]}
        }},
    ]
    if skip:
        pipeline.append({"$skip": skip})
    if limit:
        pipeline.append({"$limit": limit})
    return col.aggregate(pipeline)
|
|
|
|
|
|
def get_wav_upload_list(col, skip=None, limit=None):
    """Return the movies that have not been uploaded to GCS as wav yet.

    The aggregation pipeline:
      * ``$match`` keeps documents without a ``gcsWav`` field;
      * ``$project`` keeps ``_id`` plus a ``url`` built from the repository
        base address and the first element of the ``mp4`` array (the url is
        null when that array is missing);
      * optional ``$skip`` / ``$limit`` stages process the backlog in slices.

    Args:
        col: MongoDB collection holding the movie metadata.
        skip: number of leading documents to skip; falsy means none.
        limit: maximum number of documents to return; falsy means no limit.

    Returns:
        The result of ``col.aggregate(pipeline)``.
    """
    pipeline = [
        {"$match": {"gcsWav": {"$exists": False}}},
        {"$project": {
            "url": {"$concat": ["http://repozytorium.fn.org.pl/",
                                {"$arrayElemAt": ["$mp4", 0]}]}
        }},
    ]
    if skip:
        pipeline.append({"$skip": skip})
    if limit:
        pipeline.append({"$limit": limit})
    return col.aggregate(pipeline)
|
|
|
|
|
|
def get_vid(url, out):
    """Download ``url`` to the local file ``<out>.mp4``.

    Download errors are printed rather than raised so a batch run can
    continue with the next movie.

    Args:
        url: address of the movie file.
        out: base name of the local file; ``.mp4`` is appended.

    Returns:
        True if the file was downloaded completely, False otherwise.
    """
    # ``import urllib`` alone does not guarantee that the ``request``
    # submodule is loaded, so import it explicitly.
    import urllib.request

    target = "{}.mp4".format(out)
    try:
        urllib.request.urlretrieve(url, target)
    # HTTPError and ContentTooShortError both subclass URLError, so they must
    # be caught before it or their handlers are unreachable.
    except HTTPError as e:
        print("reason: {}, Http code: {}".format(e.reason, e.code))
    except ContentTooShortError:
        print("content too short error")
        # urlretrieve leaves the truncated file behind; remove it so it is not
        # mistaken for a complete download.
        try:
            os.remove(target)
        except OSError:
            pass
    except URLError as e:
        print("can't download, {}".format(e.reason))
    else:
        print("file {} has been downloaded from {}".format(target, url))
        return True
    return False
|
|
|
|
|
|
def get_wave(filename):
    """Convert a movie to a mono, 44100 Hz, pcm_s16le wav file with ffmpeg.

    The wav is written next to the input with its extension replaced by
    ``.wav``; an existing file of that name is overwritten. The input file is
    deleted afterwards whether or not the conversion succeeded, so failed
    conversions do not pile up downloaded movies on disk. Any partial wav
    from a failed conversion is removed as well.

    Args:
        filename: path of the movie (mp4) file to convert.

    Returns:
        True if ffmpeg completed successfully, False otherwise.
    """
    # splitext changes only the extension; str.replace("mp4", "wav") would
    # also rewrite any "mp4" appearing elsewhere in the path.
    wav_name = os.path.splitext(filename)[0] + ".wav"
    converted = False
    try:
        # -y: overwrite without prompting; stdin=DEVNULL so ffmpeg can never
        # block waiting for an answer on the terminal.
        run(['ffmpeg', '-y', '-i', filename, '-vn', '-acodec', 'pcm_s16le',
             '-ar', '44100', '-ac', '1', wav_name],
            stdin=DEVNULL, stdout=DEVNULL, check=True)
    except CalledProcessError as e:
        print("problem with ffmpeg, {} exited with {} code".format(e.cmd, e.returncode))
    except OSError as e:
        # e.g. FileNotFoundError when the ffmpeg executable is not installed.
        print("could not run ffmpeg, {}".format(e))
    else:
        print("file {} has been decoded to wav format".format(filename))
        converted = True

    if not converted:
        # Drop partial output so it cannot be uploaded as a complete file.
        try:
            os.remove(wav_name)
        except FileNotFoundError:
            pass

    try:
        os.remove(filename)
    except OSError as e:
        print("{} has NOT been removed, {}".format(filename, e.strerror))
    else:
        print("{} has been removed".format(filename))
    return converted
|
|
|
|
|
|
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='GCS uploader')
    # choices makes argparse reject unsupported formats with a usage error
    # instead of letting the script silently do nothing.
    parser.add_argument("--format", default='mp4', choices=['mp4', 'wav'],
                        help="format to fetch and upload, [mp4, wav]")
    main(parser.parse_args())
|