chmury/jupyter/UAM_1_avro.ipynb
2024-05-29 11:02:19 +00:00

25 KiB

1 Format danych AVRO

Ćwiczenie ma na celu zademonstrowanie schematów danych AVRO, typów złożonych (mapy, listy, struktury zagnieżdżone) oraz wstęp do Glue / Athena

Przebieg ćwiczenia

  • skonfiguruj środowisko uruchomieniowe Python (sugerowana Anaconda z Python 3)
  • zainstaluj wszystkie wymagane biblioteki
% conda create -n uam-datalake python=3.8 % conda activate uam-datalake % pip install -r ./datalake-uam/jupyter/requirements.txt
  • zaloguj się do konsoli AWS i stwórz Bucket testowy oraz bazę danych w Glue. Uzupełnij poniższy skrypt o te dane
  • wygeneruj dane testowe w wybranym schemacie AVRO
  • zapisz dane do plików na S3 w folderach s3://<bucket>/EventName/namespace=xxx/year=YYYY/month=MM/day=DD/version=VVV
  • zarejestruj tabelę w Glue z wykorzystaniem BOTO3 / crawlera (poprzez konsolę AWS GUI - przeglądarkę)
  • skonfiguruj domyślną WorkGroup w Athena (PRIMARY) - konieczne jest wskazanie miejsca docelowego dla danych z zapytań (S3 location Athena) https://docs.aws.amazon.com/athena/latest/ug/getting-started.html
  • sprawdź definicję tabeli i upewnij się, że są zarejestrowane partycje (użyj polecenia MSCK REPAIR TABLE w Athena - LOAD PARTITIONS)
  • sprawdź ile danych jest w tabeli (select count(*) from table) - Data Scanned in bytes
  • odpytaj tabele z wykorzystaniem predykatu day=1 (partition elimination) - zweryfikuj ilość danych przeskanowanych (do porównania z ćwiczeniem 2 - parquet)
import boto3

# AWS region passed to the boto3 session below.
REGION = "us-east-1"

# Keyword arguments shared by boto3.Session(...) here and boto3.resource(...)
# in tear_down_s3. The credential values are blank in this file - presumably
# they are meant to be filled in locally before running (TODO confirm);
# real secrets should never be committed with the notebook.
session_kwargs = dict(
    aws_access_key_id="",
    aws_secret_access_key="",
    aws_session_token="",
    region_name=REGION,
)

session = boto3.Session(**session_kwargs)
import json

from faker import Faker
from botocore.exceptions import ClientError
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import time 
import io
import datetime
from avro.schema import Parse


# Names of the S3 bucket, Glue database, Glue table and event used below.
S3_BUCKET = 'datalake-dev-920628590621-us-east-1'
TEST_DB = "datalake_dev_jk"
TEST_TABLE_NAME = "avro_uam_test"
EVENT_NAME = 'UamTestEvent'

# Faker instance with a fixed seed (4321).
fake = Faker()
fake.seed_instance(4321)

# Service clients created from the shared boto3 session.
s3_client, glue_client = session.client("s3"), session.client("glue")


def tear_down_test_db(database=TEST_DB):
    """Drop the Glue database if it exists, then create it again.

    The delete is attempted directly instead of searching the result of
    ``get_databases``: that call is paginated, so looking only at the first
    page can miss an existing database and make the following
    ``create_database`` call fail.

    Args:
        database: Name of the Glue database to recreate.
    """
    try:
        glue_client.delete_database(Name=database)
    except glue_client.exceptions.EntityNotFoundException:
        # Nothing to drop; go straight to creating the database.
        pass
    else:
        print("{} deleted".format(database))

    glue_client.create_database(DatabaseInput={'Name': database})
    print("%s db recreated" % database)

