Compare commits

..

10 Commits

Author SHA1 Message Date
c543423de6 updated version of pdf labs 2024-05-11 07:01:28 +02:00
4da2340ec3 updated instruction for labs 2024-04-25 07:42:47 +02:00
f9bce209e3 updated instructions 2024-04-25 07:31:32 +02:00
bd230c3c97 updated instructions 2024-04-25 07:29:06 +02:00
fb140fc7cd updated instructions 2024-04-25 07:24:02 +02:00
60f469bed0 fixes in the starter files 2024-04-24 15:42:33 +02:00
25f4d03de0 fixes in the generator 2024-04-24 12:55:37 +02:00
EC2 Default User
5bce0dbda4 updating scripts 2024-04-24 06:09:50 +00:00
5dcb6ea56f Updating tags 2023-03-25 06:49:38 +01:00
6770f463c1 presentation 2022-05-15 10:10:23 +02:00
22 changed files with 54 additions and 329 deletions

View File

@ -45,7 +45,7 @@ class KinesisProducer:
# invoking a Lambda to add those new lines).Every message is a dumped json with \n # invoking a Lambda to add those new lines).Every message is a dumped json with \n
tran_id = event["trans_id"] tran_id = event["trans_id"]
payload = (json.dumps(event)+'\n').encode('utf-8') payload = (json.dumps(event) + '\n').encode('utf-8')
attempt = 1 attempt = 1
while attempt < self.max_retry_attempt: while attempt < self.max_retry_attempt:
@ -55,7 +55,8 @@ class KinesisProducer:
Data=payload, Data=payload,
PartitionKey=key PartitionKey=key
) )
logger.info('Msg with trans_id={} sent to shard {} seq no {}'.format(tran_id, response["ShardId"], response["SequenceNumber"])) logger.info('Msg with trans_id={} sent to shard {} seq no {}'.format(tran_id, response["ShardId"],
response["SequenceNumber"]))
return response return response
except Exception as e: except Exception as e:
@ -88,7 +89,7 @@ def prepare_event(event):
return msg_formatted, msg_key return msg_formatted, msg_key
def produce_data(kinesis_data_stream, messages_per_sec, input_file): def produce_data(kinesis_data_stream, messages_per_sec, input_file, single_run):
""" """
Main method for producing Main method for producing
:param kinesis_data_stream: param from cmdline name of KDS :param kinesis_data_stream: param from cmdline name of KDS
@ -118,6 +119,8 @@ def produce_data(kinesis_data_stream, messages_per_sec, input_file):
event, key = prepare_event(row) event, key = prepare_event(row)
kp.produce(event, key, kinesis_data_stream) kp.produce(event, key, kinesis_data_stream)
if single_run:
break
replay_cnt += 1 replay_cnt += 1
@ -128,16 +131,20 @@ if __name__ == "__main__":
parser.add_argument('-k', '--kinesis_ds', dest='kinesis_ds', required=True) parser.add_argument('-k', '--kinesis_ds', dest='kinesis_ds', required=True)
parser.add_argument('-i', '--input_file', dest='input_file', required=False) parser.add_argument('-i', '--input_file', dest='input_file', required=False)
parser.add_argument('-s', '--messages_per_sec', dest='mps', type=int, default=-1, required=False) parser.add_argument('-s', '--messages_per_sec', dest='mps', type=int, default=-1, required=False)
parser.add_argument('-r', '--single-run', dest='singel_run', action='store_true', required=False, default=False)
args, unknown = parser.parse_known_args() args, unknown = parser.parse_known_args()
config = configparser.ConfigParser() config = configparser.ConfigParser()
kinesis_data_stream = args.kinesis_ds kinesis_data_stream = args.kinesis_ds
messages_per_sec = int(args.mps) messages_per_sec = int(args.mps)
single_run = args.singel_run if hasattr(args, 'singel_run') else False
if args.input_file: if args.input_file:
input_file = args.input_file input_file = args.input_file
else: else:
main_path = os.path.abspath(os.path.dirname(__file__)) main_path = os.path.abspath(os.path.dirname(__file__))
input_file = os.path.join(main_path, DEFAULT_DATA_FILE) input_file = os.path.join(main_path, DEFAULT_DATA_FILE)
produce_data(kinesis_data_stream, messages_per_sec, input_file) produce_data(kinesis_data_stream, messages_per_sec, input_file, single_run)

