"""Spark job: two-stage ("partial reduce") aggregation of orders per seller,
joined against the offers table and written to hrucinska.data_skew2.

The (seller_id, buyer_id) pre-aggregation exists to spread a skewed seller
key across more reduce tasks before the final per-seller rollup.
"""
from pyspark.sql import SparkSession
import logging
import datetime
import time
import pyspark.sql.functions as f

spark = SparkSession \
    .builder \
    .appName("too_few_partitions") \
    .enableHiveSupport() \
    .getOrCreate()

# Log through the JVM's log4j so messages land with the driver/YARN logs.
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)

try:
    logger.info("SPARKAPP START")
    # now() is the direct spelling of fromtimestamp(time.time()).
    start = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    orders_skew = spark.sql("select * from hrucinska.uam_orders")

    # Stage 1: partial reduce per (seller, buyer) pair.
    orders_partial_reduce = orders_skew \
        .groupBy("seller_id", "buyer_id") \
        .agg(f.count("order_offer_id").alias("order_offer_id"))

    # Stage 2: collapse the partial results to one row per seller.
    # NOTE(review): f.count() over the partial counts yields the number of
    # distinct buyers per seller, not the total order count; if total orders
    # were intended, this stage should use f.sum() — confirm against
    # downstream consumers of data_skew2 before changing.
    orders = orders_partial_reduce \
        .groupBy("seller_id") \
        .agg(f.count("order_offer_id").alias("order_offer_id")) \
        .withColumnRenamed("seller_id", "seller_id_order")

    offers = spark.sql("select * from hrucinska.uam_offers")
    user_offers_orders = offers.join(
        orders, offers.seller_id == orders.seller_id_order, "left")

    # count() skips NULLs, so user_orders counts only offers whose seller
    # had a matching row on the right-hand side of the left join.
    res = user_offers_orders.groupBy(user_offers_orders.seller_id).agg(
        f.count(user_offers_orders.offer_id).alias("user_offers"),
        f.count(user_offers_orders.order_offer_id).alias("user_orders"))

    res.write.mode("overwrite").saveAsTable("hrucinska.data_skew2")

    stop = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Python 3 print calls (original used Python 2 print statements, which
    # are a syntax error under Python 3); output text is unchanged.
    print("czas rozpoczecia obliczen2: ", start)
    print("czas zakonczenia obliczen2: ", stop)

    # Presumably keeps the application alive so its UI stays inspectable
    # after the write — confirm before removing.
    time.sleep(180)
except Exception as inst:
    logger.info("SPARKAPP ERROR {0}".format(inst))
    raise
finally:
    logger.info("SPARKAPP STOP")
    # Release the session/cluster resources on both success and failure;
    # the original leaked the SparkSession.
    spark.stop()