Compare commits
No commits in common. "4da2340ec3663f579004d3a1a26e2bee4dda4133" and "7d53b118d7b89daaf6653d2b51fc316d88d6228b" have entirely different histories.
4da2340ec3 ... 7d53b118d7
@@ -45,7 +45,7 @@ class KinesisProducer:
 # invoking a Lambda to add those new lines).Every message is a dumped json with \n
 tran_id = event["trans_id"]
-payload = (json.dumps(event) + '\n').encode('utf-8')
+payload = (json.dumps(event)+'\n').encode('utf-8')

 attempt = 1
 while attempt < self.max_retry_attempt:
@@ -55,8 +55,7 @@ class KinesisProducer:
 Data=payload,
 PartitionKey=key
 )
-logger.info('Msg with trans_id={} sent to shard {} seq no {}'.format(tran_id, response["ShardId"],
-response["SequenceNumber"]))
+logger.info('Msg with trans_id={} sent to shard {} seq no {}'.format(tran_id, response["ShardId"], response["SequenceNumber"]))
 return response

 except Exception as e:
@@ -89,7 +88,7 @@ def prepare_event(event):
 return msg_formatted, msg_key


-def produce_data(kinesis_data_stream, messages_per_sec, input_file, single_run):
+def produce_data(kinesis_data_stream, messages_per_sec, input_file):
 """
 Main method for producing
 :param kinesis_data_stream: param from cmdline name of KDS
@@ -119,8 +118,6 @@ def produce_data(kinesis_data_stream, messages_per_sec, input_file, single_run):
 event, key = prepare_event(row)
 kp.produce(event, key, kinesis_data_stream)

-if single_run:
-    break
 replay_cnt += 1

@@ -131,20 +128,16 @@ if __name__ == "__main__":
 parser.add_argument('-k', '--kinesis_ds', dest='kinesis_ds', required=True)
 parser.add_argument('-i', '--input_file', dest='input_file', required=False)
 parser.add_argument('-s', '--messages_per_sec', dest='mps', type=int, default=-1, required=False)
-parser.add_argument('-r', '--single-run', dest='singel_run', action='store_true', required=False, default=False)

 args, unknown = parser.parse_known_args()
 config = configparser.ConfigParser()

 kinesis_data_stream = args.kinesis_ds
 messages_per_sec = int(args.mps)

-single_run = args.singel_run if hasattr(args, 'singel_run') else False

 if args.input_file:
     input_file = args.input_file
 else:
     main_path = os.path.abspath(os.path.dirname(__file__))
     input_file = os.path.join(main_path, DEFAULT_DATA_FILE)

-produce_data(kinesis_data_stream, messages_per_sec, input_file, single_run)
+produce_data(kinesis_data_stream, messages_per_sec, input_file)
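The hunks above touch only fragments of KinesisProducer and its `__main__` block. For orientation, a minimal self-contained sketch of how the visible produce path presumably fits together; only the lines that appear in the hunks are certain, while the constructor, logger setup and backoff below are assumptions:

```python
# Hypothetical reconstruction of the produce path shown in the hunks above.
# Only the lines visible in the diff are certain; the rest is an assumption.
import json
import logging
import time

import boto3

logger = logging.getLogger(__name__)


class KinesisProducer:
    def __init__(self, max_retry_attempt=3):
        self.kinesis_client = boto3.client('kinesis')   # assumed client setup
        self.max_retry_attempt = max_retry_attempt

    def produce(self, event, key, stream_name):
        # every message is a dumped JSON terminated with '\n' (see the hunk above)
        tran_id = event["trans_id"]
        payload = (json.dumps(event) + '\n').encode('utf-8')

        attempt = 1
        while attempt < self.max_retry_attempt:
            try:
                response = self.kinesis_client.put_record(
                    StreamName=stream_name,
                    Data=payload,
                    PartitionKey=key
                )
                logger.info('Msg with trans_id={} sent to shard {} seq no {}'.format(
                    tran_id, response["ShardId"], response["SequenceNumber"]))
                return response
            except Exception as e:
                # simple retry with backoff; the real error handling is not shown in the diff
                logger.warning('Attempt %s failed for trans_id=%s: %s', attempt, tran_id, e)
                attempt += 1
                time.sleep(attempt)
```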
labs/labs_1.md (new file, 14 lines):

Create a new folder ./cdp-terraform in your code repository
and copy the files into it

1. Create an S3 bucket
   - check the resource definition: https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket

2. Create a Kinesis Data Stream. Use the following configuration parameters:
   - name: <env = dev>_<student_initials>_<index_no>, e.g. dev_jk_12345
   - number of shards: 1
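The Terraform solutions for both tasks appear further down in this diff (s3.tf, kinesis_ds.tf), and the tests/ modules at the end verify them in detail. For a quick manual check after `terraform apply`, a boto3 snippet along these lines can be used; the names below are illustrative only (taken from the example above and from the solution files), not something defined by this commit:

```python
import boto3

expected_stream = "dev_jk_12345"          # example name from the lab text; use your own initials/index
expected_bucket_prefix = "datalake-dev-"  # naming used by the solution s3.tf later in this diff

kinesis = boto3.client("kinesis")
s3 = boto3.client("s3")

# True if the stream was created with the expected name
print(expected_stream in kinesis.list_streams()["StreamNames"])
# list any data-lake buckets matching the expected prefix
print([b["Name"] for b in s3.list_buckets()["Buckets"] if b["Name"].startswith(expected_bucket_prefix)])
```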
@@ -7,7 +7,7 @@
 1. Prerequisites - environment setup (recommended: **PyCharm + Anaconda**)
 * PyCharm - https://www.jetbrains.com/pycharm/download/
 * Anaconda - https://www.anaconda.com/products/individual#Downloads
-- a new Python 3.9 environment
+- a new Python 3.8 environment
 Windows users: use the Anaconda Prompt)
 Linux / MacOS: bash / zsh etc.
 ```
@@ -1,4 +1,4 @@
-awscli==1.32.91
-boto3==1.34.89
-configparser==7.0.0
-awswrangler==3.7.3
+awscli==1.19.33
+boto3==1.17.33
+configparser==5.0.2
+awswrangler==2.7.0
@@ -1,12 +0,0 @@
-#!/bin/bash
-sudo yum update -y
-
-wget https://releases.hashicorp.com/terraform/1.8.1/terraform_1.8.1_linux_amd64.zip -P ~/
-unzip ~/terraform_1.8.1_linux_amd64.zip -d ~/.
-sudo mv ~/terraform /usr/local/bin
-
-pip install -r requirements.txt
-
-echo "alias python='python3'
-alias tf='terraform'
-alias c='clear'" >> ~/.bashrc
@@ -1,7 +1,7 @@
 locals {
   common_tags = {
-    purpose = "UAM Cloud Data Processing"
-    environment = "DEV"
-    owner = var.student_full_name
+    Purpose = "UAM Cloud Data Processing"
+    Environment = "DEV"
+    Owner = var.student_full_name
   }
 }
@@ -2,7 +2,7 @@ terraform {
   required_providers {
     aws = {
       source = "hashicorp/aws"
-      version = "~> 5.0"
+      version = "~> 3.27"
     }
   }
 }
labs/terraform/solutions/glue.tf (new file, 17 lines):

resource "aws_glue_catalog_database" "datalake_db_raw_zone" {
  name = "datalake_${var.environment}_${var.account_number}_${var.student_initials}_${var.student_index_no}"
}

resource "aws_glue_crawler" "glue_crawler_raw_zone" {
  database_name = aws_glue_catalog_database.datalake_db_raw_zone.name
  name          = "gc-raw-${var.environment}-${var.account_number}-${var.student_initials}-${var.student_index_no}"
  role          = aws_iam_role.glue_crawler_role.arn
  table_prefix  = "crawler_"

  s3_target {
    path = "s3://${aws_s3_bucket.main_dl_bucket.bucket}/raw-zone/stockdata/"
  }

  tags = merge(local.common_tags, )
}
labs/terraform/solutions/iam.tf (new file, 171 lines):

resource "aws_iam_role" "firehose_stream_role" {
  name = "firehose-role-${var.environment}-${var.account_number}-${var.student_initials}-${var.student_index_no}"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

resource "aws_iam_role_policy" "firehose_stream_policy" {
  name = "firehose-stream-policy-${var.environment}-${var.account_number}-${var.student_initials}-${var.student_index_no}"
  role = aws_iam_role.firehose_stream_role.id

  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "kinesis:*",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject"
      ],
      "Resource": [
        "${aws_s3_bucket.main_dl_bucket.arn}",
        "${aws_s3_bucket.main_dl_bucket.arn}/*"
      ]
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:${var.region}:${var.account_number}:log-group:/aws/kinesisfirehose/*"
      ]
    }
  ]
}
EOF
}


// Role & policies for Glue Crawler
resource "aws_iam_role" "glue_crawler_role" {
  name = "crawler-role-${var.environment}-${var.account_number}-${var.student_initials}-${var.student_index_no}"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "glue.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF
}

data "aws_iam_policy" "glue_service_policy" {
  arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
}

resource "aws_iam_role_policy" "glue_crawler_user_bucket_policy" {
  name = "user-bucket-policy-${var.environment}-${var.account_number}-${var.student_initials}-${var.student_index_no}"
  role = aws_iam_role.glue_crawler_role.id

  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": [
        "${aws_s3_bucket.main_dl_bucket.arn}*"
      ]
    }
  ]
}
EOF
}

resource "aws_iam_policy_attachment" "crawler_attach_managed_policy" {
  name = "crawler-managed-service-${var.environment}-${var.account_number}-${var.student_initials}-${var.student_index_no}"
  roles = [
    aws_iam_role.glue_crawler_role.name]
  policy_arn = data.aws_iam_policy.glue_service_policy.arn
}


// Role and policies for Lambda
resource "aws_iam_role" "lambda_basic_role" {
  name = "lambda-basic-role-${var.environment}-${var.account_number}-${var.student_initials}-${var.student_index_no}"
  tags = merge(local.common_tags, )

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

resource "aws_iam_role_policy" "lambda_basic_policy" {
  name = "lambda-basic-policy-${var.environment}-${var.account_number}-${var.student_initials}-${var.student_index_no}"
  role = aws_iam_role.lambda_basic_role.id

  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Effect": "Allow",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:*",
      "Resource": [
        "${aws_s3_bucket.main_dl_bucket.arn}",
        "${aws_s3_bucket.main_dl_bucket.arn}/*"]
    }
  ]
}
EOF
}
labs/terraform/solutions/kinesis_ds.tf (new file, 14 lines):

resource "aws_kinesis_stream" "cryptostock_stream" {
  name                      = "cryptostock-${var.environment}-${var.account_number}-${var.student_initials}-${var.student_index_no}"
  shard_count               = 1
  enforce_consumer_deletion = true

  shard_level_metrics = [
    "IncomingBytes",
    "OutgoingBytes",
    "IncomingRecords",
    "OutgoingRecords"
  ]

  tags = merge(local.common_tags, )
}
labs/terraform/solutions/kinesis_fh.tf (new file, 18 lines):

resource "aws_kinesis_firehose_delivery_stream" "stock_delivery_stream" {
  name        = "firehose-${var.environment}-${var.account_number}-${var.student_initials}-${var.student_index_no}"
  destination = "extended_s3"

  kinesis_source_configuration {
    kinesis_stream_arn = aws_kinesis_stream.cryptostock_stream.arn
    role_arn           = aws_iam_role.firehose_stream_role.arn
  }

  extended_s3_configuration {
    role_arn            = aws_iam_role.firehose_stream_role.arn
    bucket_arn          = aws_s3_bucket.main_dl_bucket.arn
    buffer_size         = 1
    buffer_interval     = 60
    prefix              = "raw-zone/stockdata/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
    error_output_prefix = "${ "raw-zone/stockdata_errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}"}${ "/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}"}/"
  }
}
labs/terraform/solutions/lambda.tf (new file, 50 lines):

resource "aws_lambda_layer_version" "aws_wrangler" {
  filename            = "../lambda/awswrangler-layer-2.7.0-py3.8.zip"
  layer_name          = "aws_wrangler_${var.environment}_${var.account_number}_${var.student_initials}_${var.student_index_no}"
  source_code_hash    = "${filebase64sha256("../lambda/awswrangler-layer-2.7.0-py3.8.zip")}"
  compatible_runtimes = [
    "python3.8"]
}

resource "aws_lambda_function" "etl_post_processing" {
  function_name    = "etl-post-processing-${var.environment}-${var.account_number}-${var.student_initials}-${var.student_index_no}"
  filename         = "../lambda/lambda_definition.zip"
  handler          = "lambda_definition.etl_function"
  runtime          = "python3.8"
  role             = aws_iam_role.lambda_basic_role.arn
  timeout          = 300
  memory_size      = 512
  source_code_hash = filebase64sha256("../lambda/lambda_definition.zip")
  layers           = [
    "${aws_lambda_layer_version.aws_wrangler.arn}"]
}

resource "aws_lambda_permission" "allow_bucket" {
  statement_id  = "AllowExecutionFromS3Bucket"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.etl_post_processing.arn
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.main_dl_bucket.arn
}

resource "aws_s3_bucket_notification" "trigger_etl_lambda" {
  bucket = aws_s3_bucket.main_dl_bucket.id

  lambda_function {
    lambda_function_arn = aws_lambda_function.etl_post_processing.arn
    events              = [
      "s3:ObjectCreated:*"]
    filter_prefix       = "raw-zone/"
  }

  depends_on = [
    aws_lambda_permission.allow_bucket]
}
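The Lambda package ../lambda/lambda_definition.zip referenced above is not part of this diff; only the handler name (lambda_definition.etl_function), the runtime, the awswrangler layer and the raw-zone/ S3 trigger are. Purely as a hypothetical sketch, a handler along these lines would convert the raw JSON-lines objects into partitioned Parquet under processed-zone/ (the column and partition names are taken from the Athena DDL in queries.sql further down; everything else is an assumption):

```python
# lambda_definition.py -- hypothetical sketch, not the contents of lambda_definition.zip
import urllib.parse

import awswrangler as wr   # provided by the aws_wrangler layer defined above
import pandas as pd


def etl_function(event, context):
    # S3 put notification from the raw-zone/ prefix (see aws_s3_bucket_notification above)
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

    # raw Firehose objects are newline-delimited JSON
    df = wr.s3.read_json(f"s3://{bucket}/{key}", lines=True)

    # derive partition columns from the epoch-seconds timestamp
    # (assumes the raw events carry 'transaction_ts' and 'symbol', as in queries.sql)
    ts = pd.to_datetime(df["transaction_ts"], unit="s")
    df["year"], df["month"] = ts.dt.year, ts.dt.month
    df["day"], df["hour"] = ts.dt.day, ts.dt.hour

    # write partitioned Parquet into the processed zone queried by Athena
    wr.s3.to_parquet(
        df=df,
        path=f"s3://{bucket}/processed-zone/stockdata/",
        dataset=True,
        partition_cols=["symbol", "year", "month", "day", "hour"],
    )
    return {"processed_rows": len(df)}
```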
labs/terraform/solutions/main.tf (new file, 7 lines):

locals {
  common_tags = {
    Purpose = "UAM Cloud Data Processing"
    Environment = "DEV"
    Owner = var.student_full_name
  }
}
labs/terraform/solutions/queries.sql (new file, 75 lines):

-- lab 3.3

WITH CTE AS
(
  SELECT date_format(from_unixtime(transaction_ts),'%Y-%m-%dT%H') as HourlyBucket,
         RANK() OVER(PARTITION BY date_format(from_unixtime(transaction_ts),'%Y-%m-%dT%H'), symbol ,type ORDER BY dollar_amount DESC) as rnk, *
  FROM "datalake_dev_100603781557_jk_12345"."crawler_stockdata"
)
select *
from CTE
where rnk=1
order by 1, 4, 8

-- LAB 4.2

CREATE EXTERNAL TABLE processed_stockdata(
  transaction_date timestamp,
  price double,
  amount double,
  dollar_amount double,
  type string,
  trans_id bigint)
PARTITIONED BY (
  symbol string,
  year integer,
  month integer,
  day integer,
  hour integer
)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://datalake-dev-100603781557-jk-12345/processed-zone/stockdata/'

MSCK REPAIR TABLE processed_stockdata;

-- LAB 5

--          .----------.   .----------.   .----------.
--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |
-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination
--          |          |   |  (PUMP)  |   |          |
--          '----------'   '----------'   '----------'

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
  ("symbol" VARCHAR(10), "type" VARCHAR(10), "trans_id" BIGINT,
   "dollar_amount" DOUBLE, "AvgLast30seconds" DOUBLE, "CntLast30seconds" INT,
   "SumLast30rows" DOUBLE, "CntLast30rows" INT, "max_tran_id" BIGINT );

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "symbol", "type", "trans_id", "dollar_amount", "AvgLast30seconds", "CntLast30seconds"
     , "SumLast30rows", "CntLast30rows", "max_tran_id"
FROM (
  SELECT STREAM "symbol", "type", "trans_id", "dollar_amount",
         AVG("dollar_amount") OVER LAST_30_SECS AS "AvgLast30seconds",
         COUNT(*) OVER LAST_30_SECS AS "CntLast30seconds",
         SUM("dollar_amount") OVER LAST_30_ROWS AS "SumLast30rows",
         COUNT(*) OVER LAST_30_ROWS AS "CntLast30rows",
         MAX("trans_id") OVER LAST_30_ROWS AS "max_tran_id"
  FROM "SOURCE_SQL_STREAM_001"
  WHERE "symbol" = 'BTC_USD'
  WINDOW
    LAST_30_SECS AS (PARTITION BY "symbol", "type" RANGE INTERVAL '30' SECOND PRECEDING),
    LAST_30_ROWS AS (PARTITION BY "symbol", "type" ROWS 30 PRECEDING)
)
WHERE "dollar_amount" > 4 * ("AvgLast30seconds");
labs/terraform/solutions/s3.tf (new file, 6 lines):

resource "aws_s3_bucket" "main_dl_bucket" {
  bucket        = "datalake-${var.environment}-${var.account_number}-${var.student_initials}-${var.student_index_no}"
  force_destroy = true

  tags = merge(local.common_tags, )
}
@@ -1,7 +1,7 @@
 locals {
   common_tags = {
-    purpose = "UAM Cloud Data Processing"
-    environment = "DEV"
-    owner = var.student_full_name
+    Purpose = "UAM Cloud Data Processing"
+    Environment = "DEV"
+    Owner = var.student_full_name
   }
 }
@@ -2,7 +2,7 @@ terraform {
   required_providers {
     aws = {
       source = "hashicorp/aws"
-      version = "~> 5.0"
+      version = "~> 3.27"
     }
   }
 }
@@ -1,5 +1,4 @@
-account_number=XXXXX
+account_number=920628590621
 student_initials="jk"
 student_full_name="Jakub Kasprzak"
 student_index_no = "12345"
-lab_role_arn = "arn:aws:iam::XXXXX:role/LabRole"
@@ -28,10 +28,4 @@ variable "student_full_name" {
 variable "student_index_no" {
   description = "Index no"
   type = string
 }
-
-variable "lab_role_arn" {
-  description = "the role we use for all labs, dont use a single role for everything! it is an anti-pattern!!!!"
-  type = string
-
-}
@@ -1,5 +1,4 @@
-account_number=XXXXX
+account_number=920628590621
 student_initials="jk"
 student_full_name="Jakub Kasprzak"
 student_index_no = "12345"
-lab_role_arn = "arn:aws:iam::XXXXX:role/LabRole"
@@ -28,10 +28,4 @@ variable "student_full_name" {
 variable "student_index_no" {
   description = "Index no"
   type = string
 }
-
-variable "lab_role_arn" {
-  description = "the role we use for all labs, dont use a single role for everything! it is an anti-pattern!!!!"
-  type = string
-
-}
pdf/LABS Setup - Przetwarzanie Danych w chmurze publicznej.pdf (new binary file, not shown)
testing-stack/docker-compose.yml (new file, 100 lines):

version: '3.7'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.2.2
    ports:
      - 2181:2181
    hostname: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - uam

  kafka:
    image: confluentinc/cp-kafka:5.2.2
    ports:
      - 9092:9092
      - 9094:9094
    hostname: kafka
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:9094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 168000
    depends_on:
      - zookeeper
    networks:
      - uam

  minio:
    image: minio/minio
    ports:
      - 9000:9000
    hostname: minio
    environment:
      MINIO_ACCESS_KEY: minio
      MINIO_SECRET_KEY: minio123
      MINIO_HTTP_TRACE: /dev/stdout
    command: server /data
    networks:
      - uam

  default-buckets:
    image: minio/mc
    entrypoint: >
      /bin/sh -c "
      sleep 20;
      /usr/bin/mc config host add minio http://minio:9000 minio minio123;
      /usr/bin/mc mb minio/dev-data-raw;
      /usr/bin/mc policy public minio/dev-data-raw;"
    depends_on:
      - minio
    networks:
      - uam

  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.2.2
    hostname: kafka-connect
    ports:
      - 8083:8083
      - 3030:3030
    environment:
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_BOOTSTRAP_SERVERS: kafka:9094
      CONNECT_GROUP_ID: primary-etl
      CONNECT_CONFIG_STORAGE_TOPIC: __primary-etl-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_TOPIC: __primary-etl-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: __primary-etl-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
      CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars
      AWS_ACCESS_KEY_ID: minio
      AWS_SECRET_ACCESS_KEY: minio123
    depends_on:
      - kafka
      - default-buckets
    networks:
      - uam

networks:
  uam:
    name: uam
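For local experiments against this stack, boto3 can be pointed at the MinIO service instead of AWS. A minimal sketch, assuming the stack is running via docker-compose up and MinIO is reachable on localhost:9000; the endpoint, credentials and the dev-data-raw bucket all come from the compose file above:

```python
# Pointing boto3 at the local MinIO from this compose file (endpoint, credentials and
# the dev-data-raw bucket are taken from docker-compose.yml above).
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minio",
    aws_secret_access_key="minio123",
)

# drop a small test object into the pre-created bucket and list what is there
s3.put_object(Bucket="dev-data-raw", Key="smoke-test.json", Body=b'{"ok": true}\n')
print([o["Key"] for o in s3.list_objects_v2(Bucket="dev-data-raw").get("Contents", [])])
```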
tests/__init__.py (new empty file)
tests/test_infrastructure_kinesis_ds.py (new file, 69 lines):

from unittest import main, TestCase
import boto3
import configparser as cp
import warnings
import os


class TestInfraKinesis(TestCase):

    def setUp(self):
        self.kinesis_client = boto3.client('kinesis')

        # read configuration
        # making path universal for running tests from the module / outside
        cwd = os.getcwd()
        extra_dot = '.' if cwd.endswith('tests') else ''
        config_path = extra_dot + "./labs/terraform/terraform.tfvars"

        with open(config_path, 'r') as f:
            config_string = '[main]\n' + f.read()
        config = cp.ConfigParser()
        config.read_string(config_string)

        self.config = {param: (
            config["main"][param].replace('"', '') if isinstance(config["main"][param], str) else config["main"][param]
        ) for param in config["main"]}

        warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*<ssl.SSLSocket.*>")

        kinesis_stream_name_pattern = "cryptostock-dev-{account_number}-{student_initials}-{student_index_no}"
        self.kinesis_stream_name = kinesis_stream_name_pattern.format(account_number=self.config["account_number"],
                                                                      student_initials=self.config["student_initials"],
                                                                      student_index_no=self.config["student_index_no"])

    def test_kinesis_data_stream_exists(self):
        kinesis_streams = self.kinesis_client.list_streams()["StreamNames"]

        find_stream = next((stream for stream in kinesis_streams if stream == self.kinesis_stream_name), None)

        self.assertEqual(find_stream, self.kinesis_stream_name)

    def test_kinesis_data_stream_config(self):
        expected_no_of_shards = 1

        stream_config = self.kinesis_client.describe_stream(StreamName=self.kinesis_stream_name)

        # check no of shards
        no_of_shards = len(stream_config["StreamDescription"]["Shards"])
        self.assertEqual(no_of_shards, expected_no_of_shards)

    def test_kinesis_data_stream_tags(self):
        tags = self.kinesis_client.list_tags_for_stream(StreamName=self.kinesis_stream_name)["Tags"]

        find_env_tag = next((tag["Value"] for tag in tags if tag.get("Key") == "Environment"), None)
        self.assertEqual(find_env_tag.upper(), "DEV")

    def test_kinesis_data_stream_monitoring(self):
        expected_monitors = ['IncomingBytes', 'OutgoingRecords', 'IncomingRecords', 'OutgoingBytes']

        stream_monitoring = \
            self.kinesis_client.describe_stream(StreamName=self.kinesis_stream_name)["StreamDescription"][
                "EnhancedMonitoring"]

        current_monitors = next((x["ShardLevelMetrics"] for x in stream_monitoring if x.get("ShardLevelMetrics")), None)
        self.assertTrue(set(current_monitors).issuperset(set(expected_monitors)))


if __name__ == '__main__':
    main()
tests/test_infrastructure_kinesis_firehose.py (new file, 73 lines):

from unittest import main, TestCase
import boto3
import configparser as cp
import warnings
import os


class TestInfraKinesisFH(TestCase):

    def setUp(self):
        self.firehose_client = boto3.client('firehose')
        self.kinesis_client = boto3.client('kinesis')

        # read configuration
        # making path universal for running tests from the module / outside
        cwd = os.getcwd()
        extra_dot = '.' if cwd.endswith('tests') else ''
        config_path = extra_dot + "./labs/terraform/terraform.tfvars"

        with open(config_path, 'r') as f:
            config_string = '[main]\n' + f.read()
        config = cp.ConfigParser()
        config.read_string(config_string)

        self.config = {param: (
            config["main"][param].replace('"', '') if isinstance(config["main"][param], str) else config["main"][param]
        ) for param in config["main"]}

        warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*<ssl.SSLSocket.*>")

        firehose_stream_name_pattern = "firehose-dev-{account_number}-{student_initials}-{student_index_no}"
        self.firehose_stream_name = firehose_stream_name_pattern.format(account_number=self.config["account_number"],
                                                                        student_initials=self.config["student_initials"],
                                                                        student_index_no=self.config["student_index_no"])

        kinesis_stream_name_pattern = "cryptostock-dev-{account_number}-{student_initials}-{student_index_no}"
        self.kinesis_stream_name = kinesis_stream_name_pattern.format(account_number=self.config["account_number"],
                                                                      student_initials=self.config["student_initials"],
                                                                      student_index_no=self.config["student_index_no"])

    def test_kinesis_firehose_exists(self):
        deliver_streams = self.firehose_client.list_delivery_streams()["DeliveryStreamNames"]
        find_stream = next((stream for stream in deliver_streams if stream == self.firehose_stream_name), None)

        self.assertEqual(find_stream, self.firehose_stream_name)

    def test_kinesis_firehose_source(self):
        config = self.firehose_client.describe_delivery_stream(DeliveryStreamName=self.firehose_stream_name)
        source = config["DeliveryStreamDescription"]["Source"]["KinesisStreamSourceDescription"]["KinesisStreamARN"]

        kinesis_stream_config = self.kinesis_client.describe_stream(StreamName=self.kinesis_stream_name)
        kds_arn = kinesis_stream_config["StreamDescription"]["StreamARN"]

        self.assertEqual(kds_arn, source)

    def test_kinesis_firehose_dest_config(self):
        config = self.firehose_client.describe_delivery_stream(DeliveryStreamName=self.firehose_stream_name)
        destination = config["DeliveryStreamDescription"]["Destinations"][0]

        expected_pref = 'raw-zone/stockdata/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
        expected_err_pref = 'raw-zone/stockdata_errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'

        dest_prefix = destination.get("S3DestinationDescription", None).get("Prefix", None)
        dest_errors_prefix = destination.get("S3DestinationDescription", None).get("ErrorOutputPrefix", None)

        self.assertEqual(dest_prefix, expected_pref)
        self.assertEqual(expected_err_pref, dest_errors_prefix)


if __name__ == '__main__':
    main()
tests/test_infrastructure_s3.py (new file, 52 lines):

from unittest import main, TestCase
import boto3
import configparser as cp
import warnings
import os


class TestInfraS3(TestCase):

    def setUp(self):
        self.s3_client = boto3.client('s3')

        # read configuration
        # making path universal for running tests from the module / outside
        cwd = os.getcwd()
        extra_dot = '.' if cwd.endswith('tests') else ''
        config_path = extra_dot + "./labs/terraform/terraform.tfvars"

        with open(config_path, 'r') as f:
            config_string = '[main]\n' + f.read()
        config = cp.ConfigParser()
        config.read_string(config_string)

        self.config = {param: (
            config["main"][param].replace('"', '') if isinstance(config["main"][param], str) else config["main"][param]
        ) for param in config["main"]}

        warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*<ssl.SSLSocket.*>")

        bucket_name_pattern = "datalake-dev-{account_number}-{student_initials}-{student_index_no}"
        self.bucket_name = bucket_name_pattern.format(account_number=self.config["account_number"],
                                                      student_initials=self.config["student_initials"],
                                                      student_index_no=self.config["student_index_no"])

    def test_main_s3_bucket_exists(self):
        s3_buckets = self.s3_client.list_buckets()["Buckets"]

        find_bucket = next((bucket["Name"] for bucket in s3_buckets if bucket["Name"] == self.bucket_name), None)

        self.assertEqual(find_bucket, self.bucket_name)

    def test_main_s3_bucket_comfig(self):
        tags = self.s3_client.get_bucket_tagging(
            Bucket=self.bucket_name
        )["TagSet"]

        find_env_tag = next((tag["Value"] for tag in tags if tag.get("Key") == "Environment"), None)
        self.assertEqual(find_env_tag.upper(), "DEV")


if __name__ == '__main__':
    main()
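All three test modules resolve ./labs/terraform/terraform.tfvars relative to the working directory, so with AWS credentials configured they can presumably be run from the repository root with `python -m unittest discover -s tests`, or individually with `python -m unittest tests.test_infrastructure_s3`.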