# 2 Format danych Parquet

* wykonaj wszystkie kroki z ćwiczenia 1
* przekonwertuj pliki AVRO do Parquet z wykorzystaniem bibliotek pandas i pandavro
* zapisz pliki na S3 w osobnej ścieżce
* zarejestruj tabelę GLUE z wykorzystaniem boto3 / crawlera
* odśwież partycje (MSCK REPAIR w Athena/Load partitions na tabeli)
* sprawdź efektywność zapytań agregujących - np. AVG(Age) - porównaj wydajność (ilość przeskanowanych danych = koszt & czas)

Przykłady kwerend do testów:

SELECT AVG(age) FROM "uam"."avro_uam_test" where day = '1'

SELECT AVG(age) FROM "uam"."parquet_uam_test" where day = '1'

## Pamiętaj aby po skończonych ćwiczeniach usunąć wszystkie obiekty
### Uwaga !!! poniższy skrypt tworzy obiekty w regionie us-east-1 (N. Virginia) !


In [2]:
import io
import json
import os

import boto3
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ClientError
 
REGION = "us-east-1"

# Credentials come from the standard AWS environment variables so that secrets
# never have to be pasted into (and saved with) the notebook. A variable that
# is not set yields None, which lets boto3 fall back to its default credential
# chain (shared credentials file, SSO, instance role, ...).
session_kwargs = {
    "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY_ID"),
    "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
    "aws_session_token": os.environ.get("AWS_SESSION_TOKEN"),
    "region_name": REGION,
}

# Single shared session; every client/resource in the notebook is created from it.
session = boto3.Session(**session_kwargs)

In [3]:
 
# S3 bucket that holds the source AVRO objects and receives the converted Parquet objects.
S3_BUCKET = "datalake-dev-920628590621-us-east-1"

# Glue database and table name used for the Parquet test table.
TEST_DB = 'datalake_dev_jk'
TEST_TABLE_NAME = 'parquet_uam_test'
# S3 key prefix used when listing the source AVRO objects.
EVENT_NAME = "UamTestEvent"

# S3 key prefix under which the converted Parquet objects are written.
PARQUET_PREFIX = "Parquet_UamTestEvent"
# Namespace segment of the table location; reassigned later from the AVRO schema itself.
NAMESPACE = "com.uam.datalake.v1"

def tear_down_test_table(database=TEST_DB, table_name=TEST_TABLE_NAME):
    """Delete ``table_name`` from the Glue ``database`` if it is present.

    Args:
        database: Name of the Glue database to look in.
        table_name: Name of the table to remove.

    Prints whether the table was deleted or could not be found. Errors raised
    by Glue (for example a missing database) are not caught here.
    """
    # get_tables is paginated: a single call only returns the first page, so a
    # table in a database with many tables could be missed. Walk every page.
    pages = glue_client.get_paginator("get_tables").paginate(DatabaseName=database)
    table_exists = any(
        table["Name"] == table_name
        for page in pages
        for table in page["TableList"]
    )

    if table_exists:
        glue_client.delete_table(DatabaseName=database, Name=table_name)
        print("test table {} deleted".format(table_name))
    else:
        print("tbl %s not found" % table_name)
        
def tear_down_s3(bucket=S3_BUCKET,prefix=PARQUET_PREFIX):
    """Delete every object in ``bucket`` whose key starts with ``prefix``.

    Args:
        bucket: Name of the S3 bucket to clean up.
        prefix: Key prefix selecting the objects to delete. An empty prefix
            matches every object in the bucket.
    """
    # Reuse the notebook-wide session instead of building a second, default
    # session from the raw kwargs: that keeps credential handling identical to
    # s3_client / glue_client (blank placeholder values passed straight to
    # boto3.resource would be used as explicit, invalid credentials).
    s3_resource = session.resource('s3')
    s3_resource.Bucket(bucket).objects.filter(Prefix=prefix).delete()

In [4]:
s3_client = session.client("s3")
glue_client = session.client("glue")

# A single list call returns at most 1000 keys, and its response has no
# "Contents" entry at all when nothing matches the prefix. Paginating and
# defaulting to an empty list handles both large and empty prefixes.
avro_files = [
    s3_object["Key"]
    for page in s3_client.get_paginator("list_objects_v2").paginate(
        Bucket=S3_BUCKET, Prefix=EVENT_NAME
    )
    for s3_object in page.get("Contents", [])
]
avro_files

['UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=1/version=1.0.2/CustData_d6f44c22-5c87-4e14-956b-ad7d985226d0.avro',
 'UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=10/version=1.0.2/CustData_4e5dc9b8-6eb7-4fa4-93e0-61faf874b698.avro',
 'UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=2/version=1.0.2/CustData_cf0edf74-e76e-458f-a1d2-e092275a719c.avro',
 'UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=3/version=1.0.2/CustData_f9ed4ad7-ed1c-4431-b9ab-317387cdb5af.avro',
 'UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=4/version=1.0.2/CustData_d89b1c7f-66e3-4522-a603-1630abbf24fa.avro',
 'UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=5/version=1.0.2/CustData_92483d0c-5254-4825-9bbb-59545fc6e4dd.avro',
 'UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=6/version=1.0.2/CustData_3473649b-97c5-4597-965b-672a11cdad73.avro',
 'UamTestEvent/namespace=com.uam.datalake.v1/ye

In [5]:
import tempfile
import os
import pandavro as pdx
import pandas as pd

In [6]:
# Fetch the first listed AVRO object so a single file can be inspected before the bulk conversion.
obj = s3_client.get_object(Bucket=S3_BUCKET, Key=avro_files[0])

In [7]:
# Read the whole object body into memory as bytes.
record_raw = obj['Body'].read()

In [8]:
# Display the raw bytes: the file starts with the Avro container header, which embeds the JSON schema.
record_raw

b'Obj\x01\x04\x14avro.codec\x08null\x16avro.schema\x92\x1a{"type": "record", "mox-meta": {"version": "1.0.2", "type": "ENTITY_SNAPSHOT"}, "name": "UamTestEvent", "namespace": "com.uam.datalake.v1", "fields": [{"type": {"type": "string", "avro.java.string": "String"}, "name": "customerId"}, {"type": "boolean", "name": "isActive", "doc": "a boolean flag if the Customer is active"}, {"type": "int", "name": "age"}, {"type": "float", "name": "balance"}, {"type": {"type": "bytes", "logicalType": "decimal", "precision": 20, "scale": 4}, "name": "accountBalance_logical_dec"}, {"type": ["null", {"type": "array", "items": {"type": "string", "avro.java.string": "String"}}], "name": "array_of_strings", "default": null}, {"type": ["null", {"type": "record", "name": "PaymentDetails", "namespace": "com.uam.datalake.v1", "fields": [{"type": ["null", {"type": "string", "avro.java.string": "String"}], "name": "counterPartyName", "default": null}, {"type": ["null", {"type": "string", "avro.java.string": 

In [9]:
from avro.datafile import DataFileReader
from avro.io import DatumReader

# The Avro container file carries its writer schema as JSON in the file
# metadata, so the schema can be recovered from the downloaded bytes alone.
reader = DataFileReader(io.BytesIO(record_raw), DatumReader())
try:
    avro_schema = json.loads(reader.meta["avro.schema"])
finally:
    # Close the reader even if the metadata lookup or JSON parsing fails.
    reader.close()

print(avro_schema)

# Values taken from the schema itself; NAMESPACE replaces the constant set in
# the configuration cell, and "mox-meta" is a custom attribute of this schema.
RECORD_NAME = avro_schema["name"]
NAMESPACE = avro_schema["namespace"]
VERSION = avro_schema["mox-meta"]["version"]

{'type': 'record', 'mox-meta': {'version': '1.0.2', 'type': 'ENTITY_SNAPSHOT'}, 'name': 'UamTestEvent', 'namespace': 'com.uam.datalake.v1', 'fields': [{'type': {'type': 'string', 'avro.java.string': 'String'}, 'name': 'customerId'}, {'type': 'boolean', 'name': 'isActive', 'doc': 'a boolean flag if the Customer is active'}, {'type': 'int', 'name': 'age'}, {'type': 'float', 'name': 'balance'}, {'type': {'type': 'bytes', 'logicalType': 'decimal', 'precision': 20, 'scale': 4}, 'name': 'accountBalance_logical_dec'}, {'type': ['null', {'type': 'array', 'items': {'type': 'string', 'avro.java.string': 'String'}}], 'name': 'array_of_strings', 'default': None}, {'type': ['null', {'type': 'record', 'name': 'PaymentDetails', 'namespace': 'com.uam.datalake.v1', 'fields': [{'type': ['null', {'type': 'string', 'avro.java.string': 'String'}], 'name': 'counterPartyName', 'default': None}, {'type': ['null', {'type': 'string', 'avro.java.string': 'String'}], 'name': 'groupingId', 'default': None}, {'type

In [10]:
# Wrap the downloaded bytes in an in-memory stream and hand that stream to pandavro.
f = io.BytesIO(record_raw)
# A freshly created BytesIO is already positioned at 0, so this rewind is purely defensive.
f.seek(0)

# Parse the AVRO records into a pandas DataFrame.
df = pdx.read_avro(f)

In [11]:
# Sanity check: print (rows, columns), then show the first rows as the cell output.
print(df.shape)
df.head()

(10000, 8)


Unnamed: 0,customerId,isActive,age,balance,accountBalance_logical_dec,array_of_strings,paymentDetails,parameters
0,cc733c92-6853-45f6-8e49-bec741188ebb,True,58,49.343098,23751065.5987,[Rebecca],{'counterPartyName': 'cc733c92-6853-45f6-8e49-...,"{'key1': 'value1', 'key2': 'value2'}"
1,7217d7d2-6f24-4bf5-942d-3e4cf15982c1,True,60,69.04995,604206.7111,[Amanda],{'counterPartyName': '7217d7d2-6f24-4bf5-942d-...,"{'key1': 'value1', 'key2': 'value2'}"
2,d9117f52-3839-4641-a470-7de16f437d8b,False,32,30.037497,-26093820.4854,"[Nancy, Michael]",{'counterPartyName': 'd9117f52-3839-4641-a470-...,"{'key1': 'value1', 'key2': 'value2'}"
3,6527fe9d-4eb8-4d82-bb8b-01321086a9ed,True,23,92.981667,-92047155.6115,"[Meghan, Kevin, Timothy, Angelica]",{'counterPartyName': '6527fe9d-4eb8-4d82-bb8b-...,"{'key1': 'value1', 'key2': 'value2'}"
4,0d6f913f-9364-4898-875e-d07311d1e300,False,57,51.859432,-73779420.6927,"[Allison, James, Danielle]",{'counterPartyName': '0d6f913f-9364-4898-875e-...,"{'key1': 'value1', 'key2': 'value2'}"


In [12]:
# Trial conversion of the single sample file; writes local_file.parquet to the current working directory.
df.to_parquet("local_file.parquet", compression='gzip')

In [17]:
def save_df(df, full_path, output_file_name):
    """Write ``df`` as a gzip-compressed Parquet file and upload it to S3.

    Args:
        df: DataFrame to serialise.
        full_path: S3 key prefix (without trailing slash) to upload under.
        output_file_name: File name used for the temporary local file and as
            the last segment of the S3 key.

    Upload failures are reported with ``print`` and do not propagate, so a
    batch conversion carries on after a single failed file.
    """
    parquet_key_name = '/'.join([full_path, output_file_name])

    # The Parquet file only has to exist long enough to be uploaded, so it is
    # written into a temporary directory that is removed on exit.
    with tempfile.TemporaryDirectory() as tmpdirname:
        local_file = os.path.join(tmpdirname, output_file_name)

        df.to_parquet(local_file, compression='gzip')

        try:
            s3_client.upload_file(local_file, S3_BUCKET, parquet_key_name)
        except (ClientError, S3UploadFailedError) as e:
            # upload_file reports failed transfers as S3UploadFailedError;
            # ClientError is kept for errors surfaced directly by botocore.
            print(e)
        else:
            print("{} uploaded".format(parquet_key_name))
    
    

for every_avro in avro_files:
    # Anything under the prefix that is not an AVRO file (for example a
    # zero-byte "folder" marker object) cannot be parsed, so skip it.
    if not every_avro.endswith(".avro"):
        continue

    obj = s3_client.get_object(Bucket=S3_BUCKET, Key=every_avro)
    record_raw = obj['Body'].read()

    f = io.BytesIO(record_raw)
    df = pdx.read_avro(f)

    # Build the target key by swapping only the leading event prefix and only
    # the file extension. A blanket str.replace('avro', 'parquet') would also
    # rewrite partition directories that happen to contain "avro" (such as the
    # namespace), and the result would no longer match the Glue table location.
    *dir_parts, avro_file_name = every_avro.split("/")
    full_path = "/".join(dir_parts).replace(EVENT_NAME, PARQUET_PREFIX, 1)
    output_file_name = avro_file_name[:-len(".avro")] + ".parquet"

    save_df(df, full_path, output_file_name)

Parquet_UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=1/version=1.0.2/CustData_d6f44c22-5c87-4e14-956b-ad7d985226d0.parquet uploaded
Parquet_UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=10/version=1.0.2/CustData_4e5dc9b8-6eb7-4fa4-93e0-61faf874b698.parquet uploaded
Parquet_UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=2/version=1.0.2/CustData_cf0edf74-e76e-458f-a1d2-e092275a719c.parquet uploaded
Parquet_UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=3/version=1.0.2/CustData_f9ed4ad7-ed1c-4431-b9ab-317387cdb5af.parquet uploaded
Parquet_UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=4/version=1.0.2/CustData_d89b1c7f-66e3-4522-a603-1630abbf24fa.parquet uploaded
Parquet_UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=5/version=1.0.2/CustData_92483d0c-5254-4825-9bbb-59545fc6e4dd.parquet uploaded
Parquet_UamTestEvent/namespace=com.uam.datalake.v1/year=2020/month=2/day=6/version=1.0.

In [26]:
# tear_down_test_table is already defined in the configuration cell near the
# top of the notebook; the byte-identical copy that used to be repeated here
# only shadowed it and could drift out of sync.
# Drop any existing table first so the create_table call below acts as
# "create or replace".
tear_down_test_table() # create or replace

# Register the Parquet data set as an external Glue table. The table location
# is the namespace directory; year/month/day/version below it are partition keys.
glue_client.create_table(
    DatabaseName=TEST_DB,
    TableInput={
        "Name": TEST_TABLE_NAME,
        "Owner": "owner",
        "PartitionKeys": [
            {"Name": "year", "Type": "int"},
            {"Name": "month", "Type": "int"},
            {"Name": "day", "Type": "int"},
            {"Name": "version", "Type": "string"},
        ],
        "Retention": 0,
        "StorageDescriptor": {
            "BucketColumns": [],
            "Columns": [
                {"Name": "customerid", "Type": "string"},
                {"Name": "isactive", "Type": "boolean"},
                {"Name": "age", "Type": "bigint"},
                {"Name": "balance", "Type": "double"},
                {"Name": "accountbalance_logical_dec", "Type": "decimal(12,4)"},
                {"Name": "array_of_strings", "Type": "array<string>"},
                {
                    "Name": "paymentdetails",
                    "Type": "struct<counterPartyName:string,groupingId:string,message:int,otherAccountId:string,payeeId:int,type:string>",
                },
                {"Name": "parameters", "Type": "struct<key1:string,key2:string>"},
            ],
            "Compressed": True,
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "Location": "s3://{}/{}/namespace={}/".format(S3_BUCKET, PARQUET_PREFIX, NAMESPACE),
            "NumberOfBuckets": -1,
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "Parameters": {"serialization.format": "1"},
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
            },
            "SortColumns": [],
            "StoredAsSubDirectories": False,
        },
        "TableType": "EXTERNAL_TABLE",
    },
)

test table parquet_uam_test deleted


{'ResponseMetadata': {'RequestId': 'aa815163-ae98-4e66-8bae-a7c376daead6',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sat, 24 Apr 2021 11:59:42 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'aa815163-ae98-4e66-8bae-a7c376daead6'},
  'RetryAttempts': 0}}

SELECT customerid , "$path"
FROM "parquet_uam_test" as a 
where customerid = '0d6f913f-9364-4898-875e-d07311d1e300' and day = 1


SELECT customerid  , "$path"
FROM "avro_uam_test" as a 
where customerid = '0d6f913f-9364-4898-875e-d07311d1e300' and day = 1

In [24]:
# S3 directory that holds the Parquet file(s) of the single partition registered below.
loc = f's3://{S3_BUCKET}/{PARQUET_PREFIX}/namespace=com.uam.datalake.v1/year=2020/month=2/day=1/version=1.0.2/'

# Register one partition by hand through the Glue API.
# NOTE(review): only day=1 is added here; the remaining days presumably rely on
# MSCK REPAIR / "Load partitions" from the exercise steps - confirm.
response = glue_client.create_partition(
    DatabaseName=TEST_DB,
    TableName=TEST_TABLE_NAME,
    PartitionInput={
        # One value per partition key, in the table's key order: year, month, day, version.
        "Values": ["2020", "2", "1", "1.0.2"],
        "StorageDescriptor": {
            "Location": loc,
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "Compressed": False,
            "NumberOfBuckets": -1,
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
            },
            "BucketColumns": [],
        },
    },
)