View File

@ -1,14 +0,0 @@
Utwórz nowy folder w swoim repozytorium kodu ./cdp-terraform
skopiuj pliki
1. Stwórz S3 bucket
- sprawdź definicję https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket
2. Stwórz Kinesis Data Stream. Przyjmij do konfiguracji następujące parametry
- nazwa : <env = dev>_<inicjały_studenta>_<nr_indeksu> np dev_jk_12345
- liczba shardów - 1

View File

@ -7,7 +7,7 @@
1. Wymagania wstępne - środowiska (rekomendowane **PyCharm + Anaconda**) 1. Wymagania wstępne - środowiska (rekomendowane **PyCharm + Anaconda**)
* PyCharm - https://www.jetbrains.com/pycharm/download/ * PyCharm - https://www.jetbrains.com/pycharm/download/
* Anaconda - https://www.anaconda.com/products/individual#Downloads * Anaconda - https://www.anaconda.com/products/individual#Downloads
- nowe środowisko Python 3.8 - nowe środowisko Python 3.9
Windows users : użyj Anaconda Prompt) Windows users : użyj Anaconda Prompt)
Linux / MacOs bash / zsh etc.. Linux / MacOs bash / zsh etc..
``` ```

View File

@ -1,4 +1,4 @@
awscli==1.19.33 awscli==1.32.91
boto3==1.17.33 boto3==1.34.89
configparser==5.0.2 configparser==7.0.0
awswrangler==2.7.0 awswrangler==3.7.3

12
labs/setup_c9.sh Normal file
View File

@ -0,0 +1,12 @@
#!/bin/bash
# Lab environment bootstrap: updates system packages, installs the Terraform
# CLI into /usr/local/bin, installs the Python requirements and registers a
# few shell aliases.
#
# Run it from the directory that contains requirements.txt (the pip step uses
# a relative path). Override the Terraform release with e.g.
#   TERRAFORM_VERSION=1.8.2 ./setup_c9.sh

# Stop at the first failing step so that a broken download is never followed
# by "mv"/"pip" operating on missing files.
set -euo pipefail

TERRAFORM_VERSION="${TERRAFORM_VERSION:-1.8.1}"
TERRAFORM_ZIP="terraform_${TERRAFORM_VERSION}_linux_amd64.zip"

sudo yum update -y

# -O writes to a fixed path, so re-running the script overwrites the archive
# instead of leaving numbered copies (*.zip.1, *.zip.2, ...) in the home dir.
wget "https://releases.hashicorp.com/terraform/${TERRAFORM_VERSION}/${TERRAFORM_ZIP}" -O ~/"${TERRAFORM_ZIP}"

# -o overwrites previously extracted files without an interactive prompt.
unzip -o ~/"${TERRAFORM_ZIP}" -d ~/
sudo mv ~/terraform /usr/local/bin

pip install -r requirements.txt

# Append the aliases only once; without the guard every run would add another
# copy of the same three lines to ~/.bashrc.
if ! grep -qF "alias tf='terraform'" ~/.bashrc 2>/dev/null; then
echo "alias python='python3'
alias tf='terraform'
alias c='clear'" >> ~/.bashrc
fi

View File

@ -1,7 +1,7 @@
locals { locals {
common_tags = { common_tags = {
Purpose = "UAM Cloud Data Processing" purpose = "UAM Cloud Data Processing"
Environment = "DEV" environment = "DEV"
Owner = var.student_full_name owner = var.student_full_name
} }
} }

View File

@ -2,7 +2,7 @@ terraform {
required_providers { required_providers {
aws = { aws = {
source = "hashicorp/aws" source = "hashicorp/aws"
version = "~> 3.27" version = "~> 5.0"
} }
} }
} }

View File