def tear_down_test_table(database=TEST_DB, table_name=TEST_TABLE_NAME):
    """Delete the Glue table if it exists; print a note when it does not.

    The delete is attempted directly instead of searching the result of
    ``get_tables``: that call is paginated, so looking only at the first page
    can miss an existing table. A missing database is reported the same way
    as a missing table.

    Args:
        database: Name of the Glue database that holds the table.
        table_name: Name of the table to delete.
    """
    try:
        glue_client.delete_table(DatabaseName=database, Name=table_name)
    except glue_client.exceptions.EntityNotFoundException:
        print("tbl %s not found" % table_name)
    else:
        print("test table {} deleted".format(table_name))
        
def tear_down_s3(bucket=S3_BUCKET, prefix=EVENT_NAME):
    """Delete the objects in ``bucket`` that match the key prefix ``prefix``.

    Args:
        bucket: Name of the S3 bucket to clean.
        prefix: Key prefix passed to the object filter.
    """
    s3_resource = boto3.resource('s3', **session_kwargs)
    matching_objects = s3_resource.Bucket(bucket).objects.filter(Prefix=prefix)
    matching_objects.delete()

1. AVRO schema

# test Avro Schema with all important cases

1

schema_string = """
{
   "mox-meta":{
      "version":"1.0.2",
      "type":"ENTITY_SNAPSHOT"
   },
   "namespace":"com.uam.datalake.v1",
   "type":"record",
   "name":"",
   "fields":[
      {
         "name":"customerId",
         "type":{
            "type":"string",
            "avro.java.string":"String"
         }
      },
      {
         "name":"isActive",
         "type":"boolean",
         "doc":"a boolean flag if the Customer is active"
      },
      {
         "name":"age",
         "type":"int"
      },
      {
         "name":"balance",
         "type":"float"
      },
      {
         "name":"accountBalance_logical_dec",
         "type":{
            "type":"bytes",
            "logicalType":"decimal",
            "precision":20,
            "scale":4
         }
      },
      {
         "name":"array_of_strings",
         "type":[
            "null",
            {
               "type":"array",
               "items":{
                  "type":"string",
                  "avro.java.string":"String"
               }
            }
         ],
         "default":null
      },
      {
         "name":"paymentDetails",
         "type":[
            "null",
            {
               "type":"record",
               "name":"PaymentDetails",
               "fields":[
                  {
                     "name":"counterPartyName",
                     "type":[
                        "null",
                        {
                           "type":"string",
                           "avro.java.string":"String"
                        }
                     ],
                     "default":null
                  },
                  {
                     "name":"groupingId",
                     "type":[
                        "null",
                        {
                           "type":"string",
                           "avro.java.string":"String"
                        }
                     ],
                     "default":null
                  },
                  {
                     "name":"payeeId",
                     "type":[
                        "null",
                        {
                           "type":"string",
                           "avro.java.string":"String"
                        }
                     ],
                     "default":null
                  },
                  {
                     "name":"message",
                     "type":[
                        "null",
                        {
                           "type":"string",
                           "avro.java.string":"String"
                        }
                     ],
                     "default":null
                  },
                  {
                     "name":"type",
                     "type":{
                        "type":"enum",
                        "name":"PaymentType",
                        "symbols":[
                           "UNKNOWN",
                           "ONE",
                           "TWO"                           
                        ]
                     }
                  },
                  {
                     "name":"otherAccountId",
                     "type":[
                        "null",
                        {
                           "type":"string",
                           "avro.java.string":"String"
                        }
                     ],
                     "default":null
                  }
               ]
            }
         ],
         "default":null
      },
      {
         "name":"parameters",
         "type":[
            "null",
            {
               "type":"map",
               "avro.java.string":"String",
               "values":{
                  "type":"string",
                  "avro.java.string":"String"
               }
            }
         ],
         "default":null
      }
   ]
}
"""

# Parse the JSON schema text and fill in the record name, which is left
# empty in the literal.
schema = json.loads(schema_string)
schema["name"] = EVENT_NAME

# Pull out the values that the S3 key and the Glue table location are built from.
RECORD_NAME, NAMESPACE = schema["name"], schema["namespace"]
VERSION = schema["mox-meta"]["version"]

