first commit
This commit is contained in:
commit
5baafeb2f4
Binary file not shown.
|
@ -0,0 +1,276 @@
|
|||
# Laboratorium – MapReduce
|
||||
|
||||
Do wykonania ćwiczeń należy skopiować repozytorium:
|
||||
```shell
|
||||
git clone https://git.wmi.amu.edu.pl/bigdata/apache_hadoop
|
||||
```
|
||||
Celem ćwiczenia jest zaprezentowanie aplikacji w oparciu o algorytm MapReduce z wykorzystaniem:
|
||||
|
||||
|
||||
|
||||
* Hadoop Streaming
|
||||
* Apache Hive
|
||||
* Apache Pig
|
||||
* Apache Spark
|
||||
|
||||
WordCount jest „odpowiednikiem Hello World” w świecie Big Data. Ćwiczenie prezentuje algorytm WordCount z wykorzystaniem różnych narzędzi.
|
||||
|
||||
Aby wykonać ćwiczenia, należy skopiować folder _books_ do systemu HDFS:
|
||||
```
|
||||
hdfs dfs -mkdir tmp
|
||||
hdfs dfs -copyFromLocal ~/apache_hadoop/mr/books tmp/books
|
||||
```
|
||||
## 1.WordCount – Hadoop Streaming
|
||||
Hadoop streaming umożliwia użytkownikom wykorzystanie mappera i reducera napisanego w dowolnym języku programowania. Jedynym wymaganiem jest obecność interpretera na każdym z węzłów.
|
||||
|
||||
|
||||
|
||||
### 1.1.Python
|
||||
|
||||
|
||||
#### 1.1.1.Mapper i Reducer
|
||||
Mapper i reducer napisane w języku Python znajdują się w folderze _~/apache_hadoop/mr/python_
|
||||
|
||||
|
||||
|
||||
#### 1.1.2.Uruchomienie algorytmu
|
||||
Aplikację można uruchomić poprzez wykonanie komendy:
|
||||
```
|
||||
bash ~/apache_hadoop/mr/python/wordcount.sh
|
||||
```
|
||||
Uruchom aplikację i wyjaśnij co jest wynikiem działania tego algorytmu?
|
||||
|
||||
|
||||
|
||||
#### 1.1.3.Dane wyjściowe
|
||||
Pliki zawierające wynik działania algorytmu znajdują się w folderze _tmp/python/output_ w systemie HDFS.
|
||||
|
||||
Analizując pliki w tym folderze, odpowiedz na pytanie ile reducerów zostało użytych podczas przebiegu aplikacji?
|
||||
|
||||
|
||||
|
||||
#### 1.1.4.Modyfikacja
|
||||
Zmodyfikuj skrypt _~/apache_hadoop/mr/python/wordcount.sh_ tak, aby użyte zostały 4 reducery.
|
||||
|
||||
|
||||
|
||||
## 2.WordCount – Hive
|
||||
Uruchom klienta Hive w konsoli Linux.
|
||||
|
||||
|
||||
|
||||
### 2.1.HiveQL
|
||||
|
||||
|
||||
#### 2.1.1.Ustawienie parametrów
|
||||
Ustaw parametr USERNAME tak, aby wskazywał nazwę użytkownika i utwórz schemat bazy:
|
||||
|
||||
```sql
|
||||
set USERNAME=<nazwa_użytkownika>
|
||||
create database if not exists ${hiveconf:USERNAME};
|
||||
use ${hiveconf:USERNAME};
|
||||
```
|
||||
|
||||
#### 2.1.2.Utworzenie tabeli doc
|
||||
Utwórz tabelę przechowującą dokumenty tekstowe w postaci wierszowej:
|
||||
|
||||
```sql
|
||||
create table ${hiveconf:USERNAME}.doc(
|
||||
text string
|
||||
) row format delimited fields terminated by '\n'
|
||||
stored as textfile;
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### 2.1.3.Ładowanie danych do tabeli
|
||||
Utwórz zewnętrzną tabelę zawierającą dane z folderu _tmp/books_
|
||||
|
||||
```sql
|
||||
create external table ${hiveconf:USERNAME}.doc_(
|
||||
text string
|
||||
) row format delimited fields terminated by '\n'
|
||||
stored as textfile location '/user/${hiveconf:USERNAME}/tmp/books/';
|
||||
```
|
||||
|
||||
Następnie należy skopiować dane z tabeli _doc__ do _doc_:
|
||||
|
||||
```sql
|
||||
insert into ${hiveconf:USERNAME}.doc select * from ${hiveconf:USERNAME}.doc_;
|
||||
```
|
||||
|
||||
#### 2.1.4.Utworzenie widoku
|
||||
Kolejnym krokiem jest utworzenie widoku, zawierającego słowa wydzielone z krotek tabeli doc:
|
||||
|
||||
```sql
|
||||
CREATE VIEW ${hiveconf:USERNAME}.words
|
||||
AS SELECT cast(word as string) as word FROM (
|
||||
SELECT explode(split(text, ' ')) AS word FROM ${hiveconf:USERNAME}.doc
|
||||
) doc;
|
||||
```
|
||||
|
||||
|
||||
#### 2.1.5.Obliczenie ilości słów:
|
||||
Należy wykonać polecenie:
|
||||
|
||||
```sql
|
||||
select word, count(*) from ${hiveconf:USERNAME}.words group by word;
|
||||
```
|
||||
|
||||
|
||||
#### 2.1.6.Pominięcie widoku
|
||||
Obliczenie ilości słów jest również możliwe z pominięciem utworzenia widoku, a z wykorzystaniem podzapytania:
|
||||
|
||||
|
||||
SELECT word, count(*) from (SELECT explode(split(text, ' ')) AS ${hiveconf:USERNAME}.word FROM doc) words group by word;
|
||||
|
||||
|
||||
|
||||
|
||||
## 3.WordCount – Pig
|
||||
Uruchom Pig w konsoli.
|
||||
|
||||
|
||||
|
||||
### 3.1.Pig Latin
|
||||
|
||||
|
||||
#### 3.1.11.Ładowanie pliku
|
||||
Zdefiniuj zmienną books, ładującą tekst książek:
|
||||
|
||||
```
|
||||
books = LOAD 'tmp/books' USING TextLoader() AS (line:chararray);
|
||||
```
|
||||
|
||||
|
||||
#### 3.1.12.Wydzielenie słów
|
||||
|
||||
Zdefiniuj zmienną words wydzielającą słowa z tekstu:
|
||||
```
|
||||
words = FOREACH books GENERATE FLATTEN(TOKENIZE(line)) as word;
|
||||
```
|
||||
|
||||
|
||||
#### 3.1.13.Grupowanie
|
||||
|
||||
Pogrupój słowa:
|
||||
```
|
||||
grouped = GROUP words BY word;
|
||||
```
|
||||
|
||||
#### 3.1.14.Obliczanie ilości słów
|
||||
|
||||
Policz słowa:
|
||||
```
|
||||
wordcount = FOREACH grouped GENERATE group, COUNT(words);
|
||||
```
|
||||
|
||||
|
||||
#### 3.1.15.Wyświetlenie rezultatów
|
||||
Wyświetl wynik działania algorytmu:
|
||||
|
||||
```
|
||||
DUMP wordcount;
|
||||
```
|
||||
|
||||
|
||||
|
||||
## 4.WordCount Spark
|
||||
|
||||
|
||||
### 4.1.Scala
|
||||
Uruchom spark-shell.
|
||||
|
||||
|
||||
|
||||
#### 4.1.1.Ładowanie pliku
|
||||
Zdefiniuj zmienną ładującą plik z HDFS:
|
||||
|
||||
```
|
||||
val text_file = sc.textFile("tmp/books/")
|
||||
```
|
||||
|
||||
#### 4.1.2.Definicja funkcji MapReduce
|
||||
Spark korzysta z zalet języka Scala i umożliwia utworzenie prostej funkcji:
|
||||
|
||||
```
|
||||
val counts = text_file.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
|
||||
```
|
||||
|
||||
|
||||
#### 4.1.3.Wyświetlenie wyników
|
||||
Aby wyświetlić rezultaty wykonaj:
|
||||
|
||||
```
|
||||
counts.collect().foreach(println)
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### 4.1.4.Zapis rezultatu do pliku
|
||||
Aby zapisać rezultat do pliku wykonaj:
|
||||
|
||||
```
|
||||
counts.saveAsTextFile("tmp/spark-outputs")
|
||||
```
|
||||
|
||||
|
||||
#### 4.1.5.Zapis alternatywny
|
||||
Przy tworzeniu bardziej skomplikowanych funkcji przydatny może okazać się zapis, w którym parametry wejściowe funkcji przyjmują określone nazwy lub chcemy czytać z lokalnego systemu plików/udziału NFS:
|
||||
|
||||
```
|
||||
sc.textFile("file:///home/<nazwa_uzytkownika>/apache_hadoop/mr/books")
|
||||
val counts2 = text_file.
|
||||
flatMap(txt => txt.split(" ")).
|
||||
map(word => (word, 1)).
|
||||
reduceByKey((a, b) => a + b)
|
||||
counts2.collect().foreach(println)
|
||||
counts2.saveAsTextFile("tmp/spark-outputs")
|
||||
```
|
||||
|
||||
|
||||
|
||||
### 5.1.PySpark
|
||||
Uruchom pyspark.
|
||||
|
||||
|
||||
|
||||
#### 5.1.1.Ładowanie pliku
|
||||
Zdefiniuj zmienną ładującą plik z HDFS:
|
||||
|
||||
```
|
||||
text_file = sc.textFile("hdfs:///tmp/books/*")
|
||||
```
|
||||
|
||||
|
||||
#### 5.1.2.Definicja funkcji MapReduce
|
||||
Spark korzysta z zalet języka Scala i umożliwia utworzenie prostej funkcji:
|
||||
|
||||
```
|
||||
counts = text_file.flatMap(lambda line: line.split(" ")) \ \
|
||||
.map(lambda word: (word, 1)) \ \
|
||||
.reduceByKey(lambda a, b: a + b)
|
||||
```
|
||||
|
||||
|
||||
#### 5.1.3.Wyświetlenie wyników
|
||||
Aby wyświetlić rezultaty wykonaj:
|
||||
|
||||
```
|
||||
counts.collect()
|
||||
```
|
||||
|
||||
|
||||
#### 5.1.4.Zapis rezultatu do pliku
|
||||
Aby zapisać rezultat do pliku wykonaj:
|
||||
|
||||
```
|
||||
counts.saveAsTextFile("tmp/pyspark-outputs")
|
||||
```
|
||||
|
||||
## 6. Modyfikacje (zaliczenie)
|
||||
|
||||
Czy potrafisz dokonać modyfikacji? Przykładowe pomysły:
|
||||
- posortuj słowa według ich długości
|
||||
- wyczyść nieznaczące znaki i sprowadź słowa do małych liter
|
||||
- policz wystąpienia słów o określonej ilości znaków
|
Binary file not shown.
|
@ -0,0 +1,8 @@
|
|||
Books are downloaded from
|
||||
wolnelektury.pl
|
||||
and are under CC BY-SA 3.0 license (https://creativecommons.org/licenses/by-sa/3.0/)
|
||||
|
||||
|
||||
python/mapper.py and python/reducer.py are copied from http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
|
||||
|
||||
All other components are licensed under MIT License
|
|
@ -0,0 +1,8 @@
|
|||
Books are downloaded from
|
||||
wolnelektury.pl
|
||||
and are under CC BY-SA 3.0 license (https://creativecommons.org/licenses/by-sa/3.0/)
|
||||
|
||||
|
||||
python/mapper.py and python/reducer.py are copied from http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
|
||||
|
||||
All other components are licensed under MIT License
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,14 @@
|
|||
#!/usr/bin/env python
|
||||
import sys
|
||||
import re
|
||||
|
||||
|
||||
for line in sys.stdin:
|
||||
|
||||
line = line.strip()
|
||||
words = re.findall(r'\b\w+\b', line)
|
||||
|
||||
for word in words:
|
||||
|
||||
word = re.sub(r'[^a-zA-Z0-9]', '', word).lower()
|
||||
print('%s\t%s' % (word, 1))
|
|
@ -0,0 +1,38 @@
|
|||
|
||||
from operator import itemgetter
|
||||
import sys
|
||||
|
||||
current_word = None
|
||||
current_count = 0
|
||||
word = None
|
||||
|
||||
# input comes from STDIN
|
||||
for line in sys.stdin:
|
||||
# remove leading and trailing whitespace
|
||||
line = line.strip()
|
||||
|
||||
# parse the input we got from mapper.py
|
||||
word, count = line.split('\t', 1)
|
||||
|
||||
# convert count (currently a string) to int
|
||||
try:
|
||||
count = int(count)
|
||||
except ValueError:
|
||||
# count was not a number, so silently
|
||||
# ignore/discard this line
|
||||
continue
|
||||
|
||||
# this IF-switch only works because Hadoop sorts map output
|
||||
# by key (here: word) before it is passed to the reducer
|
||||
if current_word == word:
|
||||
current_count += count
|
||||
else:
|
||||
if current_word:
|
||||
# write result to STDOUT
|
||||
print('%s\t%s' % (current_word, current_count))
|
||||
current_count = count
|
||||
current_word = word
|
||||
|
||||
# do not forget to output the last word if needed!
|
||||
if current_word == word:
|
||||
print('%s\t%s' % (current_word, current_count))
|
|
@ -0,0 +1,12 @@
|
|||
hdfs dfs -mkdir -p tmp/python
|
||||
hdfs dfs -rm -r tmp/python/output
|
||||
yarn jar /usr/lib/hadoop/hadoop-streaming.jar \
|
||||
-D mapred.reduce.tasks=2 \
|
||||
-input tmp/books \
|
||||
-output tmp/python/output \
|
||||
-mapper ~/hadoop_zaliczenie/mr/python/mapper.py \
|
||||
-reducer ~/hadoop_zaliczenie/mr/python/reducer.py \
|
||||
-file ~/hadoop_zaliczenie/mr/python/mapper.py \
|
||||
-file ~/hadoop_zaliczenie/mr/python/reducer.py
|
||||
|
||||
hdfs dfs -cat tmp/python/output/part-*
|
|
@ -0,0 +1,17 @@
|
|||
# Instalacja modułu
|
||||
|
||||
Aby zainstalować moduł należy:
|
||||
|
||||
1. Zainicjalizować moduł:
|
||||
```shell
|
||||
cd terraform/dataproc_cluster
|
||||
terraform init
|
||||
```
|
||||
2. Wykonać i zweryfikować plan:
|
||||
```shell
|
||||
terraform plan
|
||||
```
|
||||
3. Zastosować:
|
||||
```shell
|
||||
terraform apply
|
||||
```
|
|
@ -0,0 +1,71 @@
|
|||
provider "google" {
|
||||
project = var.project_id
|
||||
region = var.region
|
||||
}
|
||||
|
||||
data "google_project" "project" {}
|
||||
|
||||
resource "null_resource" "enable_api" {
|
||||
provisioner "local-exec" {
|
||||
command = "gcloud services enable dataproc.googleapis.com"
|
||||
}
|
||||
depends_on = [
|
||||
google_project_iam_member.service-agent-role
|
||||
]
|
||||
}
|
||||
|
||||
resource "google_dataproc_cluster" "sandbox" {
|
||||
name = var.cluster_name
|
||||
project = var.project_id
|
||||
region = var.region
|
||||
|
||||
cluster_config {
|
||||
|
||||
master_config {
|
||||
num_instances = 1
|
||||
machine_type = var.master_machine_type
|
||||
disk_config {
|
||||
boot_disk_size_gb = 30
|
||||
}
|
||||
}
|
||||
|
||||
worker_config {
|
||||
num_instances = 0
|
||||
}
|
||||
|
||||
software_config {
|
||||
override_properties = {
|
||||
"dataproc:dataproc.allow.zero.workers" = "true"
|
||||
}
|
||||
|
||||
optional_components = [
|
||||
"ZEPPELIN",
|
||||
# "JUPYTER",
|
||||
"HIVE_WEBHCAT",
|
||||
# "HUDI"
|
||||
]
|
||||
}
|
||||
|
||||
endpoint_config {
|
||||
enable_http_port_access = "true"
|
||||
}
|
||||
|
||||
lifecycle_config {
|
||||
idle_delete_ttl = var.idle_delete_ttl
|
||||
auto_delete_time = timeadd(timestamp(), var.delete_ttl)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
depends_on = [
|
||||
null_resource.enable_api,
|
||||
google_project_iam_member.service-agent-role
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
resource "google_project_iam_member" "service-agent-role" {
|
||||
member = "serviceAccount:service-${data.google_project.project.number}@dataproc-accounts.iam.gserviceaccount.com"
|
||||
project = data.google_project.project.project_id
|
||||
role = "roles/dataproc.serviceAgent"
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
variable "project_id" {
|
||||
description = "ID projektu Google Cloud Platform"
|
||||
}
|
||||
|
||||
variable "region" {
|
||||
description = "Region, w którym utworzony zostanie klaster"
|
||||
default = "europe-west1"
|
||||
}
|
||||
|
||||
variable "cluster_name" {
|
||||
description = "Nazwa klastra Dataproc"
|
||||
default = "hadoop-sandbox"
|
||||
}
|
||||
|
||||
variable "master_machine_type" {
|
||||
description = "Typ maszyny dla węzła głównego"
|
||||
default = "n2-standard-4"
|
||||
}
|
||||
|
||||
variable "idle_delete_ttl" {
|
||||
description = "Czas (w sekundach) po którym klaster zostanie usunięty po bezczynności. Domyślnie 30m."
|
||||
default = "18000s"
|
||||
}
|
||||
|
||||
variable "delete_ttl" {
|
||||
description = "Czas (w formacie duration) po którym klaster zostanie usunięty po jego utworzeniu. Domyślnie 180m."
|
||||
default = "180m"
|
||||
}
|
Loading…
Reference in New Issue