diff --git a/labs/ab-etl-visual-raw-to-processed.py b/labs/ab-etl-visual-raw-to-processed.py new file mode 100644 index 0000000..53c9bdc --- /dev/null +++ b/labs/ab-etl-visual-raw-to-processed.py @@ -0,0 +1,39 @@ +import sys +from awsglue.transforms import * +from awsglue.utils import getResolvedOptions +from pyspark.context import SparkContext +from awsglue.context import GlueContext +from awsglue.job import Job +from awsglue import DynamicFrame + +def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: + for alias, frame in mapping.items(): + frame.toDF().createOrReplaceTempView(alias) + result = spark.sql(query) + return DynamicFrame.fromDF(result, glueContext, transformation_ctx) +args = getResolvedOptions(sys.argv, ['JOB_NAME']) +sc = SparkContext() +glueContext = GlueContext(sc) +spark = glueContext.spark_session +job = Job(glueContext) +job.init(args['JOB_NAME'], args) + +# Script generated for node AWS Glue Data Catalog +AWSGlueDataCatalog_node1715504004627 = glueContext.create_dynamic_frame.from_catalog(database="datalake_raw_534534002841_ab_1201680", table_name="crawler_stockdata", transformation_ctx="AWSGlueDataCatalog_node1715504004627") + +# Script generated for node SQL Query +SqlQuery1825 = ''' +select year(cast(transaction_ts as timestamp)) as year, + month(cast(transaction_ts as timestamp)) as month, + day(cast(transaction_ts as timestamp)) as day, + hour(cast(transaction_ts as timestamp)) as hour, + transaction_ts, symbol, price, amount, dollar_amount, type +from myDataSource + +''' +SQLQuery_node1715504223461 = sparkSqlQuery(glueContext, query = SqlQuery1825, mapping = {"myDataSource":AWSGlueDataCatalog_node1715504004627}, transformation_ctx = "SQLQuery_node1715504223461") + +# Script generated for node AWS Glue Data Catalog +AWSGlueDataCatalog_node1716060278270 = glueContext.write_dynamic_frame.from_catalog(frame=SQLQuery_node1715504223461, database="datalake_processed_534534002841_ab_1201680", 
table_name="processed_stockdata", additional_options={"enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE", "partitionKeys": ["symbol", "year", "month", "day", "hour"]}, transformation_ctx="AWSGlueDataCatalog_node1716060278270") + +job.commit() \ No newline at end of file diff --git a/labs/ab-visual-etl-proces-transformation.py b/labs/ab-visual-etl-proces-transformation.py new file mode 100644 index 0000000..dd9279f --- /dev/null +++ b/labs/ab-visual-etl-proces-transformation.py @@ -0,0 +1,53 @@ +import sys +from awsglue.transforms import * +from awsglue.utils import getResolvedOptions +from pyspark.context import SparkContext +from awsglue.context import GlueContext +from awsglue.job import Job +from awsglue import DynamicFrame + +def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: + for alias, frame in mapping.items(): + frame.toDF().createOrReplaceTempView(alias) + result = spark.sql(query) + return DynamicFrame.fromDF(result, glueContext, transformation_ctx) +def sparkUnion(glueContext, unionType, mapping, transformation_ctx) -> DynamicFrame: + for alias, frame in mapping.items(): + frame.toDF().createOrReplaceTempView(alias) + result = spark.sql("(select * from source1) UNION " + unionType + " (select * from source2)") + return DynamicFrame.fromDF(result, glueContext, transformation_ctx) +args = getResolvedOptions(sys.argv, ['JOB_NAME']) +sc = SparkContext() +glueContext = GlueContext(sc) +spark = glueContext.spark_session +job = Job(glueContext) +job.init(args['JOB_NAME'], args) + +# Script generated for node AWS Glue Data Catalog +AWSGlueDataCatalog_node1716120307832 = glueContext.create_dynamic_frame.from_catalog(database="datalake_processed_534534002841_ab_1201680", table_name="agg_stockdata", transformation_ctx="AWSGlueDataCatalog_node1716120307832") + +# Script generated for node AWS Glue Data Catalog +AWSGlueDataCatalog_node1716061298505 = 
glueContext.create_dynamic_frame.from_catalog(database="datalake_processed_534534002841_ab_1201680", table_name="processed_stockdata", transformation_ctx="AWSGlueDataCatalog_node1716061298505") + +# Script generated for node Change Schema +ChangeSchema_node1716120318123 = ApplyMapping.apply(frame=AWSGlueDataCatalog_node1716120307832, mappings=[("total_volume", "double", "total_volume", "double"), ("total_dollars", "double", "total_dollars", "double"), ("total_cnt_of_transactions", "int", "total_cnt_of_transactions", "int"), ("type", "string", "type", "string"), ("symbol", "string", "symbol", "string"), ("year", "int", "year", "int"), ("month", "int", "month", "int"), ("day", "int", "day", "int")], transformation_ctx="ChangeSchema_node1716120318123") + +# Script generated for node SQL Query +SqlQuery1998 = ''' +select ROUND(SUM(amount), 2) as total_volume, +ROUND(SUM(dollar_amount), 2) as total_dollars, +COUNT(transaction_ts) as total_cnt_of_transactions, +type, symbol, year, month, day +from myDataSource +group by symbol, year, month, day, type +order by symbol, day, type +''' +SQLQuery_node1716061397564 = sparkSqlQuery(glueContext, query = SqlQuery1998, mapping = {"myDataSource":AWSGlueDataCatalog_node1716061298505}, transformation_ctx = "SQLQuery_node1716061397564") + +# Script generated for node Union +Union_node1716065407070 = sparkUnion(glueContext, unionType = "DISTINCT", mapping = {"source1": ChangeSchema_node1716120318123, "source2": SQLQuery_node1716061397564}, transformation_ctx = "Union_node1716065407070") + +# Script generated for node AWS Glue Data Catalog +AWSGlueDataCatalog_node1716064742210 = glueContext.write_dynamic_frame.from_catalog(frame=Union_node1716065407070, database="datalake_processed_534534002841_ab_1201680", table_name="agg_stockdata", additional_options={"enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE", "partitionKeys": ["symbol", "year", "month", "day"]}, 
transformation_ctx="AWSGlueDataCatalog_node1716064742210") + +job.commit() \ No newline at end of file diff --git a/labs/terraform/terraform.tfstate b/labs/terraform/terraform.tfstate index 584595d..8823567 100644 --- a/labs/terraform/terraform.tfstate +++ b/labs/terraform/terraform.tfstate @@ -1,7 +1,7 @@ { "version": 4, "terraform_version": "1.8.1", - "serial": 212, + "serial": 276, "lineage": "a77aaaba-b4f8-6adb-0387-8f0b98d722c2", "outputs": {}, "resources": [], diff --git a/labs/terraform/terraform.tfstate.backup b/labs/terraform/terraform.tfstate.backup index f45b390..c18b2ba 100644 --- a/labs/terraform/terraform.tfstate.backup +++ b/labs/terraform/terraform.tfstate.backup @@ -1,7 +1,7 @@ { "version": 4, "terraform_version": "1.8.1", - "serial": 196, + "serial": 260, "lineage": "a77aaaba-b4f8-6adb-0387-8f0b98d722c2", "outputs": {}, "resources": [ @@ -43,7 +43,7 @@ "id": "development", "name": "development", "state": "ENABLED", - "tags": {}, + "tags": null, "tags_all": {} }, "sensitive_attributes": [], @@ -82,8 +82,8 @@ "id": "534534002841:datalake_processed_534534002841_ab_1201680", "location_uri": "", "name": "datalake_processed_534534002841_ab_1201680", - "parameters": {}, - "tags": {}, + "parameters": null, + "tags": null, "tags_all": {}, "target_database": [] }, @@ -120,8 +120,8 @@ "id": "534534002841:datalake_raw_534534002841_ab_1201680", "location_uri": "", "name": "datalake_raw_534534002841_ab_1201680", - "parameters": {}, - "tags": {}, + "parameters": null, + "tags": null, "tags_all": {}, "target_database": [] }, @@ -141,7 +141,7 @@ "attributes": { "arn": "arn:aws:glue:us-east-1:534534002841:crawler/gc-raw-534534002841-ab-1201680", "catalog_target": [], - "classifiers": [], + "classifiers": null, "configuration": "", "database_name": "datalake_raw_534534002841_ab_1201680", "delta_target": [], @@ -175,7 +175,7 @@ "connection_name": "", "dlq_event_queue_arn": "", "event_queue_arn": "", - "exclusions": [], + "exclusions": null, "path": 
"s3://datalake-raw-534534002841-ab-1201680/raw-zone/stockdata/", "sample_size": 0 } @@ -275,7 +275,7 @@ ], "snowflake_configuration": [], "splunk_configuration": [], - "tags": {}, + "tags": null, "tags_all": {}, "timeouts": null, "version_id": "1" @@ -365,9 +365,9 @@ "image_uri": "", "invoke_arn": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1:534534002841:function:etl-post-processing-534534002841-ab-1201680/invocations", "kms_key_arn": "", - "last_modified": "2024-05-12T07:30:33.751+0000", + "last_modified": "2024-05-19T10:36:30.839+0000", "layers": [ - "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680:4" + "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680:6" ], "logging_config": [ { @@ -396,7 +396,7 @@ "snap_start": [], "source_code_hash": "DYklWA51/+hutwYtHutJg59rV7DY0LEgfp+ne8wgiSo=", "source_code_size": 884, - "tags": {}, + "tags": null, "tags_all": {}, "timeout": 300, "timeouts": null, @@ -425,15 +425,15 @@ { "schema_version": 0, "attributes": { - "arn": "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680:4", - "compatible_architectures": [], + "arn": "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680:6", + "compatible_architectures": null, "compatible_runtimes": [ "python3.8" ], - "created_date": "2024-05-12T07:30:26.214+0000", + "created_date": "2024-05-19T10:36:30.504+0000", "description": "", "filename": "../lambda/awswrangler-layer-2.7.0-py3.8.zip", - "id": "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680:4", + "id": "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680:6", "layer_arn": "arn:aws:lambda:us-east-1:534534002841:layer:aws_wrangler_534534002841_ab_1201680", "layer_name": "aws_wrangler_534534002841_ab_1201680", "license_info": "", @@ -445,7 +445,7 @@ "skip_destroy": false, "source_code_hash": 
"C0YX/4auMnBs4J9JCDy1f7uc2GLF0vU7ppQgzffQiN4=", "source_code_size": 43879070, - "version": "4" + "version": "6" }, "sensitive_attributes": [], "private": "bnVsbA==" @@ -514,25 +514,7 @@ ], "hosted_zone_id": "Z3AQBSTGFYJSTF", "id": "athena-results-534534002841-ab-1201680", - "lifecycle_rule": [ - { - "abort_incomplete_multipart_upload_days": 0, - "enabled": true, - "expiration": [ - { - "date": "", - "days": 1, - "expired_object_delete_marker": false - } - ], - "id": "standard-expiration", - "noncurrent_version_expiration": [], - "noncurrent_version_transition": [], - "prefix": "", - "tags": {}, - "transition": [] - } - ], + "lifecycle_rule": [], "logging": [], "object_lock_configuration": [], "object_lock_enabled": false, @@ -796,17 +778,7 @@ "bucket": "datalake-raw-534534002841-ab-1201680", "eventbridge": false, "id": "datalake-raw-534534002841-ab-1201680", - "lambda_function": [ - { - "events": [ - "s3:ObjectCreated:*" - ], - "filter_prefix": "raw-zone/", - "filter_suffix": "", - "id": "tf-s3-lambda-20240512073033855700000001", - "lambda_function_arn": "arn:aws:lambda:us-east-1:534534002841:function:etl-post-processing-534534002841-ab-1201680" - } - ], + "lambda_function": [], "queue": [], "topic": [] }, @@ -839,12 +811,12 @@ "key_id": "", "name": "s3_processed_bucket_name", "overwrite": null, - "tags": {}, + "tags": null, "tags_all": {}, "tier": "Standard", "type": "String", "value": "datalake-processed-534534002841-ab-1201680", - "version": 3 + "version": 1 }, "sensitive_attributes": [ [