print("{} {} {}".format(RECORD_NAME, NAMESPACE, VERSION))
schema
UamTestEvent com.uam.datalake.v1 1.0.2
{'mox-meta': {'version': '1.0.2', 'type': 'ENTITY_SNAPSHOT'},
 'namespace': 'com.uam.datalake.v1',
 'type': 'record',
 'name': 'UamTestEvent',
 'fields': [{'name': 'customerId',
   'type': {'type': 'string', 'avro.java.string': 'String'}},
  {'name': 'isActive',
   'type': 'boolean',
   'doc': 'a boolean flag if the Customer is active'},
  {'name': 'age', 'type': 'int'},
  {'name': 'balance', 'type': 'float'},
  {'name': 'accountBalance_logical_dec',
   'type': {'type': 'bytes',
    'logicalType': 'decimal',
    'precision': 20,
    'scale': 4}},
  {'name': 'array_of_strings',
   'type': ['null',
    {'type': 'array',
     'items': {'type': 'string', 'avro.java.string': 'String'}}],
   'default': None},
  {'name': 'paymentDetails',
   'type': ['null',
    {'type': 'record',
     'name': 'PaymentDetails',
     'fields': [{'name': 'counterPartyName',
       'type': ['null', {'type': 'string', 'avro.java.string': 'String'}],
       'default': None},
      {'name': 'groupingId',
       'type': ['null', {'type': 'string', 'avro.java.string': 'String'}],
       'default': None},
      {'name': 'payeeId',
       'type': ['null', {'type': 'string', 'avro.java.string': 'String'}],
       'default': None},
      {'name': 'message',
       'type': ['null', {'type': 'string', 'avro.java.string': 'String'}],
       'default': None},
      {'name': 'type',
       'type': {'type': 'enum',
        'name': 'PaymentType',
        'symbols': ['UNKNOWN', 'ONE', 'TWO']}},
      {'name': 'otherAccountId',
       'type': ['null', {'type': 'string', 'avro.java.string': 'String'}],
       'default': None}]}],
   'default': None},
  {'name': 'parameters',
   'type': ['null',
    {'type': 'map',
     'avro.java.string': 'String',
     'values': {'type': 'string', 'avro.java.string': 'String'}}],
   'default': None}]}

2. Generating test data

# Generate 10000 fake customer records that follow the schema above and
# serialize them into an in-memory Avro container file (``raw_bytes``).

avro_schema = Parse(json.dumps(schema))
buf = io.BytesIO()
writer = DataFileWriter(buf, DatumWriter(), avro_schema)

for _ in range(10000):
    customer_id = fake.uuid4()

    # The Avro decimal logical type stores the *unscaled* integer as
    # big-endian two's-complement bytes. Shifting by the schema scale (4)
    # keeps the value correct even if Faker returns fewer than four
    # fractional digits; stripping "." from str(amount) would silently
    # mis-scale such a value.
    amount = fake.pydecimal(left_digits=8, right_digits=4)
    amount_int = int(amount.scaleb(4))

    strings_array = [fake.first_name() for _ in range(fake.random.randint(1, 5))]

    payment_details = {
        'counterPartyName': customer_id,
        'groupingId': str(fake.uuid4()),
        'payeeId': None,
        'message': None,
        'type': 'ONE',
        'otherAccountId': str(fake.uuid4()),
    }

    age = fake.random.randint(20, 70)

    customer = {
        "customerId": customer_id,
        "isActive": fake.random.choice([True, False]),
        "age": age,
        "balance": fake.random.random() * 123,
        # bit_length() // 8 + 1 always leaves room for the sign bit.
        "accountBalance_logical_dec": amount_int.to_bytes(
            amount_int.bit_length() // 8 + 1, byteorder='big', signed=True
        ),
        "array_of_strings": strings_array,
        "paymentDetails": payment_details,
        "parameters": {"key1": "value1", "key2": "value2"},
    }
    writer.append(customer)

# Flush (not close) before reading the buffer - presumably because closing
# the writer would also close ``buf``; TODO confirm against the avro version
# in use.
writer.flush()
raw_bytes = buf.getvalue()
# ``logging`` is used in the error branch below but is not imported anywhere
# else in the notebook, so import it here.
import logging

