spiders + gcs uploader #1

Merged
s333949 merged 1 commits from include_spiders into master 2020-01-06 12:57:11 +01:00

View File

@ -5,22 +5,30 @@ from pymongo import MongoClient
from bson.objectid import ObjectId from bson.objectid import ObjectId
import os import os
import datetime import datetime
from subprocess import run,DEVNULL
import argparse
def main(args):
def main():
uri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco" uri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco"
dbName = "archSpeechReco" dbName = "archSpeechReco"
colName = "moviesMeta" colName = "moviesMeta"
bucket = 'archspeechreco' bucket = 'archspeechreco'
col = getMongoCollection(colName,dbName,uri) col = getMongoCollection(colName,dbName,uri)
fileFormat = args.format
if (fileFormat == 'mp4'):
uploadMp4(col,bucket)
elif (fileFormat == 'wav'):
uploadWave(col,bucket)
def uploadMp4(col,bucket):
toUpload = getUploadList(col) toUpload = getUploadList(col)
for i in toUpload: for i in toUpload:
fileName = ObjectId(i['_id']) fileName = ObjectId(i['_id'])
getVid( i['url'], ObjectId( i['_id'] ) ) getVid( i['url'], ObjectId( i['_id'] ) )
upload_blob(bucket, fileName, "mp4/{}.mp4".format(fileName),col) upload_blob(bucket, "{}.mp4".format(fileName), "mp4/{}.mp4".format(fileName),col,"Mp4")
try: try:
os.remove("{}.mp4".format(fileName)) os.remove("{}.mp4".format(fileName))
except: except:
@ -29,26 +37,43 @@ def main():
print("{}.mp4 has been removed".format(fileName)) print("{}.mp4 has been removed".format(fileName))
def upload_blob(bucket_name, source_file_name, destination_blob_name,col): def uploadWave(col,bucket):
toUpload = getWavUploadList(col)
for i in toUpload:
fileName = ObjectId(i['_id'])
getVid( i['url'], ObjectId( i['_id'] ) )
getWave("{}.mp4".format(fileName))
upload_blob(bucket, "{}.wav".format(fileName), "wave/{}.wav".format(fileName),col,"Wav")
try:
os.remove("{}.wav".format(fileName))
except:
print("{}.wav has NOT been removed".format(fileName))
else:
print("{}.wav has been removed".format(fileName))
def upload_blob(bucket_name, source_file_name, destination_blob_name,col,fileFormat):
"""Uploads a file to the bucket.""" """Uploads a file to the bucket."""
storage_client = storage.Client() storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name) bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(destination_blob_name) blob = bucket.blob(destination_blob_name)
try: try:
blob.upload_from_filename("{}.mp4".format(source_file_name)) blob.upload_from_filename(source_file_name)
except: except:
print("gcs upload failed") print("gcs upload failed")
else: else:
print('File {}.mp4 uploaded to {}.'.format( print('File {}.{} uploaded to {}.'.format(
source_file_name, source_file_name,
fileFormat,
destination_blob_name)) destination_blob_name))
now = datetime.datetime.now() now = datetime.datetime.now()
try: try:
col.update_one( col.update_one(
{"_id": ObjectId(source_file_name)}, {"_id": ObjectId(source_file_name.split('.')[0])},
{"$set":{ {"$set":{
"gcs":{ "gcs{}".format(fileFormat):{
"location":destination_blob_name, "location":destination_blob_name,
"uploadDate":now.strftime("%Y-%m-%d %H:%M:%S") "uploadDate":now.strftime("%Y-%m-%d %H:%M:%S")
} }
@ -73,7 +98,7 @@ def getUploadList(col):
pipeline = [] pipeline = []
#$match phase, filetr documents withour gcs field - movies not uploaded to gcs #$match phase, filetr documents withour gcs field - movies not uploaded to gcs
pipeline.append({"$match": { pipeline.append({"$match": {
"gcs": {"$exists": False} "gcsMp4": {"$exists": False}
} }
}) })
#project phase, show only url and _id keys #project phase, show only url and _id keys
@ -88,6 +113,25 @@ def getUploadList(col):
return col.aggregate(pipeline) return col.aggregate(pipeline)
def getWavUploadList(col):
pipeline = []
#$match phase, filetr documents withour gcs field - movies not uploaded to gcs
pipeline.append({"$match": {
"gcsWav": {"$exists": False}
}
})
#project phase, show only url and _id keys
pipeline.append({"$project": {
"url": { "$concat": [ "http://repozytorium.fn.org.pl/",{"$arrayElemAt": [ "$mp4",0 ]}] }
}
})
#skip first N documents
#pipeline.append({"$skip":362})
#fetch only N documents
#pipeline.append({"$limit":500})
return col.aggregate(pipeline)
def getVid(url,out): def getVid(url,out):
try: try:
@ -96,6 +140,23 @@ def getVid(url,out):
print("wrong URL, can't download") print("wrong URL, can't download")
if __name__ == '__main__': def getWave(filename):
main() try:
run(['ffmpeg','-i', filename, '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '1', filename.replace("mp4","wav")],stdout=DEVNULL)
except:
print("problem with ffmpeg")
else:
try:
os.remove(filename)
except:
print("{} has NOT been removed".format(filename))
else:
print("{} has been removed".format(filename))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='GCS uploader')
parser.add_argument("--format", default='mp4', help="format to fetch and upload, [mp4, wav]")
args = parser.parse_args()
main(args)