39 lines
2.0 KiB
Python
39 lines
2.0 KiB
Python
import sys
|
|
from awsglue.transforms import *
|
|
from awsglue.utils import getResolvedOptions
|
|
from pyspark.context import SparkContext
|
|
from awsglue.context import GlueContext
|
|
from awsglue.job import Job
|
|
from awsglue import DynamicFrame
|
|
|
|
def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    """Run a Spark SQL query over DynamicFrames and return a DynamicFrame.

    Every frame in ``mapping`` is converted with ``toDF()`` and registered as
    a temporary view under its alias, so ``query`` can refer to it by name.

    Args:
        glueContext: Glue context. Its ``spark_session`` executes the query
            and the context itself is handed to ``DynamicFrame.fromDF``.
        query: SQL text to execute.
        mapping: Dict mapping a view alias to the DynamicFrame exposed
            under that alias.
        transformation_ctx: Passed through unchanged to
            ``DynamicFrame.fromDF``.

    Returns:
        The query result wrapped with ``DynamicFrame.fromDF``.
    """
    # Take the session from the context we were given instead of relying on
    # a module-level ``spark`` global being assigned before the first call.
    session = glueContext.spark_session

    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)

    result = session.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
|
|
# Resolve the JOB_NAME argument from the command line.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Build the Spark context, wrap it in a Glue context, and keep a handle on
# the Spark session that the Glue context exposes.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Create the job object and initialise it with the resolved name and args.
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
|
|
|
|
# Script generated for node AWS Glue Data Catalog
# Source: read the "crawler_stockdata" table of the raw database as a
# DynamicFrame.
AWSGlueDataCatalog_node1715504004627 = glueContext.create_dynamic_frame.from_catalog(
    database="datalake_raw_534534002841_ab_1201680",
    table_name="crawler_stockdata",
    transformation_ctx="AWSGlueDataCatalog_node1715504004627",
)
|
|
|
|
# Script generated for node SQL Query
# The query derives year / month / day / hour columns from transaction_ts
# (cast to timestamp) and selects the transaction columns alongside them.
SqlQuery1825 = '''
select year(cast(transaction_ts as timestamp)) as year,
month(cast(transaction_ts as timestamp)) as month,
day(cast(transaction_ts as timestamp)) as day,
hour(cast(transaction_ts as timestamp)) as hour,
transaction_ts, symbol, price, amount, dollar_amount, type
from myDataSource;

'''

# Expose the catalog frame to the query under the alias "myDataSource".
SQLQuery_node1715504223461 = sparkSqlQuery(
    glueContext,
    query=SqlQuery1825,
    mapping={"myDataSource": AWSGlueDataCatalog_node1715504004627},
    transformation_ctx="SQLQuery_node1715504223461",
)
|
|
|
|
# Script generated for node AWS Glue Data Catalog
# Sink: write the query result to the "processed_stockdata" catalog table,
# with catalog updates enabled and the listed columns as partition keys.
AWSGlueDataCatalog_node1716060278270 = glueContext.write_dynamic_frame.from_catalog(
    frame=SQLQuery_node1715504223461,
    database="datalake_processed_534534002841_ab_1201680",
    table_name="processed_stockdata",
    additional_options={
        "enableUpdateCatalog": True,
        "updateBehavior": "UPDATE_IN_DATABASE",
        "partitionKeys": ["symbol", "year", "month", "day", "hour"],
    },
    transformation_ctx="AWSGlueDataCatalog_node1716060278270",
)

# Final step of the script: commit the job.
job.commit()