# chmury_projekt/labs/etl/ek-visual-etl-raw-to-processed.py
# 2024-05-31 23:49:46 +02:00

# 38 lines
# 2.0 KiB
# Python

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame
def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    """Run a Spark SQL query over DynamicFrames and wrap the result.

    Args:
        glueContext: Glue context whose Spark session executes the query; it
            is also handed to ``DynamicFrame.fromDF`` for the result.
        query: SQL text to execute. It may reference any alias in ``mapping``.
        mapping: Dict of view alias -> DynamicFrame. Each frame is converted
            with ``toDF()`` and registered as a temporary view under its
            alias, replacing any existing view of that name.
        transformation_ctx: Transformation context name passed through to
            ``DynamicFrame.fromDF``.

    Returns:
        A DynamicFrame built from the query result.
    """
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    # Take the session from the supplied context rather than relying on a
    # module-level ``spark`` global having been assigned before the call.
    result = glueContext.spark_session.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
# Resolve the job arguments; JOB_NAME is the only option requested.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

# Build the Spark and Glue contexts and keep a handle on the Spark session.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Create the Glue job and initialise it with its name and resolved arguments.
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Source node (AWS Glue Data Catalog): read the "crawler_stockdata" table
# from the raw database via the catalog.
AWSGlueDataCatalog_node1717180425106 = glueContext.create_dynamic_frame.from_catalog(
    database="datalake_raw_878695318857_ek_1201695",
    table_name="crawler_stockdata",
    transformation_ctx="AWSGlueDataCatalog_node1717180425106",
)
# Transform node (SQL Query): derive year/month/day/hour columns from
# transaction_ts (cast to timestamp) and select the remaining fields as-is.
SqlQuery3637 = """
select year(cast(transaction_ts as timestamp)) as year,
month(cast(transaction_ts as timestamp)) as month,
day(cast(transaction_ts as timestamp)) as day,
hour(cast(transaction_ts as timestamp)) as hour,
transaction_ts, symbol, price, amount, dollar_amount, type
from myDataSource;
"""
# The source frame is exposed to the query under the alias "myDataSource".
SQLQuery_node1717180577964 = sparkSqlQuery(
    glueContext,
    query=SqlQuery3637,
    mapping={"myDataSource": AWSGlueDataCatalog_node1717180425106},
    transformation_ctx="SQLQuery_node1717180577964",
)
# Sink node (AWS Glue Data Catalog): write the query result to the
# "processed_stockdata" table of the processed database. The options enable
# catalog updates (UPDATE_IN_DATABASE) and set the partition keys to
# symbol/year/month/day/hour.
AWSGlueDataCatalog_node1717180955274 = glueContext.write_dynamic_frame.from_catalog(
    frame=SQLQuery_node1717180577964,
    database="datalake_processed_878695318857_ek_1201695",
    table_name="processed_stockdata",
    additional_options={
        "enableUpdateCatalog": True,
        "updateBehavior": "UPDATE_IN_DATABASE",
        "partitionKeys": ["symbol", "year", "month", "day", "hour"],
    },
    transformation_ctx="AWSGlueDataCatalog_node1717180955274",
)

# Commit the job after the write step.
job.commit()