import urllib.parse

import awswrangler as wr
import boto3
import pandas as pd

# Create the SSM client once, at module level, so it is reused across warm Lambda invocations
client_ssm = boto3.client('ssm')


def etl_function(event, context):
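    """Convert one raw JSON-lines object from S3 into hour-partitioned Parquet files.

    Triggered by an S3 ObjectCreated event; the destination bucket name is read
    from the SSM parameter 's3_processed_bucket_name'. Returns the source key and
    the list of Parquet keys that were written.
    """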
    processed_zone_prefix = "processed-zone"

    # Only the first record of the S3 event is processed
    record = event["Records"][0]
    src_bucket = record["s3"]["bucket"]["name"]

    # The destination bucket name is stored in SSM Parameter Store
    target_bucket = client_ssm.get_parameter(Name='s3_processed_bucket_name')['Parameter']['Value']

    # Object keys arrive URL-encoded in S3 event notifications
    key = urllib.parse.unquote(record["s3"]["object"]["key"])
    event_prefix = key.split('/')[1]
    full_src_path = f's3://{src_bucket}/{key}'

    print(f'Processing key = {full_src_path}')

    # The raw object is newline-delimited JSON
    df = wr.s3.read_json(path=full_src_path, lines=True)

    # Use the last 36 characters of the object name as the output file name
    filename = key.split('/')[-1][-36:]
    dest_prefix = f"s3://{target_bucket}/{processed_zone_prefix}/{event_prefix}"

    # Derive Hive-style partition columns from the epoch-seconds timestamp
    df['transaction_date'] = pd.to_datetime(df['transaction_ts'], unit='s')
    df['year'] = df['transaction_date'].dt.year
    df['month'] = df['transaction_date'].dt.month
    df['day'] = df['transaction_date'].dt.day
    df['hour'] = df['transaction_date'].dt.hour

    # Columns kept in the processed Parquet output
    cols_to_return = ["transaction_date", "price", "amount", "dollar_amount", "type", "trans_id"]

    new_keys = []
    # Write one Parquet file per symbol/year/month/day/hour partition
    for (symbol, year, month, day, hour), data in df.groupby(['symbol', 'year', 'month', 'day', 'hour']):
        partitions = f"symbol={symbol}/year={year}/month={month}/day={day}/hour={hour}"
        full_key_name = '/'.join([dest_prefix, partitions, filename + '.parquet'])

        print(f'Saving a new key = {full_key_name}')
        new_keys.append(full_key_name)

        wr.s3.to_parquet(
            df=data[cols_to_return],
            path=full_key_name,
            compression='snappy'
        )

    return {
        'key': key,
        'statusCode': 200,
        'new_keys': new_keys
    }


if __name__ == "__main__":
    # Simple local smoke test; the bucket name and key below are hypothetical placeholders
    # shaped like a real S3 ObjectCreated event notification.
    event = {"Records": [{"s3": {"bucket": {"name": "example-raw-bucket"},
                                 "object": {"key": "raw-zone/trades/example-object-id"}}}]}
    context = None

    response = etl_function(event, context)