{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "alternate-pantyhose",
   "metadata": {},
   "source": [
    "# Lab 1: Czytanie Kinesis Data Streams\n",
    "\n",
    "Przebieg ćwiczenia:\n",
    "* Stwórz Data Stream.\n",
    "* Wygeneruj testowe dane do streama.\n",
    "* Odczytaj dane ze streama (`ShardIterator`).\n",
    "* Zwróć uwagę na iterację po shardach oraz na iteratory (osobny iterator dla każdego sharda).\n",
    "* Porównaj odczytane dane z danymi wygenerowanymi (czytamy w dwóch iteracjach po 5 rekordów, czyli pierwsze 10 rekordów od pozycji `TRIM_HORIZON`).\n",
    "* Sprawdź, jakie inne opcje ustawienia pozycji startowej w shardzie są dostępne (parametr `ShardIteratorType`)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "gross-series",
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "from pprint import pprint\n",
    "\n",
    "kinesis_client = boto3.client('kinesis')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "excited-latex",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['cryptostock-dev-100603781557-jk-12345']"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Call list_streams once and reuse the result below.\n",
    "# list_streams returns a single page of results (see HasMoreStreams in the response).\n",
    "stream_names = kinesis_client.list_streams()[\"StreamNames\"]\n",
    "stream_names"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "excellent-address",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'cryptostock-dev-100603781557-jk-12345'"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Fail with a clear message instead of a bare IndexError when there are no streams.\n",
    "assert stream_names, \"No Kinesis streams found - create the Data Stream first\"\n",
    "# NOTE: picks the first stream returned; set STREAM_NAME explicitly if there are several.\n",
    "STREAM_NAME = stream_names[0]\n",
    "STREAM_NAME"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "attended-combat",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[{'HashKeyRange': {'EndingHashKey': '340282366920938463463374607431768211455',\n",
      " 'StartingHashKey': '0'},\n",
      " 'SequenceNumberRange': {'StartingSequenceNumber': '49617445977150094507622122574044516561004852020651229186'},\n",
      " 'ShardId': 'shardId-000000000000'}]\n"
     ]
    }
   ],
   "source": [
    "# `response` is reused by the next cell to obtain one iterator per shard.\n",
    "# describe_stream returns shards in pages (see HasMoreShards); one page is enough for a small lab stream.\n",
    "response = kinesis_client.describe_stream(StreamName=STREAM_NAME)\n",
    "pprint(response[\"StreamDescription\"][\"Shards\"])"
   ]
  },
{
   "cell_type": "code",
   "execution_count": 14,
   "id": "together-finance",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[{'shard_id': 'shardId-000000000000',\n",
       " 'shard_iterator': 'AAAAAAAAAAEfLA4f5f+lMhjNfHXIXsKxQeP3dg79sVKKRiT+843gRXwSQsYRXeMIS4KwdRUjPdChkE2ZZGYSG3DeghHZi41DXOE0pNSdFHnqkePkBVIX2cN/9rbedZTgX/WXfNaL+sMUfdbYV6f9iQEtTtRAYN3bXfk5jUwIBvcgB1mQDRzdT1Or150vbf3LSlLtC7XlkK7HNZoGM1t577jseZTyvJ4+yeBOV73DQnSFnL/EPQvVdm+lidZtaNe39NMak4bXx5AWmhwblwLPmXg/l2PMDx7Z'}]"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Build one TRIM_HORIZON iterator per shard: every shard is read independently.\n",
    "shard_ids = []\n",
    "stream_name = None\n",
    "if response and 'StreamDescription' in response:\n",
    "    stream_name = response['StreamDescription']['StreamName']\n",
    "\n",
    "    # reading all shards (getting shard iterators)\n",
    "    for shard in response['StreamDescription']['Shards']:\n",
    "        shard_id = shard['ShardId']\n",
    "        shard_iterator = kinesis_client.get_shard_iterator(StreamName=stream_name, ShardId=shard_id, ShardIteratorType=\"TRIM_HORIZON\")\n",
    "        # NOTE: a shard iterator expires 5 minutes after it is issued - re-run this cell\n",
    "        # if the reads below fail with ExpiredIteratorException.\n",
    "        shard_ids.append({'shard_id': shard_id, 'shard_iterator': shard_iterator[\"ShardIterator\"]})\n",
    "\n",
    "shard_ids"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "vital-bridges",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[{'ApproximateArrivalTimestamp': datetime.datetime(2021, 4, 18, 14, 18, 25, 191000, tzinfo=tzlocal()),\n",
      " 'Data': b'{\"transaction_ts\": 1601510403, \"symbol\": \"ETH_USD\", \"price\": 360'\n",
      " b'.03, \"amount\": 0.646, \"dollar_amount\": 232.57938, \"type\": \"buy\",'\n",
      " b' \"trans_id\": 124289044}\\n',\n",
      " 'PartitionKey': 'ETH_USD',\n",
      " 'SequenceNumber': '49617445977150094507622122578777461144796130256147709954'},\n",
      " {'ApproximateArrivalTimestamp': datetime.datetime(2021, 4, 18, 14, 18, 25, 310000, tzinfo=tzlocal()),\n",
      " 'Data': b'{\"transaction_ts\": 1601510403, \"symbol\": \"BTC_USD\", \"price\": 107'\n",
      " b'80.83, \"amount\": 0.035, \"dollar_amount\": 377.32905, \"type\": \"buy'\n",
      " b'\", \"trans_id\": 124289043}\\n',\n",
      " 'PartitionKey': 'BTC_USD',\n",
      " 'SequenceNumber': '49617445977150094507622122578778670070615744885322416130'},\n",
      " {'ApproximateArrivalTimestamp': datetime.datetime(2021, 4, 18, 14, 18, 25, 428000, tzinfo=tzlocal()),\n",
      " 'Data': b'{\"transaction_ts\": 1601510404, \"symbol\": \"ETH_USD\", \"price\": 360'\n",
      " b'.12, \"amount\": 0.523, \"dollar_amount\": 188.34276, \"type\": \"buy\",'\n",
      " b' \"trans_id\": 124289045}\\n',\n",
      " 'PartitionKey': 'ETH_USD',\n",
      " 'SequenceNumber': '49617445977150094507622122578779878996435359514497122306'},\n",
      " {'ApproximateArrivalTimestamp': datetime.datetime(2021, 4, 18, 14, 18, 25, 545000, tzinfo=tzlocal()),\n",
      " 'Data': b'{\"transaction_ts\": 1601510405, \"symbol\": \"BTC_USD\", \"price\": 107'\n",
      " b'84.42, \"amount\": 0.25635676, \"dollar_amount\": 2764.65897, \"type\"'\n",
      " b': \"buy\", \"trans_id\": 124289050}\\n',\n",
      " 'PartitionKey': 'BTC_USD',\n",
      " 'SequenceNumber': '49617445977150094507622122578781087922254974143671828482'},\n",
      " {'ApproximateArrivalTimestamp': datetime.datetime(2021, 4, 18, 14, 18, 25, 663000, tzinfo=tzlocal()),\n",
      " 'Data': b'{\"transaction_ts\": 1601510407, \"symbol\": \"BTC_USD\", \"price\": 107'\n",
      " b'84.42, \"amount\": 0.23877038, \"dollar_amount\": 2575.000061, \"type'\n",
      " b'\": \"buy\", \"trans_id\": 124289051}\\n',\n",
      " 'PartitionKey': 'BTC_USD',\n",
      " 'SequenceNumber': '49617445977150094507622122578782296848074588772846534658'}]\n"
     ]
    }
   ],
   "source": [
    "limit = 5\n",
    "# Read explicitly from the first shard instead of relying on `si` leaked from the loop above\n",
    "# (that would silently be the *last* shard's iterator when the stream has several shards).\n",
    "assert shard_ids, \"Stream has no shards - nothing to read\"\n",
    "first_shard_iterator = shard_ids[0]['shard_iterator']\n",
    "response_get_rec = kinesis_client.get_records(ShardIterator=first_shard_iterator, Limit=limit)\n",
    "# NextShardIterator is null/absent once a closed shard has been fully read.\n",
    "next_shard_iterator = response_get_rec.get('NextShardIterator')\n",
    "pprint(response_get_rec[\"Records\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "alone-martial",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
"AAAAAAAAAAGUetfZYhJAUbRmLdnxgHF2gXHQ+Yt8063YzfurEZ+Vdauri9LJ13JLrPqLIrBxeHRJ1GEBctxNJ4jYeB4Um/JNu4+2L5Jfa1Apl9s9y6f/5UMZlqIAFGvUPmW53Gj6MyauM9r7EWNBUBZCvrFQkHvC9fQwNYP3eyYm1xp4K9fcjBX90qUdnGmFU69bq+3BF5I7PXgPHitcwzJev6PqPLVny2SmhSHtnRF/Rogj00Xv+DtKo1/SBdVid3tyQ0e9tm4XrgttPfPIhNsJB3j57lpM\n",
      "[{'ApproximateArrivalTimestamp': datetime.datetime(2021, 4, 18, 14, 18, 25, 812000, tzinfo=tzlocal()),\n",
      " 'Data': b'{\"transaction_ts\": 1601510409, \"symbol\": \"BTC_USD\", \"price\": 107'\n",
      " b'84.42, \"amount\": 1.01303547, \"dollar_amount\": 10924.99998, \"type'\n",
      " b'\": \"buy\", \"trans_id\": 124289054}\\n',\n",
      " 'PartitionKey': 'BTC_USD',\n",
      " 'SequenceNumber': '49617445977150094507622122578783505773894203402021240834'},\n",
      " {'ApproximateArrivalTimestamp': datetime.datetime(2021, 4, 18, 14, 18, 25, 930000, tzinfo=tzlocal()),\n",
      " 'Data': b'{\"transaction_ts\": 1601510410, \"symbol\": \"BTC_USD\", \"price\": 107'\n",
      " b'84.42, \"amount\": 0.26135077, \"dollar_amount\": 2818.516471, \"type'\n",
      " b'\": \"buy\", \"trans_id\": 124289055}\\n',\n",
      " 'PartitionKey': 'BTC_USD',\n",
      " 'SequenceNumber': '49617445977150094507622122578784714699713818031195947010'},\n",
      " {'ApproximateArrivalTimestamp': datetime.datetime(2021, 4, 18, 14, 18, 26, 48000, tzinfo=tzlocal()),\n",
      " 'Data': b'{\"transaction_ts\": 1601510413, \"symbol\": \"ETH_USD\", \"price\": 360'\n",
      " b'.39, \"amount\": 5.55416701, \"dollar_amount\": 2001.666249, \"type\":'\n",
      " b' \"buy\", \"trans_id\": 124289059}\\n',\n",
      " 'PartitionKey': 'ETH_USD',\n",
      " 'SequenceNumber': '49617445977150094507622122578785923625533432660370653186'},\n",
      " {'ApproximateArrivalTimestamp': datetime.datetime(2021, 4, 18, 14, 18, 26, 165000, tzinfo=tzlocal()),\n",
      " 'Data': b'{\"transaction_ts\": 1601510414, \"symbol\": \"ETH_USD\", \"price\": 360'\n",
      " b'.6, \"amount\": 13.855, \"dollar_amount\": 4996.113, \"type\": \"buy\", '\n",
      " b'\"trans_id\": 124289071}\\n',\n",
      " 'PartitionKey': 'ETH_USD',\n",
      " 'SequenceNumber': '49617445977150094507622122578787132551353047358264836098'},\n",
      " {'ApproximateArrivalTimestamp': datetime.datetime(2021, 4, 18, 14, 18, 26, 282000, tzinfo=tzlocal()),\n",
      " 'Data': b'{\"transaction_ts\": 1601510415, \"symbol\": \"ETH_USD\", \"price\": 360'\n",
      " b'.24, \"amount\": 6.32869733, \"dollar_amount\": 2279.849926, \"type\":'\n",
      " b' \"sell\", \"trans_id\": 124289072}\\n',\n",
      " 'PartitionKey': 'ETH_USD',\n",
      " 'SequenceNumber': '49617445977150094507622122578788341477172661987439542274'}]\n"
     ]
    }
   ],
   "source": [
    "# Second batch: continue from where the previous get_records call stopped.\n",
    "# This cell does not advance next_shard_iterator, so re-running it (while the iterator\n",
    "# is still valid, i.e. within 5 minutes) re-reads the same batch.\n",
    "assert next_shard_iterator is not None, \"Shard is closed - no more records to read\"\n",
    "print(next_shard_iterator)\n",
    "response_get_rec = kinesis_client.get_records(ShardIterator=next_shard_iterator, Limit=limit)\n",
    "pprint(response_get_rec[\"Records\"])"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}