1
0
cwiczenia/cache.py
Hanna Rucinska d149f7d56e cwiczenia
2018-05-27 12:25:47 +02:00

26 lines
1.1 KiB
Python

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import time
# Obtain the Spark session for this script (an existing one is reused if
# present), with Hive support enabled so the SQL queries below can read
# the catalog tables.
spark = (
    SparkSession.builder
    .appName("union_all")
    .enableHiveSupport()
    .getOrCreate()
)
# Variant 1: join the three source tables directly and persist the result.
#
# `offers` is marked for caching because it is joined a second time further
# down in the script; it must keep this name.
offers = spark.sql("select * from hrucinska.uam_offers").cache()
page_views = spark.sql("select * from hrucinska.uam_offers_pv")
stock_levels = spark.sql("select * from hrucinska.uam_offers_stock")

# offers -> page views, matched on the offer id.
offers_with_pv = offers.join(
    page_views,
    offers["offer_id"] == page_views["pv_offer_id"],
)

# ... -> stock, again matched on the offer id.
joined = offers_with_pv.join(
    stock_levels,
    offers_with_pv["offer_id"] == stock_levels["stock_offer_id"],
)

# Replace any previous contents of the target table.
joined.write.saveAsTable("hrucinska.join_all", mode="overwrite")
# Variant 2: bring page views and stock to one common column layout, stack
# them, collapse to one row per offer, and only then join with `offers`.
#
# Both selects emit (offerId, pv_ct, initial_quantity, current_quantity) in
# the same order, filling the columns a table does not have with 0, so the
# positional union below lines the columns up correctly.
pv_rows = spark.sql(
    "select pv_offer_id as offerId, pv_ct, 0 as initial_quantity, 0 as current_quantity "
    "from hrucinska.uam_offers_pv"
)
stock_rows = spark.sql(
    "select stock_offer_id as offerId, 0 as pv_ct, initial_quantity, current_quantity "
    "from hrucinska.uam_offers_stock"
)
stacked = pv_rows.union(stock_rows)

# One row per offer: page-view counts are summed, quantities take the max
# (the 0 placeholders from the other table contribute nothing to the sum).
per_offer = stacked.groupBy(stacked["offerId"]).agg(
    f.sum(stacked["pv_ct"]).alias("pv_ct"),
    f.max(stacked["initial_quantity"]).alias("iq"),
    f.max(stacked["current_quantity"]).alias("cq"),
)

enriched = offers.join(per_offer, offers["offer_id"] == per_offer["offerId"])
enriched.write.saveAsTable("hrucinska.join_all2", mode="overwrite")

# NOTE(review): presumably keeps the application alive for three minutes so
# the finished jobs can still be inspected (e.g. in the Spark UI) — confirm.
time.sleep(180)