This commit is contained in:
Paweł Skurzyński 2024-04-05 17:14:07 +02:00
commit 75238b5071
17 changed files with 30742 additions and 0 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

BIN
lab/.DS_Store vendored Normal file

Binary file not shown.

276
lab/LAB_01.md Normal file
View File

@ -0,0 +1,276 @@
# Laboratorium MapReduce
Do wykonania ćwiczeń należy skopiować repozytorium:
```shell
git clone https://git.wmi.amu.edu.pl/s1201683/hadoop_zaliczenie
```
Celem ćwiczenia jest zaprezentowanie aplikacji w oparciu o algorytm MapReduce z wykorzystaniem:
* Hadoop Streaming
* Apache Hive
* Apache Pig
* Apache Spark
WordCount jest „odpowiednikiem Hello World” w świecie Big Data. Ćwiczenie prezentuje algorytm WordCount z wykorzystaniem różnych narzędzi.
Aby wykonać ćwiczenia, należy skopiować folder _books_ do systemu HDFS:
```
hdfs dfs -mkdir -p tmp
hdfs dfs -copyFromLocal ~/hadoop_zaliczenie/mr/books tmp/books
```
## 1.WordCount Hadoop Streaming
Hadoop streaming umożliwia użytkownikom wykorzystanie mappera i reducera napisanego w dowolnym języku programowania. Jedynym wymaganiem jest obecność interpretera na każdym z węzłów.
### 1.1.Python
#### 1.1.1.Mapper i Reducer
Mapper i reducer napisane w języku Python znajdują się w folderze _~/apache_hadoop/mr/python_
#### 1.1.2.Uruchomienie algorytmu
Aplikację można uruchomić poprzez wykonanie komendy:
```
bash ~/hadoop_zaliczenie/mr/python/wordcount.sh
```
Uruchom aplikację i wyjaśnij co jest wynikiem działania tego algorytmu?
#### 1.1.3.Dane wyjściowe
Pliki zawierające wynik działania algorytmu znajdują się w folderze _tmp/python/output_ w systemie HDFS.
Analizując pliki w tym folderze, odpowiedz na pytanie ile reducerów zostało użytych podczas przebiegu aplikacji?
#### 1.1.4.Modyfikacja
Zmodyfikuj skrypt _~/apache_hadoop/mr/python/wordcount.sh_ tak, aby użyte zostały 4 reducery.
## 2.WordCount Hive
Uruchom klienta Hive w konsoli Linux.
### 2.1.HiveQL
#### 2.1.1.Ustawienie parametrów
Ustaw parametr USERNAME tak, aby wskazywał nazwę użytkownika i utwórz schemat bazy:
```sql
set USERNAME=<nazwa_użytkownika>
create database if not exists ${hiveconf:USERNAME};
use ${hiveconf:USERNAME};
```
#### 2.1.2.Utworzenie tabeli doc
Utwórz tabelę przechowującą dokumenty tekstowe w postaci wierszowej:
```sql
create table ${hiveconf:USERNAME}.doc(
text string
) row format delimited fields terminated by '\n'
stored as textfile;
```
#### 2.1.3.Ładowanie danych do tabeli
Utwórz zewnętrzną tabelę zawierającą dane z folderu _tmp/books_
```sql
create external table ${hiveconf:USERNAME}.doc_(
text string
) row format delimited fields terminated by '\n'
stored as textfile location '/user/${hiveconf:USERNAME}/tmp/books/';
```
Następnie należy skopiować dane z tabeli _doc__ do _doc_:
```sql
insert into ${hiveconf:USERNAME}.doc select * from ${hiveconf:USERNAME}.doc_;
```
#### 2.1.4.Utworzenie widoku
Kolejnym krokiem jest utworzenie widoku, zawierającego słowa wydzielone z krotek tabeli doc:
```sql
CREATE VIEW ${hiveconf:USERNAME}.words
AS SELECT cast(word as string) as word FROM (
SELECT explode(split(text, ' ')) AS word FROM ${hiveconf:USERNAME}.doc
) doc;
```
#### 2.1.5.Obliczenie ilości słów:
Należy wykonać polecenie:
```sql
select word, count(*) from ${hiveconf:USERNAME}.words group by word;
```
#### 2.1.6.Pominięcie widoku
Obliczenie ilości słów jest również możliwe z pominięciem utworzenia widoku, a z wykorzystaniem podzapytania:
SELECT word, count(*) from (SELECT explode(split(text, ' ')) AS ${hiveconf:USERNAME}.word FROM doc) words group by word;
## 3.WordCount Pig
Uruchom Pig w konsoli.
### 3.1.Pig Latin
#### 3.1.11.Ładowanie pliku
Zdefiniuj zmienną books, ładującą tekst książek:
```
books = LOAD 'tmp/books' USING TextLoader() AS (line:chararray);
```
#### 3.1.12.Wydzielenie słów
Zdefiniuj zmienną words wydzielającą słowa z tekstu:
```
words = FOREACH books GENERATE FLATTEN(TOKENIZE(line)) as word;
```
#### 3.1.13.Grupowanie
Pogrupój słowa:
```
grouped = GROUP words BY word;
```
#### 3.1.14.Obliczanie ilości słów
Policz słowa:
```
wordcount = FOREACH grouped GENERATE group, COUNT(words);
```
#### 3.1.15.Wyświetlenie rezultatów
Wyświetl wynik działania algorytmu:
```
DUMP wordcount;
```
## 4.WordCount Spark
### 4.1.Scala
Uruchom spark-shell.
#### 4.1.1.Ładowanie pliku
Zdefiniuj zmienną ładującą plik z HDFS:
```
val text_file = sc.textFile("tmp/books/")
```
#### 4.1.2.Definicja funkcji MapReduce
Spark korzysta z zalet języka Scala i umożliwia utworzenie prostej funkcji:
```
val counts = text_file.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
```
#### 4.1.3.Wyświetlenie wyników
Aby wyświetlić rezultaty wykonaj:
```
counts.collect().foreach(println)
```
#### 4.1.4.Zapis rezultatu do pliku
Aby zapisać rezultat do pliku wykonaj:
```
counts.saveAsTextFile("tmp/spark-outputs")
```
#### 4.1.5.Zapis alternatywny
Przy tworzeniu bardziej skomplikowanych funkcji przydatny może okazać się zapis, w którym parametry wejściowe funkcji przyjmują określone nazwy lub chcemy czytać z lokalnego systemu plików/udziału NFS:
```
sc.textFile("file:///home/<nazwa_uzytkownika>/apache_hadoop/mr/books")
val counts2 = text_file.
flatMap(txt => txt.split(" ")).
map(word => (word, 1)).
reduceByKey((a, b) => a + b)
counts2.collect().foreach(println)
counts2.saveAsTextFile("tmp/spark-outputs")
```
### 5.1.PySpark
Uruchom pyspark.
#### 5.1.1.Ładowanie pliku
Zdefiniuj zmienną ładującą plik z HDFS:
```
text_file = sc.textFile("hdfs:///tmp/books/*")
```
#### 5.1.2.Definicja funkcji MapReduce
Spark korzysta z zalet języka Scala i umożliwia utworzenie prostej funkcji:
```
counts = text_file.flatMap(lambda line: line.split(" ")) \ \
.map(lambda word: (word, 1)) \ \
.reduceByKey(lambda a, b: a + b)
```
#### 5.1.3.Wyświetlenie wyników
Aby wyświetlić rezultaty wykonaj:
```
counts.collect()
```
#### 5.1.4.Zapis rezultatu do pliku
Aby zapisać rezultat do pliku wykonaj:
```
counts.saveAsTextFile("tmp/pyspark-outputs")
```
## 6. Modyfikacje (zaliczenie)
Czy potrafisz dokonać modyfikacji? Przykładowe pomysły:
- posortuj słowa według ich długości
- wyczyść nieznaczące znaki i sprowadź słowa do małych liter
- policz wystąpienia słów o określonej ilości znaków

