spiders + gcs uploader #1
@ -5,22 +5,30 @@ from pymongo import MongoClient
|
|||||||
from bson.objectid import ObjectId
|
from bson.objectid import ObjectId
|
||||||
import os
|
import os
|
||||||
import datetime
|
import datetime
|
||||||
|
from subprocess import run,DEVNULL
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
def main(args):
|
||||||
def main():
|
|
||||||
uri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco"
|
uri = "mongodb://speechRecoUser:speech!reco@localhost/archSpeechReco"
|
||||||
dbName = "archSpeechReco"
|
dbName = "archSpeechReco"
|
||||||
colName = "moviesMeta"
|
colName = "moviesMeta"
|
||||||
bucket = 'archspeechreco'
|
bucket = 'archspeechreco'
|
||||||
|
|
||||||
col = getMongoCollection(colName,dbName,uri)
|
col = getMongoCollection(colName,dbName,uri)
|
||||||
|
fileFormat = args.format
|
||||||
|
|
||||||
|
if (fileFormat == 'mp4'):
|
||||||
|
uploadMp4(col,bucket)
|
||||||
|
elif (fileFormat == 'wav'):
|
||||||
|
uploadWave(col,bucket)
|
||||||
|
|
||||||
|
def uploadMp4(col,bucket):
|
||||||
toUpload = getUploadList(col)
|
toUpload = getUploadList(col)
|
||||||
|
|
||||||
for i in toUpload:
|
for i in toUpload:
|
||||||
fileName = ObjectId(i['_id'])
|
fileName = ObjectId(i['_id'])
|
||||||
getVid( i['url'], ObjectId( i['_id'] ) )
|
getVid( i['url'], ObjectId( i['_id'] ) )
|
||||||
upload_blob(bucket, fileName, "mp4/{}.mp4".format(fileName),col)
|
upload_blob(bucket, "{}.mp4".format(fileName), "mp4/{}.mp4".format(fileName),col,"Mp4")
|
||||||
try:
|
try:
|
||||||
os.remove("{}.mp4".format(fileName))
|
os.remove("{}.mp4".format(fileName))
|
||||||
except:
|
except:
|
||||||
@ -29,26 +37,43 @@ def main():
|
|||||||
print("{}.mp4 has been removed".format(fileName))
|
print("{}.mp4 has been removed".format(fileName))
|
||||||
|
|
||||||
|
|
||||||
def upload_blob(bucket_name, source_file_name, destination_blob_name,col):
|
def uploadWave(col,bucket):
|
||||||
|
toUpload = getWavUploadList(col)
|
||||||
|
|
||||||
|
for i in toUpload:
|
||||||
|
fileName = ObjectId(i['_id'])
|
||||||
|
getVid( i['url'], ObjectId( i['_id'] ) )
|
||||||
|
getWave("{}.mp4".format(fileName))
|
||||||
|
upload_blob(bucket, "{}.wav".format(fileName), "wave/{}.wav".format(fileName),col,"Wav")
|
||||||
|
try:
|
||||||
|
os.remove("{}.wav".format(fileName))
|
||||||
|
except:
|
||||||
|
print("{}.wav has NOT been removed".format(fileName))
|
||||||
|
else:
|
||||||
|
print("{}.wav has been removed".format(fileName))
|
||||||
|
|
||||||
|
|
||||||
|
def upload_blob(bucket_name, source_file_name, destination_blob_name,col,fileFormat):
|
||||||
"""Uploads a file to the bucket."""
|
"""Uploads a file to the bucket."""
|
||||||
storage_client = storage.Client()
|
storage_client = storage.Client()
|
||||||
bucket = storage_client.get_bucket(bucket_name)
|
bucket = storage_client.get_bucket(bucket_name)
|
||||||
blob = bucket.blob(destination_blob_name)
|
blob = bucket.blob(destination_blob_name)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
blob.upload_from_filename("{}.mp4".format(source_file_name))
|
blob.upload_from_filename(source_file_name)
|
||||||
except:
|
except:
|
||||||
print("gcs upload failed")
|
print("gcs upload failed")
|
||||||
else:
|
else:
|
||||||
print('File {}.mp4 uploaded to {}.'.format(
|
print('File {}.{} uploaded to {}.'.format(
|
||||||
source_file_name,
|
source_file_name,
|
||||||
|
fileFormat,
|
||||||
destination_blob_name))
|
destination_blob_name))
|
||||||
now = datetime.datetime.now()
|
now = datetime.datetime.now()
|
||||||
try:
|
try:
|
||||||
col.update_one(
|
col.update_one(
|
||||||
{"_id": ObjectId(source_file_name)},
|
{"_id": ObjectId(source_file_name.split('.')[0])},
|
||||||
{"$set":{
|
{"$set":{
|
||||||
"gcs":{
|
"gcs{}".format(fileFormat):{
|
||||||
"location":destination_blob_name,
|
"location":destination_blob_name,
|
||||||
"uploadDate":now.strftime("%Y-%m-%d %H:%M:%S")
|
"uploadDate":now.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
}
|
}
|
||||||
@ -73,7 +98,7 @@ def getUploadList(col):
|
|||||||
pipeline = []
|
pipeline = []
|
||||||
#$match phase, filetr documents withour gcs field - movies not uploaded to gcs
|
#$match phase, filetr documents withour gcs field - movies not uploaded to gcs
|
||||||
pipeline.append({"$match": {
|
pipeline.append({"$match": {
|
||||||
"gcs": {"$exists": False}
|
"gcsMp4": {"$exists": False}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
#project phase, show only url and _id keys
|
#project phase, show only url and _id keys
|
||||||
@ -88,6 +113,25 @@ def getUploadList(col):
|
|||||||
|
|
||||||
return col.aggregate(pipeline)
|
return col.aggregate(pipeline)
|
||||||
|
|
||||||
|
def getWavUploadList(col):
|
||||||
|
pipeline = []
|
||||||
|
#$match phase, filetr documents withour gcs field - movies not uploaded to gcs
|
||||||
|
pipeline.append({"$match": {
|
||||||
|
"gcsWav": {"$exists": False}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
#project phase, show only url and _id keys
|
||||||
|
pipeline.append({"$project": {
|
||||||
|
"url": { "$concat": [ "http://repozytorium.fn.org.pl/",{"$arrayElemAt": [ "$mp4",0 ]}] }
|
||||||
|
}
|
||||||
|
})
|
||||||
|
#skip first N documents
|
||||||
|
#pipeline.append({"$skip":362})
|
||||||
|
#fetch only N documents
|
||||||
|
#pipeline.append({"$limit":500})
|
||||||
|
|
||||||
|
return col.aggregate(pipeline)
|
||||||
|
|
||||||
|
|
||||||
def getVid(url,out):
|
def getVid(url,out):
|
||||||
try:
|
try:
|
||||||
@ -96,6 +140,23 @@ def getVid(url,out):
|
|||||||
print("wrong URL, can't download")
|
print("wrong URL, can't download")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
def getWave(filename):
|
||||||
main()
|
try:
|
||||||
|
run(['ffmpeg','-i', filename, '-vn', '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '1', filename.replace("mp4","wav")],stdout=DEVNULL)
|
||||||
|
except:
|
||||||
|
print("problem with ffmpeg")
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
os.remove(filename)
|
||||||
|
except:
|
||||||
|
print("{} has NOT been removed".format(filename))
|
||||||
|
else:
|
||||||
|
print("{} has been removed".format(filename))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
parser = argparse.ArgumentParser(description='GCS uploader')
|
||||||
|
parser.add_argument("--format", default='mp4', help="format to fetch and upload, [mp4, wav]")
|
||||||
|
args = parser.parse_args()
|
||||||
|
main(args)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user