Lambda bare (initial)
This commit is contained in:
parent
467bde9431
commit
dded367d3a
57
lambda_definition.py
Normal file
57
lambda_definition.py
Normal file
@ -0,0 +1,57 @@
|
||||
import urllib.parse
|
||||
import awswrangler as wr
|
||||
import pandas as pd
|
||||
|
||||
|
||||
def etl_function(event, context):
|
||||
processed_zone_prefix = "processed-zone"
|
||||
|
||||
record = event["Records"][0]
|
||||
bucket = record["s3"]["bucket"]["name"]
|
||||
key = urllib.parse.unquote(record["s3"]["object"]["key"])
|
||||
event_prefix = key.split('/')[1]
|
||||
full_src_path = 's3://{bucket}/{key}'.format(bucket=bucket, key=key)
|
||||
|
||||
print(f'Processing key = {full_src_path}')
|
||||
df = wr.s3.read_json(path=full_src_path, lines=True)
|
||||
|
||||
filename = key.split('/')[-1][-36:]
|
||||
dest_prefix = f"s3://{bucket}/{processed_zone_prefix}/{event_prefix}"
|
||||
print('Dest_pref: ')
|
||||
print(f"s3://{bucket}/{processed_zone_prefix}/{event_prefix}")
|
||||
print(dest_prefix.replace('raw','processed'))
|
||||
|
||||
df['transaction_date'] = pd.to_datetime(df['transaction_ts'], unit='s')
|
||||
df['year'] = df['transaction_date'].dt.year
|
||||
df['month'] = df['transaction_date'].dt.month
|
||||
df['day'] = df['transaction_date'].dt.day
|
||||
df['hour'] = df['transaction_date'].dt.hour
|
||||
|
||||
cols_to_return = ["transaction_date", "price", "amount", "dollar_amount", "type", "trans_id"]
|
||||
|
||||
new_keys = []
|
||||
for [symbol, year, month, day, hour], data in df.groupby(['symbol', 'year', 'month', 'day', 'hour']):
|
||||
partitions = f"symbol={symbol}/year={year}/month={month}/day={day}/hour={hour}"
|
||||
full_key_name = '/'.join([dest_prefix, partitions, filename + '.parquet'])
|
||||
|
||||
print(f'Saving a new key = {full_key_name}')
|
||||
new_keys.append(full_key_name)
|
||||
|
||||
wr.s3.to_parquet(
|
||||
df=data[cols_to_return],
|
||||
path=full_key_name,
|
||||
compression='snappy'
|
||||
)
|
||||
|
||||
return {
|
||||
'key': key,
|
||||
'statusCode': 200,
|
||||
'new_keys': new_keys
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
event = ""
|
||||
context = ""
|
||||
|
||||
response = etl_function(event, context)
|
Loading…
Reference in New Issue
Block a user