cwiczenia/data_skew2.py

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import datetime
import time

spark = (
    SparkSession.builder
    .appName("too_few_partitions")
    .enableHiveSupport()
    .getOrCreate()
)

# Log through the JVM-side log4j logger so messages land in the Spark application logs.
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)

try:
    logger.info("SPARKAPP START")
    start = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    orders_skew = spark.sql("select * from hrucinska.uam_orders")

    # Stage 1 of a two-stage aggregation: partially reduce on the composite
    # (seller_id, buyer_id) key, which spreads a skewed seller_id across
    # many smaller groups.
    orders_partial_reduce = orders_skew.groupBy("seller_id", "buyer_id").agg(
        f.count("order_offer_id").alias("order_offer_id")
    )

    # Stage 2: collapse the partial counts to one row per seller.
    orders = (
        orders_partial_reduce.groupBy("seller_id")
        .agg(f.count("order_offer_id").alias("order_offer_id"))
        .withColumnRenamed("seller_id", "seller_id_order")
    )

    offers = spark.sql("select * from hrucinska.uam_offers")
    user_offers_orders = offers.join(
        orders, offers.seller_id == orders.seller_id_order, "left"
    )

    # Per-seller offer and order counts.
    res = user_offers_orders.groupBy(user_offers_orders.seller_id).agg(
        f.count(user_offers_orders.offer_id).alias("user_offers"),
        f.count(user_offers_orders.order_offer_id).alias("user_orders"),
    )
    res.write.mode("overwrite").saveAsTable("hrucinska.data_skew2")

    stop = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print("computation 2 start time:", start)
    print("computation 2 end time:", stop)

    # Keep the application alive so its Spark UI stays available for inspection.
    time.sleep(180)
except Exception as inst:
    logger.error("SPARKAPP ERROR {0}".format(inst))
    raise
finally:
    logger.info("SPARKAPP STOP")
    spark.stop()