@ -1,7 +1,7 @@
locals { locals {
common_tags = { common_tags = {
Purpose = "UAM Cloud Data Processing" purpose = "UAM Cloud Data Processing"
Environment = "DEV" environment = "DEV"
Owner = var.student_full_name owner = var.student_full_name
} }
} }

View File

@ -2,7 +2,7 @@ terraform {
required_providers { required_providers {
aws = { aws = {
source = "hashicorp/aws" source = "hashicorp/aws"
version = "~> 3.27" version = "~> 5.0"
} }
} }
} }

View File

@ -1,4 +1,5 @@
account_number=920628590621 account_number=XXXXX
student_initials="jk" student_initials="jk"
student_full_name="Jakub Kasprzak" student_full_name="Jakub Kasprzak"
student_index_no = "12345" student_index_no = "12345"
lab_role_arn = "arn:aws:iam::XXXXX:role/LabRole"

View File

@ -28,4 +28,10 @@ variable "student_full_name" {
variable "student_index_no" { variable "student_index_no" {
description = "Index no" description = "Index no"
type = string type = string
} }
variable "lab_role_arn" {
description = "the role we use for all labs, dont use a single role for everything! it is an anti-pattern!!!!"
type = string
}

View File

@ -1,4 +1,5 @@
account_number=920628590621 account_number=XXXXX
student_initials="jk" student_initials="jk"
student_full_name="Jakub Kasprzak" student_full_name="Jakub Kasprzak"
student_index_no = "12345" student_index_no = "12345"
lab_role_arn = "arn:aws:iam::XXXXX:role/LabRole"

View File

@ -28,4 +28,10 @@ variable "student_full_name" {
variable "student_index_no" { variable "student_index_no" {
description = "Index no" description = "Index no"
type = string type = string
} }
variable "lab_role_arn" {
description = "the role we use for all labs, dont use a single role for everything! it is an anti-pattern!!!!"
type = string
}

Binary file not shown.

Binary file not shown.

View File

@ -1,100 +0,0 @@
# Local stack definition: ZooKeeper, one Kafka broker, MinIO, a one-shot
# bucket-creation container and Kafka Connect. Every service joins the
# user-defined network "uam".
version: '3.7'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.2.2
    ports:
      - 2181:2181
    hostname: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - uam
  kafka:
    image: confluentinc/cp-kafka:5.2.2
    ports:
      - 9092:9092
      - 9094:9094
    hostname: kafka
    environment:
      # Two listeners are advertised: kafka:9094 under the "internal" name and
      # ${DOCKER_HOST_IP} (default 127.0.0.1) on 9092 under the "external" name.
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:9094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      # Both listeners are mapped to PLAINTEXT.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO
      # Replication factor 1 matches the single broker defined in this file.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # 168000 hours = 7000 days.
      # NOTE(review): presumably meant as "keep data practically forever" - confirm.
      KAFKA_LOG_RETENTION_HOURS: 168000
    depends_on:
      - zookeeper
    networks:
      - uam
  minio:
    # NOTE(review): image tag is not pinned, unlike the Confluent images above.
    image: minio/minio
    ports:
      - 9000:9000
    hostname: minio
    environment:
      # Hard-coded credentials; the same values are reused by default-buckets
      # and kafka-connect below.
      MINIO_ACCESS_KEY: minio
      MINIO_SECRET_KEY: minio123
      MINIO_HTTP_TRACE: /dev/stdout
    command: server /data
    networks:
      - uam
  default-buckets:
    image: minio/mc
    # Sleeps 20 seconds, registers the "minio" host alias with the credentials
    # above, creates the bucket dev-data-raw and sets its policy to public.
    # NOTE(review): the fixed sleep presumably gives the minio service time to
    # start - depends_on only orders container start-up; confirm.
    entrypoint: >
      /bin/sh -c "
      sleep 20;
      /usr/bin/mc config host add minio http://minio:9000 minio minio123;
      /usr/bin/mc mb minio/dev-data-raw;
      /usr/bin/mc policy public minio/dev-data-raw;"
    depends_on:
      - minio
    networks:
      - uam
  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.2.2
    hostname: kafka-connect
    ports:
      - 8083:8083
      - 3030:3030
    environment:
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      # Points at the broker's kafka:9094 listener declared in the kafka service.
      CONNECT_BOOTSTRAP_SERVERS: kafka:9094
      CONNECT_GROUP_ID: primary-etl
      # Config, offset and status topics, each with replication factor 1.
      CONNECT_CONFIG_STORAGE_TOPIC: __primary-etl-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_TOPIC: __primary-etl-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: __primary-etl-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      # JsonConverter with schemas disabled for keys, values and the internal converters.
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
      CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars
      # Same values as MINIO_ACCESS_KEY / MINIO_SECRET_KEY.
      # NOTE(review): presumably consumed by an S3 connector writing to MinIO - confirm.
      AWS_ACCESS_KEY_ID: minio
      AWS_SECRET_ACCESS_KEY: minio123
    depends_on:
      - kafka
      - default-buckets
    networks:
      - uam
networks:
  uam:
    # Explicit name, so the network is called "uam" rather than a
    # project-prefixed name.
    name: uam

View File

View File

@ -1,69 +0,0 @@
from unittest import main, TestCase
import boto3
import configparser as cp
import warnings
import os
class TestInfraKinesis(TestCase):
    """Checks of the student's Kinesis Data Stream against the live AWS account.

    Every test calls the Kinesis API through a boto3 client created with the
    default credential/region resolution. The expected stream name is built
    from the values found in ``labs/terraform/terraform.tfvars``.
    """

    def setUp(self):
        """Create the Kinesis client and derive the expected stream name."""
        self.kinesis_client = boto3.client('kinesis')

        # The path is relative to the working directory. When the working
        # directory name ends with "tests" an extra '.' turns "./labs/..."
        # into "../labs/...", so the suite can be started from either place.
        cwd = os.getcwd()
        extra_dot = '.' if cwd.endswith('tests') else ''
        config_path = extra_dot + "./labs/terraform/terraform.tfvars"

        # terraform.tfvars has no section header; prepending one lets
        # configparser read its "key = value" lines. The file is read as UTF-8
        # explicitly so that non-ASCII values (e.g. a student's name) do not
        # depend on the platform's default encoding.
        with open(config_path, 'r', encoding='utf-8') as f:
            config_string = '[main]\n' + f.read()
        config = cp.ConfigParser()
        config.read_string(config_string)

        # configparser always yields strings here; only the surrounding
        # double quotes of the tfvars syntax need to be removed.
        self.config = {param: value.replace('"', '') for param, value in config["main"].items()}

        warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*<ssl.SSLSocket.*>")

        kinesis_stream_name_pattern = "cryptostock-dev-{account_number}-{student_initials}-{student_index_no}"
        self.kinesis_stream_name = kinesis_stream_name_pattern.format(
            account_number=self.config["account_number"],
            student_initials=self.config["student_initials"],
            student_index_no=self.config["student_index_no"],
        )

    def test_kinesis_data_stream_exists(self):
        """The expected stream name is returned by ``list_streams``."""
        kinesis_streams = self.kinesis_client.list_streams()["StreamNames"]

        self.assertIn(self.kinesis_stream_name, kinesis_streams)

    def test_kinesis_data_stream_config(self):
        """The stream has exactly one shard."""
        expected_no_of_shards = 1

        stream_config = self.kinesis_client.describe_stream(StreamName=self.kinesis_stream_name)
        no_of_shards = len(stream_config["StreamDescription"]["Shards"])

        self.assertEqual(no_of_shards, expected_no_of_shards)

    def test_kinesis_data_stream_tags(self):
        """The stream carries an ``Environment`` tag whose value is DEV (any case)."""
        tags = self.kinesis_client.list_tags_for_stream(StreamName=self.kinesis_stream_name)["Tags"]
        find_env_tag = next((tag.get("Value", "") for tag in tags if tag.get("Key") == "Environment"), None)

        # Fail with a readable message instead of an AttributeError on None.
        self.assertIsNotNone(
            find_env_tag,
            'Tag "Environment" not found on stream {}'.format(self.kinesis_stream_name),
        )
        self.assertEqual(find_env_tag.upper(), "DEV")

    def test_kinesis_data_stream_monitoring(self):
        """All four incoming/outgoing shard-level metrics are enabled."""
        expected_monitors = ['IncomingBytes', 'OutgoingRecords', 'IncomingRecords', 'OutgoingBytes']

        stream_description = self.kinesis_client.describe_stream(
            StreamName=self.kinesis_stream_name
        )["StreamDescription"]
        stream_monitoring = stream_description["EnhancedMonitoring"]

        # Default to an empty list so that "no metrics enabled" is reported as
        # a test failure listing the missing metrics, not as a TypeError.
        current_monitors = next(
            (x["ShardLevelMetrics"] for x in stream_monitoring if x.get("ShardLevelMetrics")),
            [],
        )
        missing_monitors = set(expected_monitors) - set(current_monitors)

        self.assertFalse(
            missing_monitors,
            "Shard-level metrics not enabled: {}".format(sorted(missing_monitors)),
        )


if __name__ == '__main__':
    main()

View File

@ -1,73 +0,0 @@
from unittest import main, TestCase
import boto3
import configparser as cp
import warnings
import os
class TestInfraKinesisFH(TestCase):
    """Checks of the student's Kinesis Data Firehose delivery stream against the live AWS account.

    Every test calls the Firehose/Kinesis APIs through boto3 clients created
    with the default credential/region resolution. Expected resource names
    are built from the values in ``labs/terraform/terraform.tfvars``.
    """

    def setUp(self):
        """Create the AWS clients and derive the expected resource names."""
        self.firehose_client = boto3.client('firehose')
        self.kinesis_client = boto3.client('kinesis')

        # The path is relative to the working directory. When the working
        # directory name ends with "tests" an extra '.' turns "./labs/..."
        # into "../labs/...", so the suite can be started from either place.
        cwd = os.getcwd()
        extra_dot = '.' if cwd.endswith('tests') else ''
        config_path = extra_dot + "./labs/terraform/terraform.tfvars"

        # terraform.tfvars has no section header; prepending one lets
        # configparser read its "key = value" lines. The file is read as UTF-8
        # explicitly so that non-ASCII values (e.g. a student's name) do not
        # depend on the platform's default encoding.
        with open(config_path, 'r', encoding='utf-8') as f:
            config_string = '[main]\n' + f.read()
        config = cp.ConfigParser()
        config.read_string(config_string)

        # configparser always yields strings here; only the surrounding
        # double quotes of the tfvars syntax need to be removed.
        self.config = {param: value.replace('"', '') for param, value in config["main"].items()}

        warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*<ssl.SSLSocket.*>")

        name_parts = {
            "account_number": self.config["account_number"],
            "student_initials": self.config["student_initials"],
            "student_index_no": self.config["student_index_no"],
        }

        firehose_stream_name_pattern = "firehose-dev-{account_number}-{student_initials}-{student_index_no}"
        self.firehose_stream_name = firehose_stream_name_pattern.format(**name_parts)

        kinesis_stream_name_pattern = "cryptostock-dev-{account_number}-{student_initials}-{student_index_no}"
        self.kinesis_stream_name = kinesis_stream_name_pattern.format(**name_parts)

    def test_kinesis_firehose_exists(self):
        """The expected delivery stream name is returned by ``list_delivery_streams``."""
        deliver_streams = self.firehose_client.list_delivery_streams()["DeliveryStreamNames"]

        self.assertIn(self.firehose_stream_name, deliver_streams)

    def test_kinesis_firehose_source(self):
        """The delivery stream reads from the student's Kinesis Data Stream."""
        config = self.firehose_client.describe_delivery_stream(DeliveryStreamName=self.firehose_stream_name)
        # Walk the response with .get() so that a delivery stream without a
        # Kinesis source fails the assertion below instead of raising KeyError.
        source = (
            config["DeliveryStreamDescription"]
            .get("Source", {})
            .get("KinesisStreamSourceDescription", {})
            .get("KinesisStreamARN")
        )
        self.assertIsNotNone(
            source,
            "Delivery stream {} has no Kinesis stream source".format(self.firehose_stream_name),
        )

        kinesis_stream_config = self.kinesis_client.describe_stream(StreamName=self.kinesis_stream_name)
        kds_arn = kinesis_stream_config["StreamDescription"]["StreamARN"]

        self.assertEqual(kds_arn, source)

    def test_kinesis_firehose_dest_config(self):
        """The first destination uses the expected S3 prefix and error prefix."""
        config = self.firehose_client.describe_delivery_stream(DeliveryStreamName=self.firehose_stream_name)
        destinations = config["DeliveryStreamDescription"]["Destinations"]
        self.assertTrue(
            destinations,
            "Delivery stream {} has no destinations".format(self.firehose_stream_name),
        )
        destination = destinations[0]

        expected_pref = 'raw-zone/stockdata/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
        expected_err_pref = 'raw-zone/stockdata_errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'

        # A missing S3 description is reported as a failure, not as an
        # AttributeError raised by calling .get() on None.
        s3_description = destination.get("S3DestinationDescription")
        self.assertIsNotNone(
            s3_description,
            "First destination of {} has no S3DestinationDescription".format(self.firehose_stream_name),
        )

        dest_prefix = s3_description.get("Prefix")
        dest_errors_prefix = s3_description.get("ErrorOutputPrefix")

        self.assertEqual(dest_prefix, expected_pref)
        self.assertEqual(expected_err_pref, dest_errors_prefix)


if __name__ == '__main__':
    main()

View File

@ -1,52 +0,0 @@
from unittest import main, TestCase
import boto3
import configparser as cp
import warnings
import os
class TestInfraS3(TestCase):
    """Checks of the student's data-lake S3 bucket against the live AWS account.

    Every test calls the S3 API through a boto3 client created with the
    default credential/region resolution. The expected bucket name is built
    from the values found in ``labs/terraform/terraform.tfvars``.
    """

    def setUp(self):
        """Create the S3 client and derive the expected bucket name."""
        self.s3_client = boto3.client('s3')

        # The path is relative to the working directory. When the working
        # directory name ends with "tests" an extra '.' turns "./labs/..."
        # into "../labs/...", so the suite can be started from either place.
        cwd = os.getcwd()
        extra_dot = '.' if cwd.endswith('tests') else ''
        config_path = extra_dot + "./labs/terraform/terraform.tfvars"

        # terraform.tfvars has no section header; prepending one lets
        # configparser read its "key = value" lines. The file is read as UTF-8
        # explicitly so that non-ASCII values (e.g. a student's name) do not
        # depend on the platform's default encoding.
        with open(config_path, 'r', encoding='utf-8') as f:
            config_string = '[main]\n' + f.read()
        config = cp.ConfigParser()
        config.read_string(config_string)

        # configparser always yields strings here; only the surrounding
        # double quotes of the tfvars syntax need to be removed.
        self.config = {param: value.replace('"', '') for param, value in config["main"].items()}

        warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*<ssl.SSLSocket.*>")

        bucket_name_pattern = "datalake-dev-{account_number}-{student_initials}-{student_index_no}"
        self.bucket_name = bucket_name_pattern.format(
            account_number=self.config["account_number"],
            student_initials=self.config["student_initials"],
            student_index_no=self.config["student_index_no"],
        )

    def test_main_s3_bucket_exists(self):
        """The expected bucket name is returned by ``list_buckets``."""
        s3_buckets = self.s3_client.list_buckets()["Buckets"]
        bucket_names = [bucket["Name"] for bucket in s3_buckets]

        self.assertIn(self.bucket_name, bucket_names)

    def test_main_s3_bucket_comfig(self):
        """The bucket carries an ``Environment`` tag whose value is DEV (any case)."""
        tags = self.s3_client.get_bucket_tagging(
            Bucket=self.bucket_name
        )["TagSet"]
        find_env_tag = next((tag["Value"] for tag in tags if tag.get("Key") == "Environment"), None)

        # Fail with a readable message instead of an AttributeError on None.
        self.assertIsNotNone(
            find_env_tag,
            'Tag "Environment" not found on bucket {}'.format(self.bucket_name),
        )
        self.assertEqual(find_env_tag.upper(), "DEV")


if __name__ == '__main__':
    main()