forked from hrucinska/cwiczenia
26 lines
1.1 KiB
Python
26 lines
1.1 KiB
Python
from pyspark.sql import SparkSession
|
|
import pyspark.sql.functions as f
|
|
import time
|
|
|
|
spark = SparkSession \
|
|
.builder \
|
|
.appName("union_all") \
|
|
.enableHiveSupport() \
|
|
.getOrCreate()
|
|
|
|
pv = spark.sql("select * from hrucinska.uam_offers_pv")
|
|
offers = spark.sql("select * from hrucinska.uam_offers").cache()
|
|
stock = spark.sql("select * from hrucinska.uam_offers_stock")
|
|
o_pv = offers.join(pv, offers.offer_id == pv.pv_offer_id)
|
|
res = o_pv.join(stock, o_pv.offer_id == stock.stock_offer_id)
|
|
res.write.mode("overwrite").saveAsTable("hrucinska.join_all")
|
|
|
|
pv = spark.sql("select pv_offer_id as offerId, pv_ct, 0 as initial_quantity, 0 as current_quantity from hrucinska.uam_offers_pv")
|
|
stock = spark.sql("select stock_offer_id as offerId, 0 as pv_ct, initial_quantity, current_quantity from hrucinska.uam_offers_stock")
|
|
p_s = pv.union(stock)
|
|
ps = p_s.groupBy(p_s.offerId).agg(f.sum(p_s.pv_ct).alias("pv_ct"), f.max(p_s.initial_quantity).alias("iq"), f.max(p_s.current_quantity).alias("cq"))
|
|
res = offers.join(ps, offers.offer_id == ps.offerId)
|
|
res.write.mode("overwrite").saveAsTable("hrucinska.join_all2")
|
|
|
|
time.sleep(180)
|