Added scripts for exercise 5
This commit is contained in:
parent
9c95137819
commit
2936c9058c
39
labs/ab-etl-visual-raw-to-processed.py
Normal file
39
labs/ab-etl-visual-raw-to-processed.py
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
import sys
|
||||||
|
from awsglue.transforms import *
|
||||||
|
from awsglue.utils import getResolvedOptions
|
||||||
|
from pyspark.context import SparkContext
|
||||||
|
from awsglue.context import GlueContext
|
||||||
|
from awsglue.job import Job
|
||||||
|
from awsglue import DynamicFrame
|
||||||
|
|
||||||
|
def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    """Run a Spark SQL query over DynamicFrames and return a DynamicFrame.

    Args:
        glueContext: GlueContext whose Spark session executes the query.
        query: Spark SQL text; it may reference any alias present in ``mapping``.
        mapping: dict of alias -> DynamicFrame. Each frame is converted to a
            DataFrame and registered as a temp view under its alias,
            replacing any existing view of that name.
        transformation_ctx: context string handed to ``DynamicFrame.fromDF``.

    Returns:
        A DynamicFrame built from the query result.
    """
    # Use the session owned by the supplied context rather than the
    # module-level ``spark`` global, which is only assigned further down the
    # script; this keeps the helper self-contained.
    session = glueContext.spark_session
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = session.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
|
||||||
|
# Resolve the job arguments and bootstrap the Glue / Spark runtime.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Script generated for node AWS Glue Data Catalog
# Source: the crawler_stockdata table in the raw database.
AWSGlueDataCatalog_node1715504004627 = glueContext.create_dynamic_frame.from_catalog(
    database="datalake_raw_534534002841_ab_1201680",
    table_name="crawler_stockdata",
    transformation_ctx="AWSGlueDataCatalog_node1715504004627",
)

# Script generated for node SQL Query
# Derives year/month/day/hour columns from transaction_ts; these are used as
# partition keys by the write step below.
SqlQuery1825 = '''
select year(cast(transaction_ts as timestamp)) as year,
month(cast(transaction_ts as timestamp)) as month,
day(cast(transaction_ts as timestamp)) as day,
hour(cast(transaction_ts as timestamp)) as hour,
transaction_ts, symbol, price, amount, dollar_amount, type
from myDataSource;

'''
SQLQuery_node1715504223461 = sparkSqlQuery(
    glueContext,
    query=SqlQuery1825,
    mapping={"myDataSource": AWSGlueDataCatalog_node1715504004627},
    transformation_ctx="SQLQuery_node1715504223461",
)

# Script generated for node AWS Glue Data Catalog
# Sink: the processed_stockdata table, partitioned by symbol and the derived
# date/time columns, with catalog updates enabled.
AWSGlueDataCatalog_node1716060278270 = glueContext.write_dynamic_frame.from_catalog(
    frame=SQLQuery_node1715504223461,
    database="datalake_processed_534534002841_ab_1201680",
    table_name="processed_stockdata",
    additional_options={
        "enableUpdateCatalog": True,
        "updateBehavior": "UPDATE_IN_DATABASE",
        "partitionKeys": ["symbol", "year", "month", "day", "hour"],
    },
    transformation_ctx="AWSGlueDataCatalog_node1716060278270",
)

