cwiczenia/data_skew.py

38 lines
1.3 KiB
Python
Raw Normal View History

2018-05-27 12:25:47 +02:00
from pyspark.sql import SparkSession
import logging
import datetime
import time
import pyspark.sql.functions as f
# Build (or reuse) a Hive-enabled Spark session for this job.
# NOTE(review): the app name reads "too_few_partitions" although this script
# writes hrucinska.data_skew -- possibly a copy-paste leftover; confirm.
spark = (
    SparkSession.builder
    .appName("too_few_partitions")
    .enableHiveSupport()
    .getOrCreate()
)

# Log through the JVM-side log4j (reached via the session's `_jvm` handle)
# rather than through Python's own logging module.
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
# Job body: join offers with orders per seller, aggregate counts per seller,
# and persist the result to the Hive table hrucinska.data_skew.
try:
    logger.info("SPARKAPP START")
    time_format = '%Y-%m-%d %H:%M:%S'
    start = datetime.datetime.now().strftime(time_format)

    # Rename the columns up front so the joined frame has unambiguous names
    # for the seller key (orders side) and the offer id (offers side).
    orders = spark.sql("select * from hrucinska.uam_orders") \
        .withColumnRenamed("seller_id", "seller_id_order")
    offers = spark.sql("select * from hrucinska.uam_offers") \
        .withColumnRenamed("offer_id", "offerId")

    # Left join: every offer row is kept even when no order matches its seller.
    # NOTE(review): the join key is the seller only, so each offer row is
    # paired with every order row of the same seller; the counts below are
    # therefore taken over joined rows -- confirm this is the intended metric.
    user_offers_orders = offers.join(
        orders, offers.seller_id == orders.seller_id_order, "left")

    # Per seller: number of non-null offerId / order_offer_id values.
    # (order_offer_id presumably comes from uam_orders -- TODO confirm.)
    res = user_offers_orders.groupBy(user_offers_orders.seller_id).agg(
        f.count(user_offers_orders.offerId).alias("user_offers"),
        f.count(user_offers_orders.order_offer_id).alias("user_orders"))
    res.write.mode("overwrite").saveAsTable("hrucinska.data_skew")

    stop = datetime.datetime.now().strftime(time_format)

    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    # The double space reproduces the output of the former
    # `print "label: ", value` statement byte-for-byte.
    print("czas rozpoczecia obliczen 1:  {0}".format(start))
    print("czas zakonczenia obliczen1:  {0}".format(stop))

    # Presumably keeps the application alive so it can still be inspected
    # (e.g. in the Spark UI) after the write finishes -- TODO confirm.
    time.sleep(180)
except Exception as inst:
    # Report failures at ERROR level so they are not lost among INFO
    # messages, then re-raise so the application still fails.
    logger.error("SPARKAPP ERROR {0}".format(inst))
    raise
finally:
    logger.info("SPARKAPP STOP")