import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    # Register each input DynamicFrame as a temporary view so the SQL query
    # can reference it by alias, run the query on the module-level Spark
    # session, and wrap the result back into a DynamicFrame.
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)


def sparkUnion(glueContext, unionType, mapping, transformation_ctx) -> DynamicFrame:
    # Expects exactly two frames aliased "source1" and "source2" in the
    # mapping; unionType must be "ALL" or "DISTINCT".
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql("(select * from source1) UNION " + unionType + " (select * from source2)")
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
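
# Job flow: read the existing aggregate table and the processed source table
# from the Data Catalog, compute fresh per-day aggregates with Spark SQL,
# union them with the existing aggregates, and write the result back to
# agg_stockdata. Each node's transformation_ctx is what AWS Glue job
# bookmarks use to track already-processed data between runs.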

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1716120307832 = glueContext.create_dynamic_frame.from_catalog(database="datalake_processed_534534002841_ab_1201680", table_name="agg_stockdata", transformation_ctx="AWSGlueDataCatalog_node1716120307832")

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1716061298505 = glueContext.create_dynamic_frame.from_catalog(database="datalake_processed_534534002841_ab_1201680", table_name="processed_stockdata", transformation_ctx="AWSGlueDataCatalog_node1716061298505")
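# Optional sanity check while developing (printSchema and show are standard
# DynamicFrame methods); keep commented out for production runs:
# AWSGlueDataCatalog_node1716061298505.printSchema()
# AWSGlueDataCatalog_node1716061298505.show(5)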

# Script generated for node Change Schema
ChangeSchema_node1716120318123 = ApplyMapping.apply(frame=AWSGlueDataCatalog_node1716120307832, mappings=[("total_volume", "double", "total_volume", "double"), ("total_dollars", "double", "total_dollars", "double"), ("total_cnt_of_transactions", "int", "total_cnt_of_transactions", "int"), ("type", "string", "type", "string"), ("symbol", "string", "symbol", "string"), ("year", "int", "year", "int"), ("month", "int", "month", "int"), ("day", "int", "day", "int")], transformation_ctx="ChangeSchema_node1716120318123")
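# ApplyMapping keeps only the listed fields, renaming/casting per each
# (source, sourceType, target, targetType) tuple, so the existing aggregates
# line up column-for-column with the SQL output produced below.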

# Script generated for node SQL Query
SqlQuery1998 = '''
select ROUND(SUM(amount), 2) as total_volume,
    ROUND(SUM(dollar_amount), 2) as total_dollars,
    COUNT(transaction_ts) as total_cnt_of_transactions,
    type, symbol, year, month, day
from datalake_processed_534534002841_ab_1201680.processed_stockdata
group by symbol, year, month, day, type
order by symbol, day, type
'''
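# Note: the query reads the table through its catalog name rather than the
# "myDataSource" view that sparkSqlQuery registers, so that alias is
# effectively unused here. This works as long as the job uses the Glue Data
# Catalog as its Hive metastore (typical for Glue Studio jobs); to stay
# metastore-independent, select from myDataSource instead.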
SQLQuery_node1716061397564 = sparkSqlQuery(glueContext, query=SqlQuery1998, mapping={"myDataSource": AWSGlueDataCatalog_node1716061298505}, transformation_ctx="SQLQuery_node1716061397564")

# Script generated for node Union
Union_node1716065407070 = sparkUnion(glueContext, unionType="DISTINCT", mapping={"source1": ChangeSchema_node1716120318123, "source2": SQLQuery_node1716061397564}, transformation_ctx="Union_node1716065407070")
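# UNION DISTINCT deduplicates across the two sources, so rows already present
# in agg_stockdata are not double-counted on re-runs. Spark SQL unions match
# columns by position, which is why the Change Schema node pins the column
# order to match the SQL output.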

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1716064742210 = glueContext.write_dynamic_frame.from_catalog(frame=Union_node1716065407070, database="datalake_processed_534534002841_ab_1201680", table_name="agg_stockdata", additional_options={"enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE", "partitionKeys": ["symbol", "year", "month", "day"]}, transformation_ctx="AWSGlueDataCatalog_node1716064742210")
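# enableUpdateCatalog with UPDATE_IN_DATABASE registers new partitions (and
# schema changes) in the Data Catalog as part of the write, so no separate
# crawler run is needed for the symbol/year/month/day partitions.
# job.commit() below then persists the job-bookmark state.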
job.commit()