job.commit()
|
53
labs/ab-visual-etl-proces-transformation.py
Normal file
53
labs/ab-visual-etl-proces-transformation.py
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
import sys
|
||||||
|
from awsglue.transforms import *
|
||||||
|
from awsglue.utils import getResolvedOptions
|
||||||
|
from pyspark.context import SparkContext
|
||||||
|
from awsglue.context import GlueContext
|
||||||
|
from awsglue.job import Job
|
||||||
|
from awsglue import DynamicFrame
|
||||||
|
|
||||||
|
def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    """Run a Spark SQL query over DynamicFrames and return a DynamicFrame.

    Args:
        glueContext: GlueContext whose Spark session executes the query.
        query: Spark SQL text; it may reference any alias present in ``mapping``.
        mapping: dict of alias -> DynamicFrame. Each frame is converted to a
            DataFrame and registered as a temp view under its alias,
            replacing any existing view of that name.
        transformation_ctx: context string handed to ``DynamicFrame.fromDF``.

    Returns:
        A DynamicFrame built from the query result.
    """
    # Use the session owned by the supplied context rather than the
    # module-level ``spark`` global, which is only assigned further down the
    # script; this keeps the helper self-contained.
    session = glueContext.spark_session
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = session.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
|
||||||
|
def sparkUnion(glueContext, unionType, mapping, transformation_ctx) -> DynamicFrame:
    """Union two DynamicFrames through Spark SQL and return a DynamicFrame.

    Args:
        glueContext: GlueContext whose Spark session executes the union.
        unionType: keyword spliced into the statement after ``UNION``
            (the caller in this script passes ``"DISTINCT"``).
        mapping: dict of alias -> DynamicFrame. Every entry is registered as a
            temp view, but the statement text is fixed to the aliases
            ``source1`` and ``source2``, so both keys must be present.
        transformation_ctx: context string handed to ``DynamicFrame.fromDF``.

    Returns:
        A DynamicFrame holding the union of ``source1`` and ``source2``.
    """
    # Use the session owned by the supplied context rather than the
    # module-level ``spark`` global, which is only assigned further down the
    # script; this keeps the helper self-contained.
    session = glueContext.spark_session
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    # ``select *`` on both sides: SQL UNION pairs columns by position, so the
    # two frames need the same column order.
    result = session.sql("(select * from source1) UNION " + unionType + " (select * from source2)")
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
|
||||||
|
# Resolve the job arguments and bootstrap the Glue / Spark runtime.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Script generated for node AWS Glue Data Catalog
# Existing aggregates, read back so they can be merged with the new ones.
AWSGlueDataCatalog_node1716120307832 = glueContext.create_dynamic_frame.from_catalog(
    database="datalake_processed_534534002841_ab_1201680",
    table_name="agg_stockdata",
    transformation_ctx="AWSGlueDataCatalog_node1716120307832",
)

# Script generated for node AWS Glue Data Catalog
# Row-level processed transactions that feed the aggregation query.
AWSGlueDataCatalog_node1716061298505 = glueContext.create_dynamic_frame.from_catalog(
    database="datalake_processed_534534002841_ab_1201680",
    table_name="processed_stockdata",
    transformation_ctx="AWSGlueDataCatalog_node1716061298505",
)

# Script generated for node Change Schema
# Pins the column order and types of the existing aggregates; the order
# matches the select list of the SQL node because the union is positional.
ChangeSchema_node1716120318123 = ApplyMapping.apply(
    frame=AWSGlueDataCatalog_node1716120307832,
    mappings=[
        ("total_volume", "double", "total_volume", "double"),
        ("total_dollars", "double", "total_dollars", "double"),
        ("total_cnt_of_transactions", "int", "total_cnt_of_transactions", "int"),
        ("type", "string", "type", "string"),
        ("symbol", "string", "symbol", "string"),
        ("year", "int", "year", "int"),
        ("month", "int", "month", "int"),
        ("day", "int", "day", "int"),
    ],
    transformation_ctx="ChangeSchema_node1716120318123",
)

# Script generated for node SQL Query
# The query reads the ``myDataSource`` temp view that sparkSqlQuery registers
# from the mapping passed below. It previously named the catalog table
# directly, which ignored the DynamicFrame loaded above and only resolves when
# the job has the Glue Data Catalog enabled as the Spark SQL metastore.
SqlQuery1998 = '''
select ROUND(SUM(amount), 2) as total_volume,
ROUND(SUM(dollar_amount), 2) as total_dollars,
COUNT(transaction_ts) as total_cnt_of_transactions,
type, symbol, year, month, day
from myDataSource
group by symbol, year, month, day, type
order by symbol, day, type
'''
SQLQuery_node1716061397564 = sparkSqlQuery(
    glueContext,
    query=SqlQuery1998,
    mapping={"myDataSource": AWSGlueDataCatalog_node1716061298505},
    transformation_ctx="SQLQuery_node1716061397564",
)

# Script generated for node Union
# Merge existing aggregates (source1) with the freshly computed ones
# (source2); DISTINCT drops rows that are exact duplicates.
Union_node1716065407070 = sparkUnion(
    glueContext,
    unionType="DISTINCT",
    mapping={
        "source1": ChangeSchema_node1716120318123,
        "source2": SQLQuery_node1716061397564,
    },
    transformation_ctx="Union_node1716065407070",
)

