52 lines
1.8 KiB
Python
52 lines
1.8 KiB
Python
import urllib.parse
|
|
import awswrangler as wr
|
|
import pandas as pd
|
|
import boto3
|
|
|
|
client_ssm = boto3.client('ssm')
|
|
|
|
def etl_function(event, context):
|
|
processed_zone_prefix = "processed-zone"
|
|
|
|
record = event["Records"][0]
|
|
raw_bucket = record["s3"]["bucket"]["name"]
|
|
key = urllib.parse.unquote(record["s3"]["object"]["key"])
|
|
event_prefix = key.split('/')[1]
|
|
full_src_path = f's3://{raw_bucket}/{key}'
|
|
|
|
# Pobierz nazwę bucketu przetworzonych danych z SSM
|
|
processed_bucket = client_ssm.get_parameter(Name='s3_processed_bucket_name')['Parameter']['Value']
|
|
|
|
print(f'Processing key = {full_src_path}')
|
|
df = wr.s3.read_json(path=full_src_path, lines=True)
|
|
|
|
filename = key.split('/')[-1][-36:]
|
|
dest_prefix = f"s3://{processed_bucket}/{processed_zone_prefix}/{event_prefix}"
|
|
|
|
df['transaction_date'] = pd.to_datetime(df['transaction_ts'], unit='s')
|
|
df['year'] = df['transaction_date'].dt.year
|
|
df['month'] = df['transaction_date'].dt.month
|
|
df['day'] = df['transaction_date'].dt.day
|
|
df['hour'] = df['transaction_date'].dt.hour
|
|
|
|
cols_to_return = ["transaction_date", "price", "amount", "dollar_amount", "type", "trans_id"]
|
|
|
|
new_keys = []
|
|
for [symbol, year, month, day, hour], data in df.groupby(['symbol', 'year', 'month', 'day', 'hour']):
|
|
partitions = f"symbol={symbol}/year={year}/month={month}/day={day}/hour={hour}"
|
|
full_key_name = '/'.join([dest_prefix, partitions, filename + '.parquet'])
|
|
|
|
print(f'Saving a new key = {full_key_name}')
|
|
new_keys.append(full_key_name)
|
|
|
|
wr.s3.to_parquet(
|
|
df=data[cols_to_return],
|
|
path=full_key_name,
|
|
compression='snappy'
|
|
)
|
|
|
|
return {
|
|
'key': key,
|
|
'statusCode': 200,
|
|
'new_keys': new_keys
|
|
} |