diff --git a/labs/data_generator/generator.py b/labs/data_generator/generator.py
index c961686..a72c118 100644
--- a/labs/data_generator/generator.py
+++ b/labs/data_generator/generator.py
@@ -89,7 +89,7 @@ def prepare_event(event):
     return msg_formatted, msg_key
 
 
-def produce_data(kinesis_data_stream, messages_per_sec, input_file, singel_run):
+def produce_data(kinesis_data_stream, messages_per_sec, input_file, single_run):
     """
     Main method for producing
     :param kinesis_data_stream: param from cmdline name of KDS
@@ -119,7 +119,7 @@ def produce_data(kinesis_data_stream, messages_per_sec, input_file, singel_run):
             event, key = prepare_event(row)
             kp.produce(event, key, kinesis_data_stream)
 
-            if singel_run:
+            if single_run:
                 break
 
         replay_cnt += 1
@@ -139,7 +139,7 @@ if __name__ == "__main__":
     kinesis_data_stream = args.kinesis_ds
     messages_per_sec = int(args.mps)
 
-    singel_run = args.singel_run if hasattr(args, 'singel_run') else False
+    single_run = args.singel_run if hasattr(args, 'singel_run') else False
 
     if args.input_file:
         input_file = args.input_file
@@ -147,4 +147,4 @@ if __name__ == "__main__":
         main_path = os.path.abspath(os.path.dirname(__file__))
         input_file = os.path.join(main_path, DEFAULT_DATA_FILE)
 
-    produce_data(kinesis_data_stream, messages_per_sec, input_file, singel_run)
+    produce_data(kinesis_data_stream, messages_per_sec, input_file, single_run)
diff --git a/labs/labs_1.md b/labs/labs_1.md
deleted file mode 100644
index 19ec126..0000000
--- a/labs/labs_1.md
+++ /dev/null
@@ -1,14 +0,0 @@
-
-Create a new folder ./cdp-terraform in your code repository
-copy the files
-
-
-
-1. Create an S3 bucket
-- check the resource definition https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket
-
-
-2. Create a Kinesis Data Stream. Use the following configuration parameters
-- name: __ e.g. dev_jk_12345
-- number of shards - 1
-
diff --git a/labs/labs_preparation.md b/labs/labs_preparation.md
index 5e51e73..0ab5f83 100644
--- a/labs/labs_preparation.md
+++ b/labs/labs_preparation.md
@@ -7,7 +7,7 @@
 1. Prerequisites - environments (recommended **PyCharm + Anaconda**)
    * PyCharm - https://www.jetbrains.com/pycharm/download/
    * Anaconda - https://www.anaconda.com/products/individual#Downloads
-     - new Python 3.8 environment
+     - new Python 3.9 environment
 
 Windows users: use the Anaconda Prompt
 Linux / MacOS: bash / zsh etc.
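For reference, the exercise removed in labs_1.md provisions two resources through the Terraform definitions linked above. Below is a minimal boto3 sketch of the equivalent API calls, only to illustrate what those Terraform resources amount to; the bucket name and account number are hypothetical placeholders, and `dev_jk_12345` is just the example stream name from the lab text.

```python
# Illustration only - the lab itself creates these resources with Terraform.
# All names below are placeholders, not values defined by the repository.
import boto3

s3 = boto3.client("s3")
kinesis = boto3.client("kinesis")

# 1. An S3 bucket (see the aws_s3_bucket resource linked in the deleted lab).
#    Outside us-east-1, create_bucket also needs a CreateBucketConfiguration.
s3.create_bucket(Bucket="datalake-dev-123456789012-jk-12345")

# 2. A Kinesis Data Stream with a single shard, e.g. dev_jk_12345
kinesis.create_stream(StreamName="dev_jk_12345", ShardCount=1)
```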
diff --git a/pdf/Intro_to_cloud_computing.pdf b/pdf/Intro_to_cloud_computing.pdf
index d59f16a..c2ef9b5 100644
Binary files a/pdf/Intro_to_cloud_computing.pdf and b/pdf/Intro_to_cloud_computing.pdf differ
diff --git a/pdf/LABS Przetwarzanie Danych w chmurze publicznej.pdf b/pdf/LABS Przetwarzanie Danych w chmurze publicznej.pdf
deleted file mode 100644
index b60703d..0000000
Binary files a/pdf/LABS Przetwarzanie Danych w chmurze publicznej.pdf and /dev/null differ
diff --git a/pdf/LABS Setup - Przetwarzanie Danych w chmurze publicznej.pdf b/pdf/LABS Setup - Przetwarzanie Danych w chmurze publicznej.pdf
deleted file mode 100644
index 753cee1..0000000
Binary files a/pdf/LABS Setup - Przetwarzanie Danych w chmurze publicznej.pdf and /dev/null differ
diff --git a/pdf/cloud_data_processing.pdf b/pdf/cloud_data_processing.pdf
index 2e8dacd..999327d 100644
Binary files a/pdf/cloud_data_processing.pdf and b/pdf/cloud_data_processing.pdf differ
diff --git a/testing-stack/docker-compose.yml b/testing-stack/docker-compose.yml
deleted file mode 100644
index 8ce7122..0000000
--- a/testing-stack/docker-compose.yml
+++ /dev/null
@@ -1,100 +0,0 @@
-version: '3.7'
-
-services:
-  zookeeper:
-    image: confluentinc/cp-zookeeper:5.2.2
-    ports:
-      - 2181:2181
-    hostname: zookeeper
-    environment:
-      ZOOKEEPER_CLIENT_PORT: 2181
-    networks:
-      - uam
-
-  kafka:
-    image: confluentinc/cp-kafka:5.2.2
-    ports:
-      - 9092:9092
-      - 9094:9094
-    hostname: kafka
-    environment:
-      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:9094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
-      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_BROKER_ID: 1
-      KAFKA_LOG4J_LOGGERS: kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
-      KAFKA_LOG_RETENTION_HOURS: 168000
-    depends_on:
-      - zookeeper
-    networks:
-      - uam
-
-  minio:
-    image: minio/minio
-    ports:
-      - 9000:9000
-    hostname: minio
-    environment:
-      MINIO_ACCESS_KEY: minio
-      MINIO_SECRET_KEY: minio123
-      MINIO_HTTP_TRACE: /dev/stdout
-    command: server /data
-    networks:
-      - uam
-
-  default-buckets:
-    image: minio/mc
-    entrypoint: >
-      /bin/sh -c "
-      sleep 20;
-      /usr/bin/mc config host add minio http://minio:9000 minio minio123;
-      /usr/bin/mc mb minio/dev-data-raw;
-      /usr/bin/mc policy public minio/dev-data-raw;"
-    depends_on:
-      - minio
-    networks:
-      - uam
-
-  kafka-connect:
-    image: confluentinc/cp-kafka-connect:5.2.2
-    hostname: kafka-connect
-    ports:
-      - 8083:8083
-      - 3030:3030
-    environment:
-      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
-      CONNECT_REST_PORT: 8083
-      CONNECT_BOOTSTRAP_SERVERS: kafka:9094
-      CONNECT_GROUP_ID: primary-etl
-      CONNECT_CONFIG_STORAGE_TOPIC: __primary-etl-configs
-      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
-      CONNECT_OFFSET_STORAGE_TOPIC: __primary-etl-offsets
-      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
-      CONNECT_STATUS_STORAGE_TOPIC: __primary-etl-status
-      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
-      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
-      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
-      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
-      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
-      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
-      CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
-      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
-      CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
-      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
-      CONNECT_LOG4J_LOGGERS: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
-      CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars
-      AWS_ACCESS_KEY_ID: minio
-      AWS_SECRET_ACCESS_KEY: minio123
-
-    depends_on:
-      - kafka
-      - default-buckets
-    networks:
-      - uam
-
-
-networks:
-  uam:
-    name: uam
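The deleted testing stack exposed MinIO as an S3-compatible endpoint on port 9000, with the access keys defined above and a pre-created dev-data-raw bucket. A small sketch of how local code could have pointed boto3 at that stack instead of AWS, assuming the compose services are running on localhost:

```python
# Sketch for talking to the removed local MinIO service instead of AWS S3.
# Endpoint, credentials and bucket name are taken from the docker-compose file above.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minio",
    aws_secret_access_key="minio123",
)

# dev-data-raw is pre-created by the default-buckets helper container
response = s3.list_objects_v2(Bucket="dev-data-raw")
print(response.get("KeyCount", 0))
```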
diff --git a/tests/__init__.py b/tests/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/tests/test_infrastructure_kinesis_ds.py b/tests/test_infrastructure_kinesis_ds.py
deleted file mode 100644
index 9ec1d2f..0000000
--- a/tests/test_infrastructure_kinesis_ds.py
+++ /dev/null
@@ -1,69 +0,0 @@
-from unittest import main, TestCase
-import boto3
-import configparser as cp
-import warnings
-import os
-
-
-class TestInfraKinesis(TestCase):
-
-    def setUp(self):
-        self.kinesis_client = boto3.client('kinesis')
-
-        # read configuration
-        # making path universal for running tests from the module / outside
-        cwd = os.getcwd()
-        extra_dot = '.' if cwd.endswith('tests') else ''
-        config_path = extra_dot + "./labs/terraform/terraform.tfvars"
-
-        with open(config_path, 'r') as f:
-            config_string = '[main]\n' + f.read()
-            config = cp.ConfigParser()
-            config.read_string(config_string)
-
-        self.config = {param: (
-            config["main"][param].replace('"', '') if isinstance(config["main"][param], str) else config["main"][param]
-        ) for param in config["main"]}
-
-        warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*")
-
-        kinesis_stream_name_pattern = "cryptostock-dev-{account_number}-{student_initials}-{student_index_no}"
-        self.kinesis_stream_name = kinesis_stream_name_pattern.format(account_number=self.config["account_number"],
-                                                                      student_initials=self.config["student_initials"],
-                                                                      student_index_no=self.config["student_index_no"])
-
-    def test_kinesis_data_stream_exists(self):
-        kinesis_streams = self.kinesis_client.list_streams()["StreamNames"]
-
-        find_stream = next((stream for stream in kinesis_streams if stream == self.kinesis_stream_name), None)
-
-        self.assertEqual(find_stream, self.kinesis_stream_name)
-
-    def test_kinesis_data_stream_config(self):
-        expected_no_of_shards = 1
-
-        stream_config = self.kinesis_client.describe_stream(StreamName=self.kinesis_stream_name)
-
-        # check no of shards
-        no_of_shards = len(stream_config["StreamDescription"]["Shards"])
-        self.assertEqual(no_of_shards, expected_no_of_shards)
-
-    def test_kinesis_data_stream_tags(self):
-        tags = self.kinesis_client.list_tags_for_stream(StreamName=self.kinesis_stream_name)["Tags"]
-
-        find_env_tag = next((tag["Value"] for tag in tags if tag.get("Key") == "Environment"), None)
-        self.assertEqual(find_env_tag.upper(), "DEV")
-
-    def test_kinesis_data_stream_monitoring(self):
-        expected_monitors = ['IncomingBytes', 'OutgoingRecords', 'IncomingRecords', 'OutgoingBytes']
-
-        stream_monitoring = \
-            self.kinesis_client.describe_stream(StreamName=self.kinesis_stream_name)["StreamDescription"][
-                "EnhancedMonitoring"]
-
-        current_monitors = next((x["ShardLevelMetrics"] for x in stream_monitoring if x.get("ShardLevelMetrics")), None)
-        self.assertTrue(set(current_monitors).issuperset(set(expected_monitors)))
-
-
-if __name__ == '__main__':
-    main()
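The setUp of the removed tests (here and in the two files below) relies on a small trick to read labs/terraform/terraform.tfvars: a `[main]` section header is prepended so configparser accepts the HCL-style `key = "value"` lines, and the surrounding quotes are then stripped. A stand-alone sketch of the same idea:

```python
# Stand-alone version of the terraform.tfvars parsing used in the removed tests:
# simple tfvars assignments are close enough to INI syntax for configparser,
# once a section header is prepended.
import configparser as cp


def read_tfvars(path="./labs/terraform/terraform.tfvars"):
    with open(path, "r") as f:
        config_string = "[main]\n" + f.read()
    config = cp.ConfigParser()
    config.read_string(config_string)
    # drop the double quotes that HCL string values carry
    return {key: value.replace('"', "") for key, value in config["main"].items()}
```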
diff --git a/tests/test_infrastructure_kinesis_firehose.py b/tests/test_infrastructure_kinesis_firehose.py
deleted file mode 100644
index a94701b..0000000
--- a/tests/test_infrastructure_kinesis_firehose.py
+++ /dev/null
@@ -1,73 +0,0 @@
-from unittest import main, TestCase
-import boto3
-import configparser as cp
-import warnings
-import os
-
-
-class TestInfraKinesisFH(TestCase):
-
-    def setUp(self):
-        self.firehose_client = boto3.client('firehose')
-        self.kinesis_client = boto3.client('kinesis')
-
-        # read configuration
-        # making path universal for running tests from the module / outside
-        cwd = os.getcwd()
-        extra_dot = '.' if cwd.endswith('tests') else ''
-        config_path = extra_dot + "./labs/terraform/terraform.tfvars"
-
-        with open(config_path, 'r') as f:
-            config_string = '[main]\n' + f.read()
-            config = cp.ConfigParser()
-            config.read_string(config_string)
-
-        self.config = {param: (
-            config["main"][param].replace('"', '') if isinstance(config["main"][param], str) else config["main"][param]
-        ) for param in config["main"]}
-
-        warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*")
-
-        firehose_stream_name_pattern = "firehose-dev-{account_number}-{student_initials}-{student_index_no}"
-        self.firehose_stream_name = firehose_stream_name_pattern.format(account_number=self.config["account_number"],
-                                                                        student_initials=self.config[
-                                                                            "student_initials"],
-                                                                        student_index_no=self.config[
-                                                                            "student_index_no"])
-
-        kinesis_stream_name_pattern = "cryptostock-dev-{account_number}-{student_initials}-{student_index_no}"
-        self.kinesis_stream_name = kinesis_stream_name_pattern.format(account_number=self.config["account_number"],
-                                                                      student_initials=self.config["student_initials"],
-                                                                      student_index_no=self.config["student_index_no"])
-
-    def test_kinesis_firehose_exists(self):
-        deliver_streams = self.firehose_client.list_delivery_streams()["DeliveryStreamNames"]
-        find_stream = next((stream for stream in deliver_streams if stream == self.firehose_stream_name), None)
-
-        self.assertEqual(find_stream, self.firehose_stream_name)
-
-    def test_kinesis_firehose_source(self):
-        config = self.firehose_client.describe_delivery_stream(DeliveryStreamName=self.firehose_stream_name)
-        source = config["DeliveryStreamDescription"]["Source"]["KinesisStreamSourceDescription"]["KinesisStreamARN"]
-
-        kinesis_stream_config = self.kinesis_client.describe_stream(StreamName=self.kinesis_stream_name)
-        kds_arn = kinesis_stream_config["StreamDescription"]["StreamARN"]
-
-        self.assertEqual(kds_arn, source)
-
-    def test_kinesis_firehose_dest_config(self):
-        config = self.firehose_client.describe_delivery_stream(DeliveryStreamName=self.firehose_stream_name)
-        destination = config["DeliveryStreamDescription"]["Destinations"][0]
-
-        expected_pref = 'raw-zone/stockdata/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
-        expected_err_pref = 'raw-zone/stockdata_errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
-
-        dest_prefix = destination.get("S3DestinationDescription", None).get("Prefix", None)
-        dest_errors_prefix = destination.get("S3DestinationDescription", None).get("ErrorOutputPrefix", None)
-
-        self.assertEqual(dest_prefix, expected_pref)
-        self.assertEqual(expected_err_pref, dest_errors_prefix)
-
-
-if __name__ == '__main__':
-    main()
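The removed destination test pins the Firehose S3 prefix to raw-zone/stockdata/year=!{timestamp:yyyy}/... Firehose expands the !{timestamp:...} directives itself when it writes each object (based on arrival time, in UTC), so data lands in an hourly, Hive-partitioned layout. A rough sketch of the resulting key prefix:

```python
# Rough illustration of how the Firehose prefix from the removed test expands;
# Firehose performs this substitution itself, the snippet only mimics the layout.
from datetime import datetime, timezone

now = datetime.now(timezone.utc)
prefix = f"raw-zone/stockdata/year={now:%Y}/month={now:%m}/day={now:%d}/hour={now:%H}/"
print(prefix)  # e.g. raw-zone/stockdata/year=2021/month=06/day=01/hour=10/
```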
diff --git a/tests/test_infrastructure_s3.py b/tests/test_infrastructure_s3.py
deleted file mode 100644
index 541e7b8..0000000
--- a/tests/test_infrastructure_s3.py
+++ /dev/null
@@ -1,52 +0,0 @@
-from unittest import main, TestCase
-import boto3
-import configparser as cp
-import warnings
-import os
-
-
-class TestInfraS3(TestCase):
-
-    def setUp(self):
-        self.s3_client = boto3.client('s3')
-
-        # read configuration
-        # making path universal for running tests from the module / outside
-        cwd = os.getcwd()
-        extra_dot = '.' if cwd.endswith('tests') else ''
-        config_path = extra_dot + "./labs/terraform/terraform.tfvars"
-
-        with open(config_path, 'r') as f:
-            config_string = '[main]\n' + f.read()
-            config = cp.ConfigParser()
-            config.read_string(config_string)
-
-        self.config = {param: (
-            config["main"][param].replace('"', '') if isinstance(config["main"][param], str) else config["main"][param]
-        ) for param in config["main"]}
-
-        warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*")
-
-        bucket_name_pattern = "datalake-dev-{account_number}-{student_initials}-{student_index_no}"
-        self.bucket_name = bucket_name_pattern.format(account_number=self.config["account_number"],
-                                                      student_initials=self.config["student_initials"],
-                                                      student_index_no=self.config["student_index_no"])
-
-    def test_main_s3_bucket_exists(self):
-        s3_buckets = self.s3_client.list_buckets()["Buckets"]
-
-        find_bucket = next((bucket["Name"] for bucket in s3_buckets if bucket["Name"] == self.bucket_name), None)
-
-        self.assertEqual(find_bucket, self.bucket_name)
-
-    def test_main_s3_bucket_comfig(self):
-        tags = self.s3_client.get_bucket_tagging(
-            Bucket=self.bucket_name
-        )["TagSet"]
-
-        find_env_tag = next((tag["Value"] for tag in tags if tag.get("Key") == "Environment"), None)
-        self.assertEqual(find_env_tag.upper(), "DEV")
-
-
-if __name__ == '__main__':
-    main()
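If the removed infrastructure tests are ever needed again, they can be run with unittest discovery; a sketch, assuming the repository root is the working directory so the relative terraform.tfvars path in setUp resolves:

```python
# Sketch: run the (now removed) infrastructure tests via unittest discovery.
import unittest

suite = unittest.defaultTestLoader.discover("tests", pattern="test_infrastructure_*.py")
unittest.TextTestRunner(verbosity=2).run(suite)
```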