chmury/labs/lambda/lambda_definition.py

52 lines
1.8 KiB
Python
Raw Permalink Normal View History

import urllib.parse
import awswrangler as wr
import pandas as pd
import boto3
# SSM client created once at module import; etl_function uses it to look up
# the name of the processed-data bucket.
client_ssm = boto3.client('ssm')
def etl_function(event, context):
    """Repartition one raw JSON-lines S3 object into hourly Parquet files.

    Only the first entry of ``event["Records"]`` is processed. The object it
    points to is read as line-delimited JSON, ``transaction_ts`` is
    interpreted as epoch seconds, and the rows are grouped by
    symbol/year/month/day/hour. Each group is written as a snappy-compressed
    Parquet file under
    ``s3://<processed bucket>/processed-zone/<event prefix>/<partitions>/``.

    Args:
        event: S3 event notification payload; must contain
            ``Records[0].s3.bucket.name`` and ``Records[0].s3.object.key``.
        context: Lambda context object (unused).

    Returns:
        dict with ``key`` (the decoded source key), ``statusCode`` (200) and
        ``new_keys`` (list of the full S3 paths that were written).

    Raises:
        KeyError: if the event lacks the expected S3 fields, or the data has
            no ``transaction_ts`` / grouping / output columns.
        IndexError: if the object key has no ``/`` (no second path segment).
    """
    processed_zone_prefix = "processed-zone"

    record = event["Records"][0]
    raw_bucket = record["s3"]["bucket"]["name"]
    # S3 event notifications URL-encode the object key and represent spaces
    # as '+', so unquote_plus (not unquote) is required to recover the real
    # key; a literal '+' in a key arrives as %2B and is still decoded correctly.
    key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
    # The second path segment of the key is reused as the destination prefix.
    event_prefix = key.split('/')[1]
    full_src_path = f's3://{raw_bucket}/{key}'

    # Fetch the processed-data bucket name from SSM.
    processed_bucket = client_ssm.get_parameter(
        Name='s3_processed_bucket_name'
    )['Parameter']['Value']

    print(f'Processing key = {full_src_path}')
    df = wr.s3.read_json(path=full_src_path, lines=True)

    # Keep only the last 36 characters of the object's base name
    # (presumably a UUID-style suffix -- confirm against the producer).
    filename = key.split('/')[-1][-36:]
    dest_prefix = f"s3://{processed_bucket}/{processed_zone_prefix}/{event_prefix}"

    # Derive the partition columns from the epoch-seconds timestamp.
    df['transaction_date'] = pd.to_datetime(df['transaction_ts'], unit='s')
    df['year'] = df['transaction_date'].dt.year
    df['month'] = df['transaction_date'].dt.month
    df['day'] = df['transaction_date'].dt.day
    df['hour'] = df['transaction_date'].dt.hour

    # Partition columns are encoded in the path, so they are not written
    # into the Parquet payload itself.
    cols_to_return = ["transaction_date", "price", "amount", "dollar_amount", "type", "trans_id"]

    new_keys = []
    grouped = df.groupby(['symbol', 'year', 'month', 'day', 'hour'])
    for (symbol, year, month, day, hour), data in grouped:
        partitions = f"symbol={symbol}/year={year}/month={month}/day={day}/hour={hour}"
        full_key_name = '/'.join([dest_prefix, partitions, filename + '.parquet'])

        print(f'Saving a new key = {full_key_name}')
        new_keys.append(full_key_name)

        wr.s3.to_parquet(
            df=data[cols_to_return],
            path=full_key_name,
            compression='snappy'
        )

    return {
        'key': key,
        'statusCode': 200,
        'new_keys': new_keys
    }