# Remove the objects left by an earlier run, then upload the same Avro
# payload once per day partition (day=1..10).
tear_down_s3()

for day in range(1, 11):
    target_key_name = '{record_name}/namespace={ns}/year=2020/month=2/day={day}/version={ver}/CustData_{rand}.avro'.format(
        record_name=RECORD_NAME, ns=NAMESPACE, day=day, ver=VERSION, rand=fake.uuid4())
    try:
        s3_client.put_object(Body=raw_bytes, Bucket=S3_BUCKET, Key=target_key_name)
    except ClientError as e:
        # Log and continue with the next partition.
        logging.error(e)
    else:
        print("uploaded %s" % target_key_name)
uploaded UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=1/version=1.0.2/CustData_d6f44c22-5c87-4e14-956b-ad7d985226d0.avro
uploaded UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=2/version=1.0.2/CustData_cf0edf74-e76e-458f-a1d2-e092275a719c.avro
uploaded UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=3/version=1.0.2/CustData_f9ed4ad7-ed1c-4431-b9ab-317387cdb5af.avro
uploaded UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=4/version=1.0.2/CustData_d89b1c7f-66e3-4522-a603-1630abbf24fa.avro
uploaded UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=5/version=1.0.2/CustData_92483d0c-5254-4825-9bbb-59545fc6e4dd.avro
uploaded UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=6/version=1.0.2/CustData_3473649b-97c5-4597-965b-672a11cdad73.avro
uploaded UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=7/version=1.0.2/CustData_ecc999f8-eda7-4782-b844-17809980f34c.avro
uploaded UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=8/version=1.0.2/CustData_b95fd45c-8f0d-4612-8f8b-131437895013.avro
uploaded UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=9/version=1.0.2/CustData_5a5ef6ba-1576-4450-82a0-3f6f9a10a7c8.avro
uploaded UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=10/version=1.0.2/CustData_4e5dc9b8-6eb7-4fa4-93e0-61faf874b698.avro

3. Avro reading

print(target_key_name)

# Download the object stored under ``target_key_name`` and open it as an
# Avro container file.
avro_payload = s3_client.get_object(Bucket=S3_BUCKET, Key=target_key_name)['Body'].read()
reader = DataFileReader(io.BytesIO(avro_payload), DatumReader())

# Show only the first record, if there is one.
_no_record = object()
first_record = next(iter(reader), _no_record)
if first_record is not _no_record:
    print(first_record)

# Keep the writer schema embedded in the file metadata, then release the reader.
avro_schema = reader.meta["avro.schema"]
reader.close()
UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=10/version=1.0.2/CustData_4e5dc9b8-6eb7-4fa4-93e0-61faf874b698.avro
{'customerId': 'cc733c92-6853-45f6-8e49-bec741188ebb', 'isActive': True, 'age': 58, 'balance': 49.34309768676758, 'accountBalance_logical_dec': b'7L\xbc\xff\xf3', 'array_of_strings': ['Rebecca'], 'paymentDetails': {'counterPartyName': 'cc733c92-6853-45f6-8e49-bec741188ebb', 'groupingId': '9626bf79-2f97-4c0c-9aae-de080adab7df', 'payeeId': None, 'message': None, 'type': 'ONE', 'otherAccountId': '69261bc2-4a71-4de7-bc8b-1beb0d9320ac'}, 'parameters': {'key1': 'value1', 'key2': 'value2'}}
avro_schema
b'{"type": "record", "mox-meta": {"version": "1.0.2", "type": "ENTITY_SNAPSHOT"}, "name": "UamTestEvent", "namespace": "com.uam.datalake.v1", "fields": [{"type": {"type": "string", "avro.java.string": "String"}, "name": "customerId"}, {"type": "boolean", "name": "isActive", "doc": "a boolean flag if the Customer is active"}, {"type": "int", "name": "age"}, {"type": "float", "name": "balance"}, {"type": {"type": "bytes", "logicalType": "decimal", "precision": 20, "scale": 4}, "name": "accountBalance_logical_dec"}, {"type": ["null", {"type": "array", "items": {"type": "string", "avro.java.string": "String"}}], "name": "array_of_strings", "default": null}, {"type": ["null", {"type": "record", "name": "PaymentDetails", "namespace": "com.uam.datalake.v1", "fields": [{"type": ["null", {"type": "string", "avro.java.string": "String"}], "name": "counterPartyName", "default": null}, {"type": ["null", {"type": "string", "avro.java.string": "String"}], "name": "groupingId", "default": null}, {"type": ["null", {"type": "string", "avro.java.string": "String"}], "name": "payeeId", "default": null}, {"type": ["null", {"type": "string", "avro.java.string": "String"}], "name": "message", "default": null}, {"type": {"type": "enum", "name": "PaymentType", "namespace": "com.uam.datalake.v1", "symbols": ["UNKNOWN", "ONE", "TWO"]}, "name": "type"}, {"type": ["null", {"type": "string", "avro.java.string": "String"}], "name": "otherAccountId", "default": null}]}], "name": "paymentDetails", "default": null}, {"type": ["null", {"type": "map", "avro.java.string": "String", "values": {"type": "string", "avro.java.string": "String"}}], "name": "parameters", "default": null}]}'
# Register the Glue table, passing the Avro schema as ``avro.schema.literal``.
tear_down_test_table()  # create or replace

