
cwiczenia (exercises)

Hanna Rucinska 2018-05-27 12:25:47 +02:00
commit d149f7d56e
17 changed files with 423 additions and 0 deletions

45
broadcast.py Normal file

@@ -0,0 +1,45 @@
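# Runs the offers/categories join twice (first as a regular shuffle join, then with an explicit broadcast() hint on the categories table) and prints wall-clock times for both.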
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
import logging
import datetime
import time
spark = SparkSession \
.builder \
.appName("broadcast") \
.enableHiveSupport() \
.getOrCreate()
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
try:
logger.info("SPARKAPP START")
start1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
cat = spark.sql("select * from hrucinska.uam_categories")
offers = spark.sql("select * from hrucinska.uam_offers")
res = offers.join(cat, cat.category_id == offers.category_leaf)
print res.where(res.category_level2 == "RTV i AGD").count()
stop1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
start2 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
res = offers.join(broadcast(cat), cat.category_id == offers.category_leaf)
print res.where(res.category_level2 == "RTV i AGD").count()
stop2 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
print "czas rozpoczecia obliczen 1: ", start1
print "czas zakonczenia obliczen1: ", stop1
print "czas rozpoczecia obliczen 2: ", start2
print "czas zakonczenia obliczen 2: ", stop2
time.sleep(180)
except Exception as inst:
logger.info("SPARKAPP ERROR {0}".format(inst))
finally:
logger.info("SPARKAPP STOP")

25
cache.py Normal file

@@ -0,0 +1,25 @@
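# Same workload as union.py, but the offers DataFrame is cache()d because it is reused by both the three-way join and the union-based aggregation.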
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import time
spark = SparkSession \
.builder \
.appName("union_all") \
.enableHiveSupport() \
.getOrCreate()
pv = spark.sql("select * from hrucinska.uam_offers_pv")
offers = spark.sql("select * from hrucinska.uam_offers").cache()
stock = spark.sql("select * from hrucinska.uam_offers_stock")
o_pv = offers.join(pv, offers.offer_id == pv.pv_offer_id)
res = o_pv.join(stock, o_pv.offer_id == stock.stock_offer_id)
res.write.mode("overwrite").saveAsTable("hrucinska.join_all")
pv = spark.sql("select pv_offer_id as offerId, pv_ct, 0 as initial_quantity, 0 as current_quantity from hrucinska.uam_offers_pv")
stock = spark.sql("select stock_offer_id as offerId, 0 as pv_ct, initial_quantity, current_quantity from hrucinska.uam_offers_stock")
p_s = pv.union(stock)
ps = p_s.groupBy(p_s.offerId).agg(f.sum(p_s.pv_ct).alias("pv_ct"), f.max(p_s.initial_quantity).alias("iq"), f.max(p_s.current_quantity).alias("cq"))
res = offers.join(ps, offers.offer_id == ps.offerId)
res.write.mode("overwrite").saveAsTable("hrucinska.join_all2")
time.sleep(180)

35
configuration.py Normal file

@@ -0,0 +1,35 @@
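# A single offers/categories join with timing output; per the appName ("configuration_test"), a fixed workload for comparing different submit configurations.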
from pyspark.sql import SparkSession
import logging
import datetime
import time
spark = SparkSession \
.builder \
.appName("configuration_test") \
.enableHiveSupport() \
.getOrCreate()
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
try:
logger.info("SPARKAPP START")
start = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
cat = spark.sql("select * from hrucinska.uam_categories")
offers = spark.sql("select * from hrucinska.uam_offers")
res = offers.join(cat, cat.category_id == offers.category_leaf)
print res.where(res.category_level2 == "RTV i AGD").count()
stop = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
print "czas rozpoczecia obliczen 1: ", start
print "czas zakonczenia obliczen1: ", stop
time.sleep(180)
except Exception as inst:
logger.info("SPARKAPP ERROR {0}".format(inst))
finally:
logger.info("SPARKAPP STOP")

37
data_skew.py Normal file

@@ -0,0 +1,37 @@
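# Left-joins offers with orders on seller_id and counts offers and orders per seller; seller_id is the skewed join key this exercise targets.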
from pyspark.sql import SparkSession
import logging
import datetime
import time
import pyspark.sql.functions as f
spark = SparkSession \
.builder \
.appName("too_few_partitions") \
.enableHiveSupport() \
.getOrCreate()
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
try:
logger.info("SPARKAPP START")
start = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
orders = spark.sql("select * from hrucinska.uam_orders").withColumnRenamed("seller_id", "seller_id_order")
offers = spark.sql("select * from hrucinska.uam_offers").withColumnRenamed("offer_id", "offerId")
user_offers_orders = offers.join(orders, offers.seller_id == orders.seller_id_order, "left")
res = user_offers_orders.groupBy(user_offers_orders.seller_id).agg(f.count(user_offers_orders.offerId).alias("user_offers"), f.count(user_offers_orders.order_offer_id).alias("user_orders"))
res.write.mode("overwrite").saveAsTable("hrucinska.data_skew")
stop = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
print "czas rozpoczecia obliczen 1: ", start
print "czas zakonczenia obliczen1: ", stop
time.sleep(180)
except Exception as inst:
logger.info("SPARKAPP ERROR {0}".format(inst))
raise
finally:
logger.info("SPARKAPP STOP")

40
data_skew2.py Normal file

@@ -0,0 +1,40 @@
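# Skew-mitigation variant of data_skew.py: orders are partially aggregated per (seller_id, buyer_id) first, so far fewer rows carry each hot seller_id into the final join and aggregation.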
from pyspark.sql import SparkSession
import logging
import datetime
import time
import pyspark.sql.functions as f
spark = SparkSession \
.builder \
.appName("too_few_partitions") \
.enableHiveSupport() \
.getOrCreate()
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
try:
logger.info("SPARKAPP START")
start = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
orders_skew = spark.sql("select * from hrucinska.uam_orders")
orders_partial_reduce = orders_skew.groupBy("seller_id", "buyer_id").agg(f.count("order_offer_id").alias("order_offer_id"))
orders = orders_partial_reduce.groupBy("seller_id").agg(f.count("order_offer_id").alias("order_offer_id")).withColumnRenamed("seller_id", "seller_id_order")
offers = spark.sql("select * from hrucinska.uam_offers")
user_offers_orders = offers.join(orders, offers.seller_id == orders.seller_id_order, "left")
res = user_offers_orders.groupBy(user_offers_orders.seller_id).agg(f.count(user_offers_orders.offer_id).alias("user_offers"), f.count(user_offers_orders.order_offer_id).alias("user_orders"))
res.write.mode("overwrite").saveAsTable("hrucinska.data_skew2")
stop = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
print "czas rozpoczecia obliczen2: ", start
print "czas zakonczenia obliczen2: ", stop
time.sleep(180)
except Exception as inst:
logger.info("SPARKAPP ERROR {0}".format(inst))
raise
finally:
logger.info("SPARKAPP STOP")

BIN
jersey-bundle-1.19.1.jar Normal file

Binary file not shown.

17
mysql-submit.sh Executable file

@@ -0,0 +1,17 @@
#!/bin/bash
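# Usage: ./mysql-submit.sh <app.py>; submits the script to YARN in client mode with the MySQL JDBC connector pulled in via --packages.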
export PYTHONIOENCODING=utf8
/home/faculty/hrucinska/spark/bin/spark-submit\
--master yarn\
--deploy-mode client\
--queue default\
--name "$1"\
--num-executors 10\
--conf "spark.driver.memory=1g"\
--conf "spark.driver.memoryOverhead=600m"\
--conf "spark.executor.memory=600m"\
--conf "spark.executor.memoryOverhead=300m"\
--conf "spark.sql.autoBroadcastJoinThreshold=-1"\
--conf "spark.sql.shuffle.partitions=100"\
--packages mysql:mysql-connector-java:5.1.38 $1

22
mysqlApp.py Normal file

@@ -0,0 +1,22 @@
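# Copies the hrucinska.uam_categories Hive table into a MySQL database over JDBC; needs the MySQL connector on the classpath (see mysql-submit.sh).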
from pyspark.sql import SparkSession
import logging
import datetime
import time
spark = SparkSession.builder.appName("mySqlApp").enableHiveSupport().getOrCreate()
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
try:
logger.info("SPARKAPP START")
cat = spark.sql("select * from hrucinska.uam_categories")
cat.write.mode("overwrite").format("jdbc").options(url ="jdbc:mysql://mysql.wmi.amu.edu.pl/hrucinska_spark_test",driver="com.mysql.jdbc.Driver",user="hrucinska",password="hrucinska", dbtable="uam_categories_mysql").saveAsTable("uam_categories_mysql")
time.sleep(180)
except Exception as inst:
logger.info("SPARKAPP ERROR {0}".format(inst))
finally:
logger.info("SPARKAPP STOP")

20
pyspark.sh Normal file

@@ -0,0 +1,20 @@
#!/bin/bash
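# Starts an interactive PySpark shell (Spark 2.3.0 build) on YARN with the MySQL JDBC connector available via --packages.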
. /etc/profile
export JAVA_HOME="/usr/lib/jvm/java-8-oracle"
/home/hc_client_bddm_prod/spark-2.3.0-bin-2.6.0-cdh5.6.0/bin/pyspark\
--master yarn\
--deploy-mode client\
--queue bddm_prod_users\
--name "OffersDailyCleanData"\
--num-executors 100\
--conf "spark.driver.memory=1g"\
--conf "spark.yarn.driver.memoryOverhead=600m"\
--conf "spark.executor.memory=2000m"\
--conf "spark.yarn.executor.memoryOverhead=1200m"\
--conf "spark.driver.extraClassPath=/opt/hive_extras/*:/opt/spark_extras/*"\
--conf "spark.yarn.appMasterEnv.JAVA_HOME=/opt/jre1.8.0"\
--conf "spark.executorEnv.JAVA_HOME=/opt/jre1.8.0"\
--packages mysql:mysql-connector-java:5.1.38

45
pythonapp.py Normal file

@@ -0,0 +1,45 @@
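# Same experiment as broadcast.py: a regular shuffle join vs. an explicit broadcast join of offers with categories, with wall-clock timings printed for both.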
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
import logging
import datetime
import time
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.enableHiveSupport() \
.getOrCreate()
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
try:
logger.info("SPARKAPP START")
start1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
cat = spark.sql("select * from hrucinska.uam_categories")
offers = spark.sql("select * from hrucinska.uam_offers")
res = offers.join(cat, cat.category_id == offers.category_leaf)
print res.where(res.category_level2 == "RTV i AGD").count()
stop1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
start2 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
res = offers.join(broadcast(cat), cat.category_id == offers.category_leaf)
print res.where(res.category_level2 == "RTV i AGD").count()
stop2 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
print "czas rozpoczecia obliczen 1: ", start1
print "czas zakonczenia obliczen1: ", stop1
print "czas rozpoczecia obliczen 2: ", start2
print "czas zakonczenia obliczen 2: ", stop2
time.sleep(180)
except Exception as inst:
logger.info("SPARKAPP ERROR {0}".format(inst))
finally:
logger.info("SPARKAPP STOP")

7
spark-submit.sh Normal file

@@ -0,0 +1,7 @@
#!/bin/bash
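# Usage: ./spark-submit.sh <app.py>; runs the script locally on all cores with automatic broadcast joins disabled and 50 shuffle partitions.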
$HOME/spark/bin/spark-submit\
--master local[*]\
--name "$1"\
--conf "spark.sql.autoBroadcastJoinThreshold=-1"\
--conf "spark.sql.shuffle.partitions=50" $1

19
spark_shell.sh Normal file

@@ -0,0 +1,19 @@
#!/bin/bash
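# Starts an interactive spark-shell (Spark 2.2.0 build) on YARN with the same queue and memory settings as pyspark.sh.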
. /etc/profile
export JAVA_HOME="/usr/lib/jvm/java-8-oracle"
/home/hc_client_bddm_prod/spark-2.2.0-bin-2.6.0-cdh5.6.0/bin/spark-shell\
--master yarn\
--deploy-mode client\
--queue bddm_prod_users\
--name "OffersDailyCleanData"\
--num-executors 100\
--conf "spark.driver.memory=1g"\
--conf "spark.yarn.driver.memoryOverhead=600m"\
--conf "spark.executor.memory=2000m"\
--conf "spark.yarn.executor.memoryOverhead=1200m"\
--conf "spark.driver.extraClassPath=/opt/hive_extras/*:/opt/spark_extras/*"\
--conf "spark.yarn.appMasterEnv.JAVA_HOME=/opt/jre1.8.0"\
--conf "spark.executorEnv.JAVA_HOME=/opt/jre1.8.0"

21
sparksubmit.sh Normal file

@@ -0,0 +1,21 @@
#!/usr/bin/env bash
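# Usage: ./sparksubmit.sh <app.py>; YARN cluster-mode submit with 10 small (600m) executors, automatic broadcast joins disabled, and 100 shuffle partitions.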
export HADOOP_CONF_DIR=$HOME/hadoop/conf
export HADOOP_HOME=$HOME/hadoop
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native
export SPARK_JAVA_OPTS="-Dhdp.version=2.6.3.0-235"
$HOME/spark/bin/spark-submit\
--master yarn\
--deploy-mode cluster\
--queue default\
--num-executors 10\
--conf "spark.driver.memory=1g"\
--conf "spark.driver.memoryOverhead=600m"\
--conf "spark.executor.memory=600m"\
--conf "spark.executor.memoryOverhead=300m"\
--name "$1"\
--conf "spark.sql.autoBroadcastJoinThreshold=-1"\
--conf "spark.driver.extraJavaOptions=-Dhdp.version=2.6.3.0-235"\
--conf "spark.yarn.am.extraJavaOptions=-Dhdp.version=2.6.3.0-235"\
--conf "spark.sql.shuffle.partitions=100" $1

21
sparksubmitcache.sh Normal file

@@ -0,0 +1,21 @@
#!/usr/bin/env bash
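# Same as sparksubmit.sh but with 2000m executor memory; the name suggests it is meant for the cache.py experiment.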
export HADOOP_CONF_DIR=$HOME/hadoop/conf
export HADOOP_HOME=$HOME/hadoop
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native
export SPARK_JAVA_OPTS="-Dhdp.version=2.6.3.0-235"
$HOME/spark/bin/spark-submit\
--master yarn\
--deploy-mode cluster\
--queue default\
--num-executors 10\
--conf "spark.driver.memory=1g"\
--conf "spark.driver.memoryOverhead=600m"\
--conf "spark.executor.memory=2000m"\
--conf "spark.executor.memoryOverhead=300m"\
--name "$1"\
--conf "spark.sql.autoBroadcastJoinThreshold=-1"\
--conf "spark.driver.extraJavaOptions=-Dhdp.version=2.6.3.0-235"\
--conf "spark.yarn.am.extraJavaOptions=-Dhdp.version=2.6.3.0-235"\
--conf "spark.sql.shuffle.partitions=100" $1


@@ -0,0 +1,8 @@
#!/bin/bash
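# Local-mode submit on 10 cores with a deliberately low spark.sql.shuffle.partitions=2; pass the application file as $1.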
$HOME/spark/bin/spark-submit\
--master local[10]\
--name "$1"\
--conf "spark.sql.autoBroadcastJoinThreshold=-1"\
--conf "spark.sql.shuffle.partitions=2" $1

36
too_few_partitions.py Normal file

@@ -0,0 +1,36 @@
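# Inflates both tables with UNION ALL (orders three times, offers four times) so the seller_id join shuffles far more data, making the cost of too few shuffle partitions visible.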
from pyspark.sql import SparkSession
import logging
import datetime
import time
import pyspark.sql.functions as f
spark = SparkSession \
.builder \
.appName("too_few_partitions") \
.enableHiveSupport() \
.getOrCreate()
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
try:
logger.info("SPARKAPP START")
start = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
orders = spark.sql("select * from hrucinska.uam_orders union all select * from hrucinska.uam_orders union all select * from hrucinska.uam_orders").withColumnRenamed("seller_id", "seller_id_order")
offers = spark.sql("select * from hrucinska.uam_offers union all select * from hrucinska.uam_offers union all select * from hrucinska.uam_offers union all select * from hrucinska.uam_offers").withColumnRenamed("offer_id", "offerId")
user_offers_orders = offers.join(orders, offers.seller_id == orders.seller_id_order, "left")
res = user_offers_orders.groupBy(user_offers_orders.seller_id).agg(f.count(user_offers_orders.offerId).alias("user_offers"), f.count(user_offers_orders.order_offer_id).alias("user_orders"))
res.write.mode("overwrite").saveAsTable("hrucinska.uam_too_few_partitions")
stop = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
print "czas rozpoczecia obliczen 1: ", start
print "czas zakonczenia obliczen1: ", stop
time.sleep(180)
except Exception as inst:
logger.info("SPARKAPP ERROR {0}".format(inst))
raise
finally:
logger.info("SPARKAPP STOP")

25
union.py Normal file

@@ -0,0 +1,25 @@
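# Combines offers with page views and stock in two ways: a chain of two joins written to join_all, and a UNION of pre-shaped pv/stock rows aggregated per offer before a single join, written to join_all2.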
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import time
spark = SparkSession \
.builder \
.appName("union_all") \
.enableHiveSupport() \
.getOrCreate()
pv = spark.sql("select * from hrucinska.uam_offers_pv")
offers = spark.sql("select * from hrucinska.uam_offers")
stock = spark.sql("select * from hrucinska.uam_offers_stock")
o_pv = offers.join(pv, offers.offer_id == pv.pv_offer_id)
res = o_pv.join(stock, o_pv.offer_id == stock.stock_offer_id)
res.write.mode("overwrite").saveAsTable("hrucinska.join_all")
pv = spark.sql("select pv_offer_id as offerId, pv_ct, 0 as initial_quantity, 0 as current_quantity from hrucinska.uam_offers_pv")
stock = spark.sql("select stock_offer_id as offerId, 0 as pv_ct, initial_quantity, current_quantity from hrucinska.uam_offers_stock")
p_s = pv.union(stock)
ps = p_s.groupBy(p_s.offerId).agg(f.sum(p_s.pv_ct).alias("pv_ct"), f.max(p_s.initial_quantity).alias("iq"), f.max(p_s.current_quantity).alias("cq"))
res = offers.join(ps, offers.offer_id == ps.offerId)
res.write.mode("overwrite").saveAsTable("hrucinska.join_all2")
time.sleep(180)