BIN
mr/.DS_Store vendored Normal file

Binary file not shown.

8
mr/LICENSE Normal file
View File

@ -0,0 +1,8 @@
Books are downloaded from
wolnelektury.pl
and are under CC BY-SA 3.0 license (https://creativecommons.org/licenses/by-sa/3.0/)
python/mapper.py and python/reducer.py are copied from http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
All other components are licensed under MIT License

8
mr/README Normal file
View File

@ -0,0 +1,8 @@
Books are downloaded from
wolnelektury.pl
and are under CC BY-SA 3.0 license (https://creativecommons.org/licenses/by-sa/3.0/)
python/mapper.py and python/reducer.py are copied from http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
All other components are licensed under MIT License

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

BIN
mr/python/.DS_Store vendored Normal file

Binary file not shown.

18
mr/python/mapper.py Normal file
View File

@ -0,0 +1,18 @@
#!/usr/bin/env python
import sys
import re
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = re.findall(r'\b\w+\b', line) # using regex to find words
# increase counters
for word in words:
# apply regex to remove non-alphanumeric characters and convert to lowercase
word = re.sub(r'[^a-zA-Z0-9]', '', word).lower()
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
print('%s\t%s' % (word, 1))

46
mr/python/reducer.py Normal file
View File

@ -0,0 +1,46 @@
#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
results = []
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
# this IF-switch only works because Hadoop sorts map output
# by key (here: word) before it is passed to the reducer
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
#print('%s\t%s' % (current_word, current_count))
results.append((current_word, len(current_word), current_count))
current_count = count
current_word = word
# do not forget to output the last word if needed!
if current_word == word:
#print('%s\t%s' % (current_word, current_count))
results.append((current_word, len(current_word), current_count))
results.sort(key=lambda x: x[1])
for result in results:
print('%s\t%s' % (result[0], result[2]))

12
mr/python/wordcount.sh Normal file
View File

@ -0,0 +1,12 @@
hdfs dfs -mkdir -p tmp/python
hdfs dfs -rm -r tmp/python/output
yarn jar /usr/lib/hadoop/hadoop-streaming.jar \
-D mapred.reduce.tasks=2 \
-input tmp/books \
-output tmp/python/output \
-mapper ~/hadoop_zaliczenie/mr/python/mapper.py \
-reducer ~/hadoop_zaliczenie/mr/python/reducer.py \
-file ~/hadoop_zaliczenie/mr/python/mapper.py \
-file ~/hadoop_zaliczenie/mr/python/reducer.py
hdfs dfs -cat tmp/python/output/part-*

17
terraform/README.md Normal file
View File

@ -0,0 +1,17 @@
# Instalacja modułu
Aby zainstalować moduł należy:
1. Zainicjalizować moduł:
```shell
cd terraform/dataproc_cluster
terraform init
```
2. Wykonać i zweryfikować plan:
```shell
terraform plan
```
3. Zastosować:
```shell
terraform apply
```

View File

@ -0,0 +1,71 @@
provider "google" {
project = var.project_id
region = var.region
}
data "google_project" "project" {}
resource "null_resource" "enable_api" {
provisioner "local-exec" {
command = "gcloud services enable dataproc.googleapis.com"
}
depends_on = [
google_project_iam_member.service-agent-role
]
}
resource "google_dataproc_cluster" "sandbox" {
name = var.cluster_name
project = var.project_id
region = var.region
cluster_config {
master_config {
num_instances = 1
machine_type = var.master_machine_type
disk_config {
boot_disk_size_gb = 30
}
}
worker_config {
num_instances = 0
}
software_config {
override_properties = {
"dataproc:dataproc.allow.zero.workers" = "true"
}
optional_components = [
"ZEPPELIN",
# "JUPYTER",
"HIVE_WEBHCAT",
# "HUDI"
]
}
endpoint_config {
enable_http_port_access = "true"
}
lifecycle_config {
idle_delete_ttl = var.idle_delete_ttl
auto_delete_time = timeadd(timestamp(), var.delete_ttl)
}
}
depends_on = [
null_resource.enable_api,
google_project_iam_member.service-agent-role
]
}
resource "google_project_iam_member" "service-agent-role" {
member = "serviceAccount:service-${data.google_project.project.number}@dataproc-accounts.iam.gserviceaccount.com"
project = data.google_project.project.project_id
role = "roles/dataproc.serviceAgent"
}

View File

@ -0,0 +1,28 @@
variable "project_id" {
description = "ID projektu Google Cloud Platform"
}
variable "region" {
description = "Region, w którym utworzony zostanie klaster"
default = "europe-west1"
}
variable "cluster_name" {
description = "Nazwa klastra Dataproc"
default = "hadoop-sandbox"
}
variable "master_machine_type" {
description = "Typ maszyny dla węzła głównego"
default = "n2-standard-4"
}
variable "idle_delete_ttl" {
description = "Czas (w sekundach) po którym klaster zostanie usunięty po bezczynności. Domyślnie 30m."
default = "18000s"
}
variable "delete_ttl" {
description = "Czas (w formacie duration) po którym klaster zostanie usunięty po jego utworzeniu. Domyślnie 180m."
default = "180m"
}