From 25f4d03de09410d5dcd799cb8f6b46409ad38301 Mon Sep 17 00:00:00 2001 From: Jakub Kasprzak Date: Wed, 24 Apr 2024 12:55:37 +0200 Subject: [PATCH] fixes in the generator --- labs/data_generator/generator.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/labs/data_generator/generator.py b/labs/data_generator/generator.py index d683cb2..c961686 100644 --- a/labs/data_generator/generator.py +++ b/labs/data_generator/generator.py @@ -45,7 +45,7 @@ class KinesisProducer: # invoking a Lambda to add those new lines).Every message is a dumped json with \n tran_id = event["trans_id"] - payload = (json.dumps(event)+'\n').encode('utf-8') + payload = (json.dumps(event) + '\n').encode('utf-8') attempt = 1 while attempt < self.max_retry_attempt: @@ -55,7 +55,8 @@ class KinesisProducer: Data=payload, PartitionKey=key ) - logger.info('Msg with trans_id={} sent to shard {} seq no {}'.format(tran_id, response["ShardId"], response["SequenceNumber"])) + logger.info('Msg with trans_id={} sent to shard {} seq no {}'.format(tran_id, response["ShardId"], + response["SequenceNumber"])) return response except Exception as e: @@ -88,7 +89,7 @@ def prepare_event(event): return msg_formatted, msg_key -def produce_data(kinesis_data_stream, messages_per_sec, input_file): +def produce_data(kinesis_data_stream, messages_per_sec, input_file, singel_run): """ Main method for producing :param kinesis_data_stream: param from cmdline name of KDS @@ -118,6 +119,8 @@ def produce_data(kinesis_data_stream, messages_per_sec, input_file): event, key = prepare_event(row) kp.produce(event, key, kinesis_data_stream) + if singel_run: + break replay_cnt += 1 @@ -128,16 +131,20 @@ if __name__ == "__main__": parser.add_argument('-k', '--kinesis_ds', dest='kinesis_ds', required=True) parser.add_argument('-i', '--input_file', dest='input_file', required=False) parser.add_argument('-s', '--messages_per_sec', dest='mps', type=int, default=-1, required=False) + parser.add_argument('-r', '--single-run', dest='singel_run', action='store_true', required=False, default=False) args, unknown = parser.parse_known_args() config = configparser.ConfigParser() kinesis_data_stream = args.kinesis_ds messages_per_sec = int(args.mps) + + singel_run = args.singel_run if hasattr(args, 'singel_run') else False + if args.input_file: input_file = args.input_file else: main_path = os.path.abspath(os.path.dirname(__file__)) input_file = os.path.join(main_path, DEFAULT_DATA_FILE) - produce_data(kinesis_data_stream, messages_per_sec, input_file) + produce_data(kinesis_data_stream, messages_per_sec, input_file, singel_run)