schema_literal = json.dumps(schema)

# (column name, column type) pairs for the table columns.
table_columns = [
    {'Name': col_name, 'Type': col_type}
    for col_name, col_type in (
        ('customerId', 'string'),
        ('isActive', 'boolean'),
        ('age', 'int'),
        ('balance', 'float'),
        ('accountBalance_logical_dec', 'decimal(20,4)'),
        ('array_of_strings', 'array<string>'),
        ('paymentdetails',
         'struct<counterpartyname:string,groupingid:string,payeeid:string,message:string,type:string,otheraccountid:string>'),
        ('parameters', 'map<string,string>'),
    )
]

# Partition columns; the names match the key=value folder names in the S3 keys.
partition_keys = [
    {'Name': 'year', 'Type': 'int'},
    {'Name': 'month', 'Type': 'int'},
    {'Name': 'day', 'Type': 'int'},
    {'Name': 'version', 'Type': 'string'},
]

storage_descriptor = {
    'Columns': table_columns,
    'Location': 's3://{}/{}/namespace={}/'.format(S3_BUCKET, RECORD_NAME, NAMESPACE),
    'InputFormat': 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat',
    'OutputFormat': 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat',
    'Compressed': False,
    'NumberOfBuckets': -1,
    'SerdeInfo': {
        'SerializationLibrary': 'org.apache.hadoop.hive.serde2.avro.AvroSerDe',
        'Parameters': {
            'avro.schema.literal': schema_literal,
            'serialization.format': '1',
        },
    },
    'BucketColumns': [],
    'SortColumns': [],
}

glue_client.create_table(
    DatabaseName=TEST_DB,
    TableInput={
        "Name": TEST_TABLE_NAME,
        'Owner': 'owner',
        'StorageDescriptor': storage_descriptor,
        'PartitionKeys': partition_keys,
        'TableType': 'EXTERNAL_TABLE',
        'Parameters': {
            'avro.schema.literal': schema_literal,
            'classification': 'avro',
            'compressionType': 'none',
        },
    },
)
tbl avro_uam_test not found
{'ResponseMetadata': {'RequestId': '1d5a05a1-0253-436c-a85a-d21f334d51ae',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 24 Apr 2021 11:20:04 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': '1d5a05a1-0253-436c-a85a-d21f334d51ae'},
  'RetryAttempts': 0}}

Now you can find the table in the Glue Data Catalog and query it with Athena (remember to load the partitions first, e.g. with MSCK REPAIR TABLE).

SELECT * , paymentdetails.groupingid , "$path"
FROM "avro_uam_test" as a 
CROSS JOIN UNNEST(array_of_strings) as t(names)
where customerid = '0d6f913f-9364-4898-875e-d07311d1e300' and day = 1