diff --git a/labs/data_generator/generator_new.py b/labs/data_generator/generator_new.py index 5091fd6..daf61ac 100644 --- a/labs/data_generator/generator_new.py +++ b/labs/data_generator/generator_new.py @@ -33,25 +33,16 @@ class KinesisProducer: def produce(self, events, data_stream): """ - A simple wrapper for put record - :param event: - :param key: + EDITED FUNCTION FOR put_records + :param events: :param data_stream: - :return: """ - # adding a new line at the end to produce JSON lines - # (otherwise we would need to pre-process those records in Firehose - # invoking a Lambda to add those new lines).Every message is a dumped json with \n - - #tran_id = event["trans_id"] - # payload = (json.dumps(event) + '\n').encode('utf-8') - attempt = 1 - logger.info(events) + while attempt < self.max_retry_attempt: try: - response = self.client.put_records(Records=events, StreamName=data_stream) + response = self.client.put_records(Records=events, StreamName=data_stream) # IMPLEMENTED put_records FUNCTION logger.info('Succesfully sended') return response @@ -97,7 +88,7 @@ def produce_data(kinesis_data_stream, messages_per_sec, input_file, single_run): with open(input_file) as csv_file: - events = [] + events = [] # SINGLE LIST OF EVENTS FOR put_records FUNCTION reader = csv.DictReader(csv_file, delimiter=',') all_rows = list(reader) @@ -119,16 +110,16 @@ def produce_data(kinesis_data_stream, messages_per_sec, input_file, single_run): time.sleep(time_delta / messages_per_sec) event, key = prepare_event(row) - # event_prepared = str(event) - data_for_event = {'Data':str(event), 'PartitionKey':key} - events.append(data_for_event) + prepared_event = (json.dumps(event) + '\n').encode('utf-8') # PREPARE EVENT - ADD NEW LINE + prepared_event_for_put_records = {'Data':prepared_event, 'PartitionKey':key} # STRUCTURE REQUIRED FOR put_records + events.append(prepared_event_for_put_records) events_cnt+=1 - print(events_cnt) - if events_cnt == 10: - kp.produce(events, kinesis_data_stream) - events = [] + + if events_cnt == 10: # 10 EVENTS PER BATCH + kp.produce(events, kinesis_data_stream) + events = [] events_cnt=0 - time.sleep(1) + time.sleep(2) # FOR EASIER CHECK FILES IN S3 if single_run: break replay_cnt += 1 diff --git a/labs/terraform/terraform.tfstate b/labs/terraform/terraform.tfstate index d153378..6a67cdd 100644 --- a/labs/terraform/terraform.tfstate +++ b/labs/terraform/terraform.tfstate @@ -1,7 +1,7 @@ { "version": 4, "terraform_version": "1.8.1", - "serial": 43, + "serial": 55, "lineage": "a77aaaba-b4f8-6adb-0387-8f0b98d722c2", "outputs": {}, "resources": [ diff --git a/labs/terraform/terraform.tfstate.backup b/labs/terraform/terraform.tfstate.backup index 3e3b299..f3818e2 100644 --- a/labs/terraform/terraform.tfstate.backup +++ b/labs/terraform/terraform.tfstate.backup @@ -1,7 +1,7 @@ { "version": 4, "terraform_version": "1.8.1", - "serial": 37, + "serial": 49, "lineage": "a77aaaba-b4f8-6adb-0387-8f0b98d722c2", "outputs": {}, "resources": [],