commit d149f7d56e78b757aabeb5e7b415934976839b27
Author: Hanna Rucinska
Date:   Sun May 27 12:25:47 2018 +0200

    cwiczenia

diff --git a/broadcast.py b/broadcast.py
new file mode 100644
index 0000000..96299b3
--- /dev/null
+++ b/broadcast.py
@@ -0,0 +1,45 @@
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import broadcast
+import logging
+import datetime
+import time
+
+spark = SparkSession \
+    .builder \
+    .appName("broadcast") \
+    .enableHiveSupport() \
+    .getOrCreate()
+
+log4jLogger = spark._jvm.org.apache.log4j
+logger = log4jLogger.LogManager.getLogger(__name__)
+
+try:
+    logger.info("SPARKAPP START")
+
+    start1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    cat = spark.sql("select * from hrucinska.uam_categories")
+    offers = spark.sql("select * from hrucinska.uam_offers")
+    res = offers.join(cat, cat.category_id == offers.category_leaf)
+    print res.where(res.category_level2 == "RTV i AGD").count()
+
+    stop1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+    start2 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    res = offers.join(broadcast(cat), cat.category_id == offers.category_leaf)
+    print res.where(res.category_level2 == "RTV i AGD").count()
+
+    stop2 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    print "czas rozpoczecia obliczen 1: ", start1
+    print "czas zakonczenia obliczen1: ", stop1
+
+    print "czas rozpoczecia obliczen 2: ", start2
+    print "czas zakonczenia obliczen 2: ", stop2
+
+    time.sleep(180)
+
+except Exception as inst:
+    logger.info("SPARKAPP ERROR {0}".format(inst))
+finally:
+    logger.info("SPARKAPP STOP")
diff --git a/cache.py b/cache.py
new file mode 100644
index 0000000..39f2cc7
--- /dev/null
+++ b/cache.py
@@ -0,0 +1,25 @@
+from pyspark.sql import SparkSession
+import pyspark.sql.functions as f
+import time
+
+spark = SparkSession \
+    .builder \
+    .appName("union_all") \
+    .enableHiveSupport() \
+    .getOrCreate()
+
+pv = spark.sql("select * from hrucinska.uam_offers_pv")
+offers = spark.sql("select * from hrucinska.uam_offers").cache()
+stock = spark.sql("select * from hrucinska.uam_offers_stock")
+o_pv = offers.join(pv, offers.offer_id == pv.pv_offer_id)
+res = o_pv.join(stock, o_pv.offer_id == stock.stock_offer_id)
+res.write.mode("overwrite").saveAsTable("hrucinska.join_all")
+
+pv = spark.sql("select pv_offer_id as offerId, pv_ct, 0 as initial_quantity, 0 as current_quantity from hrucinska.uam_offers_pv")
+stock = spark.sql("select stock_offer_id as offerId, 0 as pv_ct, initial_quantity, current_quantity from hrucinska.uam_offers_stock")
+p_s = pv.union(stock)
+ps = p_s.groupBy(p_s.offerId).agg(f.sum(p_s.pv_ct).alias("pv_ct"), f.max(p_s.initial_quantity).alias("iq"), f.max(p_s.current_quantity).alias("cq"))
+res = offers.join(ps, offers.offer_id == ps.offerId)
+res.write.mode("overwrite").saveAsTable("hrucinska.join_all2")
+
+time.sleep(180)
diff --git a/configuration.py b/configuration.py
new file mode 100644
index 0000000..e9ac3d0
--- /dev/null
+++ b/configuration.py
@@ -0,0 +1,35 @@
+from pyspark.sql import SparkSession
+import logging
+import datetime
+import time
+
+spark = SparkSession \
+    .builder \
+    .appName("configuration_test") \
+    .enableHiveSupport() \
+    .getOrCreate()
+
+log4jLogger = spark._jvm.org.apache.log4j
+logger = log4jLogger.LogManager.getLogger(__name__)
+
+try:
+    logger.info("SPARKAPP START")
+
+    start = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    cat = spark.sql("select * from hrucinska.uam_categories")
+    offers = spark.sql("select * from hrucinska.uam_offers")
+    res = offers.join(cat, cat.category_id == offers.category_leaf)
+    print res.where(res.category_level2 == "RTV i AGD").count()
+
+    stop = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    print "czas rozpoczecia obliczen 1: ", start
+    print "czas zakonczenia obliczen1: ", stop
+
+    time.sleep(180)
+
+except Exception as inst:
+    logger.info("SPARKAPP ERROR {0}".format(inst))
+finally:
+    logger.info("SPARKAPP STOP")
diff --git a/data_skew.py b/data_skew.py
new file mode 100644
index 0000000..2b44fd1
--- /dev/null
+++ b/data_skew.py
@@ -0,0 +1,37 @@
+from pyspark.sql import SparkSession
+import logging
+import datetime
+import time
+import pyspark.sql.functions as f
+
+spark = SparkSession \
+    .builder \
+    .appName("too_few_partitions") \
+    .enableHiveSupport() \
+    .getOrCreate()
+
+log4jLogger = spark._jvm.org.apache.log4j
+logger = log4jLogger.LogManager.getLogger(__name__)
+
+try:
+    logger.info("SPARKAPP START")
+    start = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    orders = spark.sql("select * from hrucinska.uam_orders").withColumnRenamed("seller_id", "seller_id_order")
+    offers = spark.sql("select * from hrucinska.uam_offers").withColumnRenamed("offer_id", "offerId")
+    user_offers_orders = offers.join(orders, offers.seller_id == orders.seller_id_order, "left")
+    res = user_offers_orders.groupBy(user_offers_orders.seller_id).agg(f.count(user_offers_orders.offerId).alias("user_offers"), f.count(user_offers_orders.order_offer_id).alias("user_orders"))
+    res.write.mode("overwrite").saveAsTable("hrucinska.data_skew")
+
+    stop = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    print "czas rozpoczecia obliczen 1: ", start
+    print "czas zakonczenia obliczen1: ", stop
+
+    time.sleep(180)
+
+except Exception as inst:
+    logger.info("SPARKAPP ERROR {0}".format(inst))
+    raise
+finally:
+    logger.info("SPARKAPP STOP")
diff --git a/data_skew2.py b/data_skew2.py
new file mode 100644
index 0000000..3c8dee0
--- /dev/null
+++ b/data_skew2.py
@@ -0,0 +1,40 @@
+from pyspark.sql import SparkSession
+import logging
+import datetime
+import time
+import pyspark.sql.functions as f
+
+spark = SparkSession \
+    .builder \
+    .appName("too_few_partitions") \
+    .enableHiveSupport() \
+    .getOrCreate()
+
+log4jLogger = spark._jvm.org.apache.log4j
+logger = log4jLogger.LogManager.getLogger(__name__)
+
+try:
+    logger.info("SPARKAPP START")
+
+    start = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    orders_skew = spark.sql("select * from hrucinska.uam_orders")
+    orders_partial_reduce = orders_skew.groupBy("seller_id", "buyer_id").agg(f.count("order_offer_id").alias("order_offer_id"))
+    orders = orders_partial_reduce.groupBy("seller_id").agg(f.count("order_offer_id").alias("order_offer_id")).withColumnRenamed("seller_id", "seller_id_order")
+    offers = spark.sql("select * from hrucinska.uam_offers")
+    user_offers_orders = offers.join(orders, offers.seller_id == orders.seller_id_order, "left")
+    res = user_offers_orders.groupBy(user_offers_orders.seller_id).agg(f.count(user_offers_orders.offer_id).alias("user_offers"), f.count(user_offers_orders.order_offer_id).alias("user_orders"))
+    res.write.mode("overwrite").saveAsTable("hrucinska.data_skew2")
+
+    stop = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    print "czas rozpoczecia obliczen2: ", start
+    print "czas zakonczenia obliczen2: ", stop
+
+    time.sleep(180)
+
+except Exception as inst:
+    logger.info("SPARKAPP ERROR {0}".format(inst))
+    raise
+finally:
+    logger.info("SPARKAPP STOP")
diff --git a/jersey-bundle-1.19.1.jar b/jersey-bundle-1.19.1.jar
new file mode 100644
index 0000000..2547103
Binary files /dev/null and b/jersey-bundle-1.19.1.jar differ
diff --git a/mysql-submit.sh b/mysql-submit.sh
new file mode 100755
index 0000000..1940cd7
--- /dev/null
+++ b/mysql-submit.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+export PYTHONIOENCODING=utf8
+
+/home/faculty/hrucinska/spark/bin/spark-submit\
+    --master yarn\
+    --deploy-mode client\
+    --queue default\
+    --name "$1"\
+    --num-executors 10\
+    --conf "spark.driver.memory=1g"\
+    --conf "spark.driver.memoryOverhead=600m"\
+    --conf "spark.executor.memory=600m"\
+    --conf "spark.executor.memoryOverhead=300m"\
+    --conf "spark.sql.autoBroadcastJoinThreshold=-1"\
+    --conf "spark.sql.shuffle.partitions=100"\
+    --packages mysql:mysql-connector-java:5.1.38 $1
diff --git a/mysqlApp.py b/mysqlApp.py
new file mode 100644
index 0000000..02864d2
--- /dev/null
+++ b/mysqlApp.py
@@ -0,0 +1,22 @@
+from pyspark.sql import SparkSession
+import logging
+import datetime
+import time
+
+spark = SparkSession.builder.appName("mySqlApp").enableHiveSupport().getOrCreate()
+
+log4jLogger = spark._jvm.org.apache.log4j
+logger = log4jLogger.LogManager.getLogger(__name__)
+
+try:
+    logger.info("SPARKAPP START")
+
+    cat = spark.sql("select * from hrucinska.uam_categories")
+    cat.write.mode("overwrite").format("jdbc").options(url="jdbc:mysql://mysql.wmi.amu.edu.pl/hrucinska_spark_test", driver="com.mysql.jdbc.Driver", user="hrucinska", password="hrucinska", dbtable="uam_categories_mysql").saveAsTable("uam_categories_mysql")
+
+    time.sleep(180)
+
+except Exception as inst:
+    logger.info("SPARKAPP ERROR {0}".format(inst))
+finally:
+    logger.info("SPARKAPP STOP")
diff --git a/pyspark.sh b/pyspark.sh
new file mode 100644
index 0000000..b14b034
--- /dev/null
+++ b/pyspark.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+. /etc/profile
+
+export JAVA_HOME="/usr/lib/jvm/java-8-oracle"
+
+/home/hc_client_bddm_prod/spark-2.3.0-bin-2.6.0-cdh5.6.0/bin/pyspark\
+    --master yarn\
+    --deploy-mode client\
+    --queue bddm_prod_users\
+    --name "OffersDailyCleanData"\
+    --num-executors 100\
+    --conf "spark.driver.memory=1g"\
+    --conf "spark.yarn.driver.memoryOverhead=600m"\
+    --conf "spark.executor.memory=2000m"\
+    --conf "spark.yarn.executor.memoryOverhead=1200m"\
+    --conf "spark.driver.extraClassPath=/opt/hive_extras/*:/opt/spark_extras/*"\
+    --conf "spark.yarn.appMasterEnv.JAVA_HOME=/opt/jre1.8.0"\
+    --conf "spark.executorEnv.JAVA_HOME=/opt/jre1.8.0"\
+    --packages mysql:mysql-connector-java:5.1.38
diff --git a/pythonapp.py b/pythonapp.py
new file mode 100644
index 0000000..259b55f
--- /dev/null
+++ b/pythonapp.py
@@ -0,0 +1,45 @@
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import broadcast
+import logging
+import datetime
+import time
+
+spark = SparkSession \
+    .builder \
+    .appName("Python Spark SQL basic example") \
+    .enableHiveSupport() \
+    .getOrCreate()
+
+log4jLogger = spark._jvm.org.apache.log4j
+logger = log4jLogger.LogManager.getLogger(__name__)
+
+try:
+    logger.info("SPARKAPP START")
+
+    start1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    cat = spark.sql("select * from hrucinska.uam_categories")
+    offers = spark.sql("select * from hrucinska.uam_offers")
+    res = offers.join(cat, cat.category_id == offers.category_leaf)
+    print res.where(res.category_level2 == "RTV i AGD").count()
+
+    stop1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+    start2 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    res = offers.join(broadcast(cat), cat.category_id == offers.category_leaf)
+    print res.where(res.category_level2 == "RTV i AGD").count()
+
+    stop2 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    print "czas rozpoczecia obliczen 1: ", start1
+    print "czas zakonczenia obliczen1: ", stop1
+
+    print "czas rozpoczecia obliczen 2: ", start2
+    print "czas zakonczenia obliczen 2: ", stop2
+
+    time.sleep(180)
+
+except Exception as inst:
+    logger.info("SPARKAPP ERROR {0}".format(inst))
+finally:
+    logger.info("SPARKAPP STOP")
diff --git a/spark-submit.sh b/spark-submit.sh
new file mode 100644
index 0000000..2af30ab
--- /dev/null
+++ b/spark-submit.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+$HOME/spark/bin/spark-submit\
+    --master local[*]\
+    --name "$1"\
+    --conf "spark.sql.autoBroadcastJoinThreshold=-1"\
+    --conf "spark.sql.shuffle.partitions=50" $1
diff --git a/spark_shell.sh b/spark_shell.sh
new file mode 100644
index 0000000..02ddb6b
--- /dev/null
+++ b/spark_shell.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+
+. /etc/profile
+
+export JAVA_HOME="/usr/lib/jvm/java-8-oracle"
+
+/home/hc_client_bddm_prod/spark-2.2.0-bin-2.6.0-cdh5.6.0/bin/spark-shell\
+    --master yarn\
+    --deploy-mode client\
+    --queue bddm_prod_users\
+    --name "OffersDailyCleanData"\
+    --num-executors 100\
+    --conf "spark.driver.memory=1g"\
+    --conf "spark.yarn.driver.memoryOverhead=600m"\
+    --conf "spark.executor.memory=2000m"\
+    --conf "spark.yarn.executor.memoryOverhead=1200m"\
+    --conf "spark.driver.extraClassPath=/opt/hive_extras/*:/opt/spark_extras/*"\
+    --conf "spark.yarn.appMasterEnv.JAVA_HOME=/opt/jre1.8.0"\
+    --conf "spark.executorEnv.JAVA_HOME=/opt/jre1.8.0"
diff --git a/sparksubmit.sh b/sparksubmit.sh
new file mode 100644
index 0000000..f2853bd
--- /dev/null
+++ b/sparksubmit.sh
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+
+export HADOOP_CONF_DIR=$HOME/hadoop/conf
+export HADOOP_HOME=$HOME/hadoop
+export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native
+export SPARK_JAVA_OPTS="-Dhdp.version=2.6.3.0-235"
+
+$HOME/spark/bin/spark-submit\
+    --master yarn\
+    --deploy-mode cluster\
+    --queue default\
+    --num-executors 10\
+    --conf "spark.driver.memory=1g"\
+    --conf "spark.driver.memoryOverhead=600m"\
+    --conf "spark.executor.memory=600m"\
+    --conf "spark.executor.memoryOverhead=300m"\
+    --name "$1"\
+    --conf "spark.sql.autoBroadcastJoinThreshold=-1"\
+    --conf "spark.driver.extraJavaOptions=-Dhdp.version=2.6.3.0-235"\
+    --conf "spark.yarn.am.extraJavaOptions=-Dhdp.version=2.6.3.0-235"\
+    --conf "spark.sql.shuffle.partitions=100" $1
diff --git a/sparksubmitcache.sh b/sparksubmitcache.sh
new file mode 100644
index 0000000..49ee6a0
--- /dev/null
+++ b/sparksubmitcache.sh
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+
+export HADOOP_CONF_DIR=$HOME/hadoop/conf
+export HADOOP_HOME=$HOME/hadoop
+export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native
+export SPARK_JAVA_OPTS="-Dhdp.version=2.6.3.0-235"
+
+$HOME/spark/bin/spark-submit\
+    --master yarn\
+    --deploy-mode cluster\
+    --queue default\
+    --num-executors 10\
+    --conf "spark.driver.memory=1g"\
+    --conf "spark.driver.memoryOverhead=600m"\
+    --conf "spark.executor.memory=2000m"\
+    --conf "spark.executor.memoryOverhead=300m"\
+    --name "$1"\
+    --conf "spark.sql.autoBroadcastJoinThreshold=-1"\
+    --conf "spark.driver.extraJavaOptions=-Dhdp.version=2.6.3.0-235"\
+    --conf "spark.yarn.am.extraJavaOptions=-Dhdp.version=2.6.3.0-235"\
+    --conf "spark.sql.shuffle.partitions=100" $1
diff --git a/too_few_partitions-submit_bad.sh b/too_few_partitions-submit_bad.sh
new file mode 100644
index 0000000..a3fa973
--- /dev/null
+++ b/too_few_partitions-submit_bad.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+
+$HOME/spark/bin/spark-submit\
+    --master local[10]\
+    --name "$1"\
+    --conf "spark.sql.autoBroadcastJoinThreshold=-1"\
+    --conf "spark.sql.shuffle.partitions=2" $1
diff --git a/too_few_partitions.py b/too_few_partitions.py
new file mode 100644
index 0000000..b0f9fe5
--- /dev/null
+++ b/too_few_partitions.py
@@ -0,0 +1,36 @@
+from pyspark.sql import SparkSession
+import logging
+import datetime
+import time
+import pyspark.sql.functions as f
+
+spark = SparkSession \
+    .builder \
+    .appName("too_few_partitions") \
+    .enableHiveSupport() \
+    .getOrCreate()
+
+log4jLogger = spark._jvm.org.apache.log4j
+logger = log4jLogger.LogManager.getLogger(__name__)
+
+try:
+    logger.info("SPARKAPP START")
+    start = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    orders = spark.sql("select * from hrucinska.uam_orders union all select * from hrucinska.uam_orders union all select * from hrucinska.uam_orders").withColumnRenamed("seller_id", "seller_id_order")
+    offers = spark.sql("select * from hrucinska.uam_offers union all select * from hrucinska.uam_offers union all select * from hrucinska.uam_offers union all select * from hrucinska.uam_offers").withColumnRenamed("offer_id", "offerId")
+    user_offers_orders = offers.join(orders, offers.seller_id == orders.seller_id_order, "left")
+    res = user_offers_orders.groupBy(user_offers_orders.seller_id).agg(f.count(user_offers_orders.offerId).alias("user_offers"), f.count(user_offers_orders.order_offer_id).alias("user_orders"))
+    res.write.mode("overwrite").saveAsTable("hrucinska.uam_too_few_partitions")
+
+    stop = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
+
+    print "czas rozpoczecia obliczen 1: ", start
+    print "czas zakonczenia obliczen1: ", stop
+    time.sleep(180)
+
+except Exception as inst:
+    logger.info("SPARKAPP ERROR {0}".format(inst))
+    raise
+finally:
+    logger.info("SPARKAPP STOP")
diff --git a/union.py b/union.py
new file mode 100644
index 0000000..70a806e
--- /dev/null
+++ b/union.py
@@ -0,0 +1,25 @@
+from pyspark.sql import SparkSession
+import pyspark.sql.functions as f
+import time
+
+spark = SparkSession \
+    .builder \
+    .appName("union_all") \
+    .enableHiveSupport() \
+    .getOrCreate()
+
+pv = spark.sql("select * from hrucinska.uam_offers_pv")
+offers = spark.sql("select * from hrucinska.uam_offers")
+stock = spark.sql("select * from hrucinska.uam_offers_stock")
+o_pv = offers.join(pv, offers.offer_id == pv.pv_offer_id)
+res = o_pv.join(stock, o_pv.offer_id == stock.stock_offer_id)
+res.write.mode("overwrite").saveAsTable("hrucinska.join_all")
+
+pv = spark.sql("select pv_offer_id as offerId, pv_ct, 0 as initial_quantity, 0 as current_quantity from hrucinska.uam_offers_pv")
+stock = spark.sql("select stock_offer_id as offerId, 0 as pv_ct, initial_quantity, current_quantity from hrucinska.uam_offers_stock")
+p_s = pv.union(stock)
+ps = p_s.groupBy(p_s.offerId).agg(f.sum(p_s.pv_ct).alias("pv_ct"), f.max(p_s.initial_quantity).alias("iq"), f.max(p_s.current_quantity).alias("cq"))
+res = offers.join(ps, offers.offer_id == ps.offerId)
+res.write.mode("overwrite").saveAsTable("hrucinska.join_all2")
+
+time.sleep(180)