Added comments to edited parts
This commit is contained in:
parent
272fff7769
commit
bc7de6365e
@ -33,25 +33,16 @@ class KinesisProducer:
|
|||||||
|
|
||||||
def produce(self, events, data_stream):
|
def produce(self, events, data_stream):
|
||||||
"""
|
"""
|
||||||
A simple wrapper for put record
|
EDITED FUNCTION FOR put_records
|
||||||
:param event:
|
:param events:
|
||||||
:param key:
|
|
||||||
:param data_stream:
|
:param data_stream:
|
||||||
:return:
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# adding a new line at the end to produce JSON lines
|
|
||||||
# (otherwise we would need to pre-process those records in Firehose
|
|
||||||
# invoking a Lambda to add those new lines).Every message is a dumped json with \n
|
|
||||||
|
|
||||||
#tran_id = event["trans_id"]
|
|
||||||
# payload = (json.dumps(event) + '\n').encode('utf-8')
|
|
||||||
|
|
||||||
attempt = 1
|
attempt = 1
|
||||||
logger.info(events)
|
|
||||||
while attempt < self.max_retry_attempt:
|
while attempt < self.max_retry_attempt:
|
||||||
try:
|
try:
|
||||||
response = self.client.put_records(Records=events, StreamName=data_stream)
|
response = self.client.put_records(Records=events, StreamName=data_stream) # IMPLEMENTED put_records FUNCTION
|
||||||
logger.info('Succesfully sended')
|
logger.info('Succesfully sended')
|
||||||
|
|
||||||
return response
|
return response
|
||||||
@ -97,7 +88,7 @@ def produce_data(kinesis_data_stream, messages_per_sec, input_file, single_run):
|
|||||||
|
|
||||||
with open(input_file) as csv_file:
|
with open(input_file) as csv_file:
|
||||||
|
|
||||||
events = []
|
events = [] # SINGLE LIST OF EVENTS FOR put_records FUNCTION
|
||||||
|
|
||||||
reader = csv.DictReader(csv_file, delimiter=',')
|
reader = csv.DictReader(csv_file, delimiter=',')
|
||||||
all_rows = list(reader)
|
all_rows = list(reader)
|
||||||
@ -119,16 +110,16 @@ def produce_data(kinesis_data_stream, messages_per_sec, input_file, single_run):
|
|||||||
time.sleep(time_delta / messages_per_sec)
|
time.sleep(time_delta / messages_per_sec)
|
||||||
|
|
||||||
event, key = prepare_event(row)
|
event, key = prepare_event(row)
|
||||||
# event_prepared = str(event)
|
prepared_event = (json.dumps(event) + '\n').encode('utf-8') # PREPARE EVENT - ADD NEW LINE
|
||||||
data_for_event = {'Data':str(event), 'PartitionKey':key}
|
prepared_event_for_put_records = {'Data':prepared_event, 'PartitionKey':key} # STRUCTURE REQUIRED FOR put_records
|
||||||
events.append(data_for_event)
|
events.append(prepared_event_for_put_records)
|
||||||
events_cnt+=1
|
events_cnt+=1
|
||||||
print(events_cnt)
|
|
||||||
if events_cnt == 10:
|
if events_cnt == 10: # 10 EVENTS PER BATCH
|
||||||
kp.produce(events, kinesis_data_stream)
|
kp.produce(events, kinesis_data_stream)
|
||||||
events = []
|
events = []
|
||||||
events_cnt=0
|
events_cnt=0
|
||||||
time.sleep(1)
|
time.sleep(2) # FOR EASIER CHECK FILES IN S3
|
||||||
if single_run:
|
if single_run:
|
||||||
break
|
break
|
||||||
replay_cnt += 1
|
replay_cnt += 1
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
{
|
{
|
||||||
"version": 4,
|
"version": 4,
|
||||||
"terraform_version": "1.8.1",
|
"terraform_version": "1.8.1",
|
||||||
"serial": 43,
|
"serial": 55,
|
||||||
"lineage": "a77aaaba-b4f8-6adb-0387-8f0b98d722c2",
|
"lineage": "a77aaaba-b4f8-6adb-0387-8f0b98d722c2",
|
||||||
"outputs": {},
|
"outputs": {},
|
||||||
"resources": [
|
"resources": [
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
{
|
{
|
||||||
"version": 4,
|
"version": 4,
|
||||||
"terraform_version": "1.8.1",
|
"terraform_version": "1.8.1",
|
||||||
"serial": 37,
|
"serial": 49,
|
||||||
"lineage": "a77aaaba-b4f8-6adb-0387-8f0b98d722c2",
|
"lineage": "a77aaaba-b4f8-6adb-0387-8f0b98d722c2",
|
||||||
"outputs": {},
|
"outputs": {},
|
||||||
"resources": [],
|
"resources": [],
|
||||||
|
Loading…
Reference in New Issue
Block a user