# Script generated for node AWS Glue Data Catalog
# Sink: the agg_stockdata table, partitioned by symbol and date, with catalog
# updates enabled.
AWSGlueDataCatalog_node1716064742210 = glueContext.write_dynamic_frame.from_catalog(
    frame=Union_node1716065407070,
    database="datalake_processed_534534002841_ab_1201680",
    table_name="agg_stockdata",
    additional_options={
        "enableUpdateCatalog": True,
        "updateBehavior": "UPDATE_IN_DATABASE",
        "partitionKeys": ["symbol", "year", "month", "day"],
    },
    transformation_ctx="AWSGlueDataCatalog_node1716064742210",
)

job.commit()
|
@ -1,7 +1,7 @@
|
|||||||
{
|
{
|
||||||
"version": 4,
|
"version": 4,
|
||||||
"terraform_version": "1.8.1",
|
"terraform_version": "1.8.1",
|
||||||
"serial": 212,
|
"serial": 276,
|
||||||
"lineage": "a77aaaba-b4f8-6adb-0387-8f0b98d722c2",
|
"lineage": "a77aaaba-b4f8-6adb-0387-8f0b98d722c2",
|
||||||
"outputs": {},
|
"outputs": {},
|
||||||
"resources": [],
|
"resources": [],
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
{
|
{
|
||||||
"version": 4,
|
"version": 4,
|
||||||
"terraform_version": "1.8.1",
|
"terraform_version": "1.8.1",
|
||||||
"serial": 196,
|
"serial": 260,
|
||||||
"lineage": "a77aaaba-b4f8-6adb-0387-8f0b98d722c2",
|
"lineage": "a77aaaba-b4f8-6adb-0387-8f0b98d722c2",
|
||||||
"outputs": {},
|
"outputs": {},
|
||||||
"resources": [
|
"resources": [
|
||||||
@ -43,7 +43,7 @@
|
|||||||
"id": "development",
|
"id": "development",
|
||||||
"name": "development",
|
"name": "development",
|
||||||
"state": "ENABLED",
|
"state": "ENABLED",
|
||||||
"tags": {},
|
"tags": null,
|
||||||
"tags_all": {}
|
"tags_all": {}
|
||||||
},
|
},
|
||||||
"sensitive_attributes": [],
|
"sensitive_attributes": [],
|
||||||
@ -82,8 +82,8 @@
|
|||||||
"id": "534534002841:datalake_processed_534534002841_ab_1201680",
|
"id": "534534002841:datalake_processed_534534002841_ab_1201680",
|
||||||
"location_uri": "",
|
"location_uri": "",
|
||||||
"name": "datalake_processed_534534002841_ab_1201680",
|
"name": "datalake_processed_534534002841_ab_1201680",
|
||||||
"parameters": {},
|
"parameters": null,
|
||||||
"tags": {},
|
"tags": null,
|
||||||
"tags_all": {},
|
"tags_all": {},
|
||||||
"target_database": []
|
"target_database": []
|
||||||
},
|
},
|
||||||
@ -120,8 +120,8 @@
|
|||||||
"id": "534534002841:datalake_raw_534534002841_ab_1201680",
|
"id": "534534002841:datalake_raw_534534002841_ab_1201680",
|
||||||
"location_uri": "",
|
"location_uri": "",
|
||||||
"name": "datalake_raw_534534002841_ab_1201680",
|
"name": "datalake_raw_534534002841_ab_1201680",
|
||||||
"parameters": {},
|
"parameters": null,
|
||||||
"tags": {},
|
"tags": null,
|
||||||
"tags_all": {},
|
"tags_all": {},
|
||||||
"target_database": []
|
"target_database": []
|
||||||
},
|
},
|
||||||
@ -141,7 +141,7 @@
|
|||||||
"attributes": {
|
"attributes": {
|
||||||
"arn": "arn:aws:glue:us-east-1:534534002841:crawler/gc-raw-534534002841-ab-1201680",
|
"arn": "arn:aws:glue:us-east-1:534534002841:crawler/gc-raw-534534002841-ab-1201680",
|
||||||
"catalog_target": [],
|
"catalog_target": [],
|
||||||
"classifiers": [],
|
"classifiers": null,
|
||||||
"configuration": "",
|
"configuration": "",
|
||||||
"database_name": "datalake_raw_534534002841_ab_1201680",
|
"database_name": "datalake_raw_534534002841_ab_1201680",
|
||||||
"delta_target": [],
|
"delta_target": [],
|
||||||
@ -175,7 +175,7 @@
|
|||||||
"connection_name": "",
|
"connection_name": "",
|
||||||
"dlq_event_queue_arn": "",
|
"dlq_event_queue_arn": "",
|
||||||
"event_queue_arn": "",
|
"event_queue_arn": "",
|
||||||
"exclusions": [],
|
"exclusions": null,
|
||||||
"path": "s3://datalake-raw-534534002841-ab-1201680/raw-zone/stockdata/",
|
"path": "s3://datalake-raw-534534002841-ab-1201680/raw-zone/stockdata/",
|
||||||
"sample_size": 0
|
"sample_size": 0
|
||||||
}
|
}
|
||||||
@ -275,7 +275,7 @@
|
|||||||
],
|
],
|
||||||
"snowflake_configuration": [],
|
"snowflake_configuration": [],
|
||||||
"splunk_configuration": [],
|
"splunk_configuration": [],
|
||||||
"tags": {},
|
"tags": null,
|
||||||
"tags_all": {},
|
"tags_all": {},
|
||||||
"timeouts": null,
|
"timeouts": null,
|
||||||
"version_id": "1"
|
"version_id": "1"
|
||||||
@ -365,9 +365,9 @@
|
|||||||
"image_uri": "",
|
"image_uri": "",
|
||||||
"invoke_arn": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:534534002841:function:etl-post-processing-534534002841-ab-1201680/invocations",
|
"invoke_arn": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:534534002841:function:etl-post-processing-534534002841-ab-1201680/invocations",
|
||||||
"kms_key_arn": "",
|
"kms_key_arn": "",
|
||||||
"last_modified": "2024-05-12T07:30:33.751+0000",
|
"last_modified": "2024-05-19T10:36:30.839+0000",
|
||||||
"layers": [
|
"layers": [
|
||||||
"arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680:4"
|
"arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680:6"
|
||||||
],
|
],
|
||||||
"logging_config": [
|
"logging_config": [
|
||||||
{
|
{
|
||||||
@ -396,7 +396,7 @@
|
|||||||
"snap_start": [],
|
"snap_start": [],
|
||||||
"source_code_hash": "DYklWA51/+hutwYtHutJg59rV7DY0LEgfp+ne8wgiSo=",
|
"source_code_hash": "DYklWA51/+hutwYtHutJg59rV7DY0LEgfp+ne8wgiSo=",
|
||||||
"source_code_size": 884,
|
"source_code_size": 884,
|
||||||
"tags": {},
|
"tags": null,
|
||||||
"tags_all": {},
|
"tags_all": {},
|
||||||
"timeout": 300,
|
"timeout": 300,
|
||||||
"timeouts": null,
|
"timeouts": null,
|
||||||
@ -425,15 +425,15 @@
|
|||||||
{
|
{
|
||||||
"schema_version": 0,
|
"schema_version": 0,
|
||||||
"attributes": {
|
"attributes": {
|
||||||
"arn": "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680:4",
|
"arn": "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680:6",
|
||||||
"compatible_architectures": [],
|
"compatible_architectures": null,
|
||||||
"compatible_runtimes": [
|
"compatible_runtimes": [
|
||||||
"python3.8"
|
"python3.8"
|
||||||
],
|
],
|
||||||
"created_date": "2024-05-12T07:30:26.214+0000",
|
"created_date": "2024-05-19T10:36:30.504+0000",
|
||||||
"description": "",
|
"description": "",
|
||||||
"filename": "../lambda/awswrangler-layer-2.7.0-py3.8.zip",
|
"filename": "../lambda/awswrangler-layer-2.7.0-py3.8.zip",
|
||||||
"id": "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680:4",
|
"id": "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680:6",
|
||||||
"layer_arn": "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680",
|
"layer_arn": "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680",
|
||||||
"layer_name": "aws_wrangler_534534002841_ab_1201680",
|
"layer_name": "aws_wrangler_534534002841_ab_1201680",
|
||||||
"license_info": "",
|
"license_info": "",
|
||||||
@ -445,7 +445,7 @@
|
|||||||
"skip_destroy": false,
|
"skip_destroy": false,
|
||||||
"source_code_hash": "C0YX/4auMnBs4J9JCDy1f7uc2GLF0vU7ppQgzffQiN4=",
|
"source_code_hash": "C0YX/4auMnBs4J9JCDy1f7uc2GLF0vU7ppQgzffQiN4=",
|
||||||
"source_code_size": 43879070,
|
"source_code_size": 43879070,
|
||||||
"version": "4"
|
"version": "6"
|
||||||
},
|
},
|
||||||
"sensitive_attributes": [],
|
"sensitive_attributes": [],
|
||||||
"private": "bnVsbA=="
|
"private": "bnVsbA=="
|
||||||
@ -514,25 +514,7 @@
|
|||||||
],
|
],
|
||||||
"hosted_zone_id": "Z3AQBSTGFYJSTF",
|
"hosted_zone_id": "Z3AQBSTGFYJSTF",
|
||||||
"id": "athena-results-534534002841-ab-1201680",
|
"id": "athena-results-534534002841-ab-1201680",
|
||||||
"lifecycle_rule": [
|
"lifecycle_rule": [],
|
||||||
{
|
|
||||||
"abort_incomplete_multipart_upload_days": 0,
|
|
||||||
"enabled": true,
|
|
||||||
"expiration": [
|
|
||||||
{
|
|
||||||
"date": "",
|
|
||||||
"days": 1,
|
|
||||||
"expired_object_delete_marker": false
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"id": "standard-expiration",
|
|
||||||
"noncurrent_version_expiration": [],
|
|
||||||
"noncurrent_version_transition": [],
|
|
||||||
"prefix": "",
|
|
||||||
"tags": {},
|
|
||||||
"transition": []
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"logging": [],
|
"logging": [],
|
||||||
"object_lock_configuration": [],
|
"object_lock_configuration": [],
|
||||||
"object_lock_enabled": false,
|
"object_lock_enabled": false,
|
||||||
@ -796,17 +778,7 @@
|
|||||||
"bucket": "datalake-raw-534534002841-ab-1201680",
|
"bucket": "datalake-raw-534534002841-ab-1201680",
|
||||||
"eventbridge": false,
|
"eventbridge": false,
|
||||||
"id": "datalake-raw-534534002841-ab-1201680",
|
"id": "datalake-raw-534534002841-ab-1201680",
|
||||||
"lambda_function": [
|
"lambda_function": [],
|
||||||
{
|
|
||||||
"events": [
|
|
||||||
"s3:ObjectCreated:*"
|
|
||||||
],
|
|
||||||
"filter_prefix": "raw-zone/",
|
|
||||||
"filter_suffix": "",
|
|
||||||
"id": "tf-s3-lambda-20240512073033855700000001",
|
|
||||||
"lambda_function_arn": "arn:aws:lambda:us-east-1:534534002841:function:etl-post-processing-534534002841-ab-1201680"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"queue": [],
|
"queue": [],
|
||||||
"topic": []
|
"topic": []
|
||||||
},
|
},
|
||||||
@ -839,12 +811,12 @@
|
|||||||
"key_id": "",
|
"key_id": "",
|
||||||
"name": "s3_processed_bucket_name",
|
"name": "s3_processed_bucket_name",
|
||||||
"overwrite": null,
|
"overwrite": null,
|
||||||
"tags": {},
|
"tags": null,
|
||||||
"tags_all": {},
|
"tags_all": {},
|
||||||
"tier": "Standard",
|
"tier": "Standard",
|
||||||
"type": "String",
|
"type": "String",
|
||||||
"value": "datalake-processed-534534002841-ab-1201680",
|
"value": "datalake-processed-534534002841-ab-1201680",
|
||||||
"version": 3
|
"version": 1
|
||||||
},
|
},
|
||||||
"sensitive_attributes": [
|
"sensitive_attributes": [
|
||||||
[
|
[
|
||||||
|
Loading…
Reference in New Issue
Block a user