chmury_aronb/jupyter/UAM_4_Kinesis.ipynb

35 KiB
Raw Blame History

4 Kinesis Data Streams & Firehose

Przebieg ćwiczenia

  • Stwórz Data Stream z wykorzystaniem boto3 / AWS console (GUI)
  • wygeneruj testowe dane do streama
  • odczytaj dane ze streama (ShardIterator)
  • Stwórz Kinesis Firehose Stream i podepnij pod niego utworzony wcześniej Data Stream jako source. Skonfiguruj buffor size = 1Mb buffor time = 60s
  • wygeneruj 10000 wiadomości i sprawdź czy dane ładowane są do S3

Pamiętaj aby po skończonych ćwiczeniach usunąć wszystkie obiekty

Uwaga !!! poniższy skrypt tworzy obiekty w regionie HongKong ! Na końcu skryptu jest funkcja tear_down_all() która usuwa testowy bucket, bazę Glue i Kinesis Data Streams czyli wszystkie obiekty które były stworzone w kodzie.

import boto3

REGION = "ap-east-1"

session_kwargs = {

    "aws_access_key_id":"",
    "aws_secret_access_key":"",
    "aws_session_token":"",
    "region_name": REGION
}

session = boto3.Session(**session_kwargs)
kinesis_client = session.client("kinesis")
STREAM_NAME = 'uam-test'

kinesis_client.create_stream(
    StreamName=STREAM_NAME,
    ShardCount=1
)
{'ResponseMetadata': {'RequestId': 'dd209b3a-57d1-b862-8a48-bc194546845a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'dd209b3a-57d1-b862-8a48-bc194546845a',
   'x-amz-id-2': 'k78aY4x6wCDEXo6kL76yEG64tV2ct9TQxM76Bfy345CJgSaVdfDJsjlr1jzNnpRxVk2qc+G9L42xZbmYrx/mivAFMWoek7Si',
   'date': 'Sat, 20 Jun 2020 15:01:55 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0'},
  'RetryAttempts': 0}}
