fixes in the generator

This commit is contained in:
Jakub Kasprzak 2024-04-24 12:55:37 +02:00
parent 5bce0dbda4
commit 25f4d03de0

View File

@ -45,7 +45,7 @@ class KinesisProducer:
# invoking a Lambda to add those new lines).Every message is a dumped json with \n # invoking a Lambda to add those new lines).Every message is a dumped json with \n
tran_id = event["trans_id"] tran_id = event["trans_id"]
payload = (json.dumps(event)+'\n').encode('utf-8') payload = (json.dumps(event) + '\n').encode('utf-8')
attempt = 1 attempt = 1
while attempt < self.max_retry_attempt: while attempt < self.max_retry_attempt:
@ -55,7 +55,8 @@ class KinesisProducer:
Data=payload, Data=payload,
PartitionKey=key PartitionKey=key
) )
logger.info('Msg with trans_id={} sent to shard {} seq no {}'.format(tran_id, response["ShardId"], response["SequenceNumber"])) logger.info('Msg with trans_id={} sent to shard {} seq no {}'.format(tran_id, response["ShardId"],
response["SequenceNumber"]))
return response return response
except Exception as e: except Exception as e:
@ -88,7 +89,7 @@ def prepare_event(event):
return msg_formatted, msg_key return msg_formatted, msg_key
def produce_data(kinesis_data_stream, messages_per_sec, input_file): def produce_data(kinesis_data_stream, messages_per_sec, input_file, singel_run):
""" """
Main method for producing Main method for producing
:param kinesis_data_stream: param from cmdline name of KDS :param kinesis_data_stream: param from cmdline name of KDS
@ -118,6 +119,8 @@ def produce_data(kinesis_data_stream, messages_per_sec, input_file):
event, key = prepare_event(row) event, key = prepare_event(row)
kp.produce(event, key, kinesis_data_stream) kp.produce(event, key, kinesis_data_stream)
if singel_run:
break
replay_cnt += 1 replay_cnt += 1
@ -128,16 +131,20 @@ if __name__ == "__main__":
parser.add_argument('-k', '--kinesis_ds', dest='kinesis_ds', required=True) parser.add_argument('-k', '--kinesis_ds', dest='kinesis_ds', required=True)
parser.add_argument('-i', '--input_file', dest='input_file', required=False) parser.add_argument('-i', '--input_file', dest='input_file', required=False)
parser.add_argument('-s', '--messages_per_sec', dest='mps', type=int, default=-1, required=False) parser.add_argument('-s', '--messages_per_sec', dest='mps', type=int, default=-1, required=False)
parser.add_argument('-r', '--single-run', dest='singel_run', action='store_true', required=False, default=False)
args, unknown = parser.parse_known_args() args, unknown = parser.parse_known_args()
config = configparser.ConfigParser() config = configparser.ConfigParser()
kinesis_data_stream = args.kinesis_ds kinesis_data_stream = args.kinesis_ds
messages_per_sec = int(args.mps) messages_per_sec = int(args.mps)
singel_run = args.singel_run if hasattr(args, 'singel_run') else False
if args.input_file: if args.input_file:
input_file = args.input_file input_file = args.input_file
else: else:
main_path = os.path.abspath(os.path.dirname(__file__)) main_path = os.path.abspath(os.path.dirname(__file__))
input_file = os.path.join(main_path, DEFAULT_DATA_FILE) input_file = os.path.join(main_path, DEFAULT_DATA_FILE)
produce_data(kinesis_data_stream, messages_per_sec, input_file) produce_data(kinesis_data_stream, messages_per_sec, input_file, singel_run)