kinesis_client.list_streams()["StreamNames"]
['uam-test']
response = kinesis_client.put_record(
    StreamName=STREAM_NAME,
    Data=b'{"col1" : "this is my test json data"}',
    PartitionKey='1' 
)
print(response)
{'ShardId': 'shardId-000000000000', 'SequenceNumber': '49608139280302835973846909376978574930250627257427034114', 'ResponseMetadata': {'RequestId': 'c4858035-fc96-5621-93ed-a71b8bbdb95f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c4858035-fc96-5621-93ed-a71b8bbdb95f', 'x-amz-id-2': 'Z8hEKLIaCne7YPLWuDMTiFcAt7s1HJvfENE/2Oj7ARnuzjOsVNj+QyHpuIbM0tMucp7YZxLq2M65Xy/bgyyYeI2T5eJviuYA', 'date': 'Sat, 20 Jun 2020 15:02:06 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '110'}, 'RetryAttempts': 0}}
response = kinesis_client.describe_stream(StreamName=STREAM_NAME)   
print(response)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-3-f9ad17c57d0b> in <module>
----> 1 response = kinesis_client.describe_stream(StreamName=STREAM_NAME)
      2 print(response)

NameError: name 'kinesis_client' is not defined
shard_ids = []
stream_name = None 
if response and 'StreamDescription' in response:
    stream_name= response['StreamDescription']['StreamName'] 
    
    for shard_id in response['StreamDescription']['Shards']:
        shard_id = shard_id['ShardId']
        shard_iterator = kinesis_client.get_shard_iterator(StreamName=stream_name, ShardId = shard_id, ShardIteratorType="TRIM_HORIZON")
        shard_ids.append({'shard_id' : shard_id ,'shard_iterator' : shard_iterator['ShardIterator'] })
            
shard_ids
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-4-bfebdd8c4299> in <module>
      1 shard_ids = []
      2 stream_name = None
----> 3 if response and 'StreamDescription' in response:
      4     stream_name= response['StreamDescription']['StreamName']
      5 

NameError: name 'response' is not defined
sh = shard_iterator["ShardIterator"]
sh
'AAAAAAAAAAHfJPwIwOEqHzQIjn90snM/nPs4zZARsJlXPyGlUTbvU+T5cdGvXzb54qetks+heTq/ttfFlehkcLGr27CpkPNDn2A9NHYc1w+3VjLIBmNKTLJlHnCjjFCwgqksrs1mUQVli12hZjy6wZXhGualZUI//H2BxRwKqH/Pf2Zk9S6KSbeJFDm0boV2COPqB3wZ21axe8lWXJVJAfjMgPacIU6K'
tries = 0
limit = 100
result = []
while tries < 10:
    tries += 1
    response_get_rec = kinesis_client.get_records(ShardIterator = sh , Limit = limit)
    shard_iterator = response_get_rec['NextShardIterator']
    break
          
response_get_rec
{'Records': [{'SequenceNumber': '49608139280302835973846909376978574930250627257427034114',
   'ApproximateArrivalTimestamp': datetime.datetime(2020, 6, 20, 17, 2, 6, 976000, tzinfo=tzlocal()),
   'Data': b'{"col1" : "this is my test json data"}',
   'PartitionKey': '1'}],
 'NextShardIterator': 'AAAAAAAAAAEgmwgBEgaauGNF/YzN5S+FcuWOWZZMledH4BR7CLiGD9iYYL4z+eLK7NaTQTHTAlSFEYm6N6vtjdFcTl8ibGJGKnuQthZiMgCfolA1FAAoWmLHvI0slHvZx1oWLfdApD8robDWj3zX/2d4zOzj1P9xz/+Xo8/YFdCXd0ENfUNxI7MhzZUGamw09rXa8Y0sDunFpkLy7msr5vjURGjr+xrf',
 'MillisBehindLatest': 0,
 'ResponseMetadata': {'RequestId': 'e580a084-b06e-ca24-b2e8-87b6c745255a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e580a084-b06e-ca24-b2e8-87b6c745255a',
   'x-amz-id-2': '/zy9vbX4wzEuqd7TZ959MQcL0OzB9kPQ3TSfrtEIIpI1IvubKb7OgnxYhxiWIZpvPWbtzXO5x0r+eO6+A8wR+bn0v8khnlpL',
   'date': 'Sat, 20 Jun 2020 15:02:10 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '489'},
  'RetryAttempts': 0}}
[x["SequenceNumber"] for x in response_get_rec["Records"] ]
['49608122819498384072835111163875039160761192199284064258']
response_get_rec["Records"]
[{'SequenceNumber': '49608122819498384072835111163875039160761192199284064258',
  'ApproximateArrivalTimestamp': datetime.datetime(2020, 6, 20, 4, 13, 11, 39000, tzinfo=tzlocal()),
  'Data': b'{"col1" : "this is my test json data"}',
  'PartitionKey': '1'}]

Utworz recznie Kinesis Firehose dla tego Stream'a dopiero pozniej wygeneruj dane testowe ponizsza petla

for i in range(0,10000):
    response = kinesis_client.put_record(
    StreamName=STREAM_NAME,
    Data=b'{"col1" : "this is json data"}',
    PartitionKey='1' 
    )
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-13-0e9e1b83eba0> in <module>
      1 for i in range(0,10000):
----> 2     response = kinesis_client.put_record(
      3     StreamName=STREAM_NAME,
      4     Data=b'{"col1" : "this is json data"}',
      5     PartitionKey='1'

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    314                     "%s() only accepts keyword arguments." % py_operation_name)
    315             # The "self" in this scope is referring to the BaseClient.
--> 316             return self._make_api_call(operation_name, kwargs)
    317 
    318         _api_call.__name__ = str(py_operation_name)

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
    619             http, parsed_response = event_response
    620         else:
--> 621             http, parsed_response = self._make_request(
    622                 operation_model, request_dict, request_context)
    623 

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/botocore/client.py in _make_request(self, operation_model, request_dict, request_context)
    639     def _make_request(self, operation_model, request_dict, request_context):
    640         try:
--> 641             return self._endpoint.make_request(operation_model, request_dict)
    642         except Exception as e:
    643             self.meta.events.emit(

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/botocore/endpoint.py in make_request(self, operation_model, request_dict)
    100         logger.debug("Making request for %s with params: %s",
    101                      operation_model, request_dict)
--> 102         return self._send_request(request_dict, operation_model)
    103 
    104     def create_request(self, params, operation_model=None):

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/botocore/endpoint.py in _send_request(self, request_dict, operation_model)
    132         request = self.create_request(request_dict, operation_model)
    133         context = request_dict['context']
--> 134         success_response, exception = self._get_response(
    135             request, operation_model, context)
    136         while self._needs_retry(attempts, operation_model, request_dict,

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/botocore/endpoint.py in _get_response(self, request, operation_model, context)
    164         # If an exception occurs then the success_response is None.
    165         # If no exception occurs then exception is None.
--> 166         success_response, exception = self._do_get_response(
    167             request, operation_model)
    168         kwargs_to_emit = {

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/botocore/endpoint.py in _do_get_response(self, request, operation_model)
    198             http_response = first_non_none_response(responses)
    199             if http_response is None:
--> 200                 http_response = self._send(request)
    201         except HTTPClientError as e:
    202             return (None, e)

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/botocore/endpoint.py in _send(self, request)
    267 
    268     def _send(self, request):
--> 269         return self.http_session.send(request)
    270 
    271 

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/botocore/httpsession.py in send(self, request)
    252 
    253             request_target = self._get_request_target(request.url, proxy_url)
--> 254             urllib_response = conn.urlopen(
    255                 method=request.method,
    256                 url=request_target,

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/urllib3/connectionpool.py in urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
    668 
    669             # Make the request on the httplib connection object.
--> 670             httplib_response = self._make_request(
    671                 conn,
    672                 method,

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    424                     # Python 3 (including for exceptions like SystemExit).
    425                     # Otherwise it looks like a bug in the code.
--> 426                     six.raise_from(e, None)
    427         except (SocketTimeout, BaseSSLError, SocketError) as e:
    428             self._raise_timeout(err=e, url=url, timeout_value=read_timeout)

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/urllib3/packages/six.py in raise_from(value, from_value)

~/opt/anaconda3/envs/uam-d/lib/python3.8/site-packages/urllib3/connectionpool.py in _make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
    419                 # Python 3
    420                 try:
--> 421                     httplib_response = conn.getresponse()
    422                 except BaseException as e:
    423                     # Remove the TypeError from the exception chain in

~/opt/anaconda3/envs/uam-d/lib/python3.8/http/client.py in getresponse(self)
   1330         try:
   1331             try:
-> 1332                 response.begin()
   1333             except ConnectionError:
   1334                 self.close()

~/opt/anaconda3/envs/uam-d/lib/python3.8/http/client.py in begin(self)
    301         # read until we get a non-100 response
    302         while True:
--> 303             version, status, reason = self._read_status()
    304             if status != CONTINUE:
    305                 break

~/opt/anaconda3/envs/uam-d/lib/python3.8/http/client.py in _read_status(self)
    262 
    263     def _read_status(self):
--> 264         line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
    265         if len(line) > _MAXLINE:
    266             raise LineTooLong("status line")

~/opt/anaconda3/envs/uam-d/lib/python3.8/socket.py in readinto(self, b)
    667         while True:
    668             try:
--> 669                 return self._sock.recv_into(b)
    670             except timeout:
    671                 self._timeout_occurred = True

~/opt/anaconda3/envs/uam-d/lib/python3.8/ssl.py in recv_into(self, buffer, nbytes, flags)
   1239                   "non-zero flags not allowed in calls to recv_into() on %s" %
   1240                   self.__class__)
-> 1241             return self.read(nbytes, buffer)
   1242         else:
   1243             return super().recv_into(buffer, nbytes, flags)

~/opt/anaconda3/envs/uam-d/lib/python3.8/ssl.py in read(self, len, buffer)
   1097         try:
   1098             if buffer is not None:
-> 1099                 return self._sslobj.read(len, buffer)
   1100             else:
   1101                 return self._sslobj.read(len)

KeyboardInterrupt: 
BUCKET_NAME = 'datalake-uam'
GLUE_DB = 'uam'
KINESIS_STREAM = 'uam-test'
KINESIS_FIREHOSE = 'uam-test-fh'

s3_client = session.client("s3")
glue_client = session.client("glue")

def tear_down_all():
    
    s3 = boto3.resource('s3',**session_kwargs)
    bucket = s3.Bucket(BUCKET_NAME)
    bucket.objects.delete()
    
    s3_client.delete_bucket(
        Bucket = BUCKET_NAME
    )
    
    glue_client.delete_database(
    Name=GLUE_DB
    )
    
    kinesis_client.delete_stream(StreamName=STREAM_NAME)
tear_down_all()