1
0
mirror of https://github.com/andre-wojtowicz/r-distributed-computing synced 2024-12-21 17:40:28 +01:00

Added core files

This commit is contained in:
Andrzej Wójtowicz 2016-11-08 13:49:28 +01:00
parent 9e72fcf957
commit b62bd25743
10 changed files with 974 additions and 0 deletions

4
.gitignore vendored
View File

@ -31,3 +31,7 @@ vignettes/*.pdf
# Temporary files created by R markdown
*.utf8.md
*.knit.md
.Rproj.user
logs/*
.R.user

54
config-parallel.R Normal file
View File

@ -0,0 +1,54 @@
# ---- config-parallel ----
PARALLEL.USED.METHOD = "REMOTE" # LOCAL or REMOTE
PARALLEL.DISABLE.MKL.THREADS = TRUE
PARALLEL.NO.JOBS.PER.CHUNK = 30 # no. jobs per worker in step 6-2
PARALLEL.NO.JOBS.MULTIPLIER = 2 # no. jobs multiplier per job sequence
# to minimize worker time of waitiing for
# remaining workers in step 6-2
# local
PARALLEL.LOCAL.METHOD = "PSOCK"
PARALLEL.LOCAL.NODES = parallel::detectCores(logical = FALSE)
PARALLEL.LOCAL.CONNECTION.TIMEOUT = 5
PARALLEL.LOCAL.SLAVE.OUT.FILE =
lazy(file.path(LOGGER.OUTPUT.DIR,
paste0("worker-local-", worker.name, "-", worker.id, ".log")))
# remote
PARALLEL.REMOTE.METHOD = "PSOCK"
PARALLEL.REMOTE.MASTER.IP = "192.168.0.1" # ip accessible from slaves
PARALLEL.REMOTE.MASTER.PORT = 11000
PARALLEL.REMOTE.MASTER.CONNECTION.TIMEOUT = 10
PARALLEL.REMOTE.MASTER.SSH.PROGRAM = "ssh" # ssh, ssh.exe, etc.
PARALLEL.REMOTE.MASTER.SSH.PRIV.KEY = file.path("ssh", "rsa-priv.key")
PARALLEL.REMOTE.MASTER.SSH.NULL.DEV = "/dev/null"
PARALLEL.REMOTE.MASTER.SSH.STRICT.HOST.KEY.CHECKING = "no"
PARALLEL.REMOTE.MASTER.SSH.SERVER.ALIVE.INTERVAL = 30
PARALLEL.REMOTE.MASTER.SHELL.CMD =
paste0(
PARALLEL.REMOTE.MASTER.SSH.PROGRAM,
" -q",
" -o ConnectTimeout=", PARALLEL.REMOTE.MASTER.CONNECTION.TIMEOUT,
" -o UserKnownHostsFile=", PARALLEL.REMOTE.MASTER.SSH.NULL.DEV,
" -o StrictHostKeyChecking=", PARALLEL.REMOTE.MASTER.SSH.STRICT.HOST.KEY.CHECKING,
" -o ServerAliveInterval=", PARALLEL.REMOTE.MASTER.SSH.SERVER.ALIVE.INTERVAL,
" -i ", PARALLEL.REMOTE.MASTER.SSH.PRIV.KEY)
PARALLEL.REMOTE.MASTER.SLAVES.FILE.PATH = "remote-connection-list.txt"
PARALLEL.REMOTE.SLAVE.OUT.FILE = lazy(paste0("worker-remote-", worker.name,
"-", worker.id, ".log"))
PARALLEL.REMOTE.SLAVE.SSH.USER = "root"
PARALLEL.REMOTE.SLAVE.RSCRIPT.PATH = "/usr/bin/Rscript"
PARALLEL.REMOTE.SLAVE.HOMOGENEOUS = TRUE
PARALLEL.REMOTE.SLAVE.METHODS = TRUE
PARALLEL.REMOTE.SLAVE.USEXDR = TRUE
# perform additional custom config
if (file.exists("config-parallel.R.user"))
source("config-parallel.R.user")

31
config.R Normal file
View File

@ -0,0 +1,31 @@
# ---- config ----
# randomization and output files
SEED = 1337
OVERWRITE.OUTPUT.FILES = TRUE # overwrite created datasets and classifiers
# extra user configuration and init
USER.CONFIG.FILE = "config.R.user"
USER.INIT.FILE = "init.R.user"
# checkpoint library
CHECKPOINT.QUICK.LOAD = FALSE # if TRUE then skip testing https and checking url
CHECKPOINT.MRAN.URL = "http://mran.microsoft.com/"
CHECKPOINT.SNAPSHOT.DATE = "2016-07-01"
# logging system
LOGGER.OUTPUT.DIR = "logs"
LOGGER.OUTPUT.TEST.FILE = "output-test.log"
LOGGER.LEVEL = 6 # futile.logger::INFO
LOGGER.OVERWRITE.EXISTING.FILES = TRUE
# load custom config
if (file.exists(USER.CONFIG.FILE))
{
source(USER.CONFIG.FILE)
}

115
init-parallel.R Normal file
View File

@ -0,0 +1,115 @@
# ---- init-parallel ----
# load setup variables
source("config-parallel.R")
source("utils-parallel.R")
# create cluster
cl = if (PARALLEL.USED.METHOD == "LOCAL")
{
if (PARALLEL.LOCAL.METHOD == "PSOCK")
{
flog.info("Creating local PSOCK cluster")
make.psock.cluster(
names = PARALLEL.LOCAL.NODES,
connection.timeout = PARALLEL.LOCAL.CONNECTION.TIMEOUT,
outfile = PARALLEL.LOCAL.SLAVE.OUT.FILE)
} else if (PARALLEL.LOCAL.METHOD == "FORK")
{
flog.info("Creating local FORK cluster")
makeForkCluster(PARALLEL.LOCAL.NODES)
} else {
stop.script(paste("Unknown local parallel cluster method:",
PARALLEL.USED.METHOD))
}
} else if (PARALLEL.USED.METHOD == "REMOTE")
{
if (PARALLEL.REMOTE.METHOD == "PSOCK")
{
flog.info("Creating remote PSOCK cluster")
if (!file.exists(PARALLEL.REMOTE.MASTER.SLAVES.FILE.PATH))
{
stop.script(paste("Unable to read list of remote hosts from",
PARALLEL.REMOTE.MASTER.SLAVES.FILE.PATH))
}
slaves.list = readLines(PARALLEL.REMOTE.MASTER.SLAVES.FILE.PATH)
make.psock.cluster(
names = slaves.list,
connection.timeout = PARALLEL.REMOTE.MASTER.CONNECTION.TIMEOUT,
master = PARALLEL.REMOTE.MASTER.IP,
port = PARALLEL.REMOTE.MASTER.PORT,
rshcmd = PARALLEL.REMOTE.MASTER.SHELL.CMD,
outfile = PARALLEL.REMOTE.SLAVE.OUT.FILE,
user = PARALLEL.REMOTE.SLAVE.SSH.USER,
rscript = PARALLEL.REMOTE.SLAVE.RSCRIPT.PATH,
homogeneous = PARALLEL.REMOTE.SLAVE.HOMOGENEOUS,
methods = PARALLEL.REMOTE.SLAVE.METHODS,
useXDR = PARALLEL.REMOTE.SLAVE.USEXDR)
}
else {
stop.script(paste("Unknown remote parallel cluster method:",
PARALLEL.REMOTE.METHOD))
}
} else {
stop.script(paste("Unknown used parallel method:", PARALLEL.USED.METHOD))
}
flog.info("Exporting checkpoint constants")
clusterExport(cl, c("CHECKPOINT.QUICK.LOAD", "CHECKPOINT.MRAN.URL",
"CHECKPOINT.SNAPSHOT.DATE", "LOGGER.LEVEL"))
clusterEvalQ(cl, {
library(checkpoint)
if (CHECKPOINT.QUICK.LOAD) # approx. x10 faster checkpoint library loading
{
# assume https
options(checkpoint.mranUrl = CHECKPOINT.MRAN.URL)
# disable url checking
assignInNamespace("is.404", function(mran, warn = TRUE) { FALSE },
"checkpoint")
}
checkpoint(CHECKPOINT.SNAPSHOT.DATE, verbose = TRUE, scanForPackages = TRUE)
library(futile.logger)
flog.threshold(LOGGER.LEVEL)
flog.info("Logging configured")
})
if (PARALLEL.DISABLE.MKL.THREADS)
{
clusterEvalQ(cl, {
tryCatch({setMKLthreads(1);
flog.info("Set MKL threads to 1 on slave")},
error = function(e) e)
})
}
flog.info("Setting cluster RNG kind")
clusterEvalQ(cl, {
RNGkind("L'Ecuyer-CMRG")
})
flog.info("Registering cluster")
registerDoParallel(cl)
foreach::foreach(i = 1:foreach::getDoParWorkers()) %dopar%
{
flog.info("Foreach startup test")
}
flog.info(paste(rep("*", 25), collapse = ""))
# perform additional custom init
if (file.exists("init-parallel.R.user"))
source("init-parallel.R.user")

79
init.R Normal file
View File

@ -0,0 +1,79 @@
# ---- init ----
# stop cluster if it is already registered
if ("cl" %in% ls() && !is.null(foreach::getDoParName()))
{
if (foreach::getDoParName() != "doSEQ")
{
flog.info("Stopping already registered cluster")
stop.cluster()
}
}
# clear envirionment
rm(list = ls())
# load setup variables
source("config.R")
# load library management system
suppressPackageStartupMessages(
library(checkpoint)
)
if (CHECKPOINT.QUICK.LOAD) # approx. x10 faster checkpoint library loading
{
# assume https
options(checkpoint.mranUrl = CHECKPOINT.MRAN.URL)
# disable url checking
assignInNamespace("is.404", function(mran, warn = TRUE) { FALSE },
"checkpoint")
}
checkpoint(CHECKPOINT.SNAPSHOT.DATE, verbose = TRUE, scanForPackages = TRUE)
# load logging system
suppressPackageStartupMessages(
library(futile.logger)
)
invisible(flog.threshold(LOGGER.LEVEL))
if (!dir.exists(LOGGER.OUTPUT.DIR))
{
dir.create(LOGGER.OUTPUT.DIR, recursive = TRUE)
}
# load libraries
suppressPackageStartupMessages({
library(plyr)
library(dplyr)
library(ttutils)
library(lazyeval)
library(R.utils)
library(RCurl)
# caret - core
library(caret)
library(e1071)
# caret - classifiers
library(kernlab) # SVM
# parallel computation
library(doParallel)
})
# load helper functions
source("utils.R")
# perform additional custom init
if (file.exists(USER.INIT.FILE))
{
source(USER.INIT.FILE)
}

View File

@ -0,0 +1,16 @@
Version: 1.0
RestoreWorkspace: Default
SaveWorkspace: Default
AlwaysSaveHistory: Default
EnableCodeIndexing: Yes
UseSpacesForTab: Yes
NumSpacesForTab: 4
Encoding: UTF-8
RnwWeave: Sweave
LaTeX: pdfLaTeX
AutoAppendNewline: Yes
StripTrailingWhitespace: Yes

520
remote-commands.sh Normal file
View File

@ -0,0 +1,520 @@
#!/bin/bash
# working with WMI Rescue - small Linux image based on
# Debian distribution; see http://rescue.wmi.amu.edu.pl
# config
MRO_VERSION="3.3.1"
SSH_OPTIONS="-o ConnectTimeout=5 -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -q"
SSH_USER="root"
SSHPASS_PWD="wmi"
SSH_KEYS_DIR="ssh"
SSH_KEY_PRIV="rsa-priv.key"
SSH_KEY_PUB="rsa-pub.key"
MRO_INSTALL_URL="https://mran.microsoft.com/install"
HOSTS_FILE="remote-hosts.txt"
CONNECTION_LIST_FILE="remote-connection-list.txt"
HOSTS_SCANNED_FILE="remote-hosts-scanned.txt"
DEBIAN_PACKAGES_TO_INSTALL="build-essential gfortran ed htop libxml2-dev ca-certificates curl libcurl4-openssl-dev gdebi-core sshpass default-jre default-jdk libpcre3-dev zlib1g-dev liblzma-dev libbz2-dev libicu-dev"
REMOTE_DETECT_LOGICAL_CPUS="FALSE"
MIN_HOSTS=100
SHELL_SCRIPT=$(basename $0)
LOG_STEPS="logs/${SHELL_SCRIPT%.*}".log
HOSTS_ARRAY=()
# messaging
report_error()
{
echo $1 > /tmp/command_error.$$
}
[[ -w /tmp ]] && report_error 0
# https://stackoverflow.com/a/5196220
# modified for Debian
# Use step(), try(), and next() to perform a series of commands and print
# [ OK ] or [FAILED] at the end. The step as a whole fails if any individual
# command fails.
#
# Example:
# step "Remounting / and /boot as read-write:"
# try mount -o remount,rw /
# try mount -o remount,rw /boot
# next
step()
{
echo -n "* $@ "
STEP_OK=0
[[ -w /tmp ]] && echo $STEP_OK > /tmp/step.$$
}
try()
{
# skip if previous command in step failed
[[ -f /tmp/step.$$ ]] && { PREV_STEP=$(< /tmp/step.$$); }
[[ $PREV_STEP -ne 0 ]] && return 1
# Check for `-b' argument to run command in the background.
local BG=
[[ $1 == -b ]] && { BG=1; shift; }
[[ $1 == -- ]] && { shift; }
# Run the command.
if [[ -z $BG ]]; then
"$@"
else
"$@" &
fi
# Check if command failed and update $STEP_OK if so.
local EXIT_CODE=$?
if [[ $EXIT_CODE -ne 0 ]]; then
STEP_OK=$EXIT_CODE
[[ -w /tmp ]] && echo $STEP_OK > /tmp/step.$$
if [[ -n $LOG_STEPS ]]; then
local FILE=$(readlink -m "${BASH_SOURCE[1]}")
local LINE=${BASH_LINENO[0]}
mkdir -p $( dirname $LOG_STEPS )
echo "$FILE: line $LINE: Command \`$*' failed with exit code $EXIT_CODE." >> "$LOG_STEPS"
fi
fi
return $EXIT_CODE
}
test -t 1 && CONSOLE_STDOUT=1 || CONSOLE_STDOUT=0
[[ $TERM != "dumb" ]] && [[ $CONSOLE_STDOUT -eq 1 ]] && CONSOLE_COLORS=1 || CONSOLE_COLORS=0
[ $CONSOLE_COLORS -eq 1 ] && CONSOLE_RED=$(tput setaf 1)
[ $CONSOLE_COLORS -eq 1 ] && CONSOLE_GREEN=$(tput setaf 2)
[ $CONSOLE_COLORS -eq 1 ] && CONSOLE_YELLOW=$(tput setaf 3)
[ $CONSOLE_COLORS -eq 1 ] && CONSOLE_NORMAL=$(tput sgr0)
[ $CONSOLE_COLORS -eq 1 ] && CONSOLE_RESULT_POS=$[$(tput cols)-10] || CONSOLE_RESULT_POS=0
next()
{
# https://stackoverflow.com/a/5506264
[ $CONSOLE_COLORS -eq 1 ] && tput hpa ${CONSOLE_RESULT_POS}
[[ -f /tmp/step.$$ ]] && { STEP_OK=$(< /tmp/step.$$); rm -f /tmp/step.$$; }
[[ $STEP_OK -eq 0 ]] && printf '%s%s' "$CONSOLE_GREEN" "[ OK ]" "$CONSOLE_NORMAL" || printf '%s%s' "$CONSOLE_RED" "[FAIL]" "$CONSOLE_NORMAL"
echo
[[ $STEP_OK -ne 0 ]] && report_error 1
return $STEP_OK
}
info()
{
echo -n "* $@ "
[ $CONSOLE_COLORS -eq 1 ] && tput hpa ${CONSOLE_RESULT_POS}
printf '%s' "[INFO]"
echo
}
fail()
{
echo -n "* $@ "
[ $CONSOLE_COLORS -eq 1 ] && tput hpa ${CONSOLE_RESULT_POS}
printf '%s%s' "$CONSOLE_RED" "[FAIL]" "$CONSOLE_NORMAL"
echo
}
success()
{
echo -n "* $@ "
[ $CONSOLE_COLORS -eq 1 ] && tput hpa ${CONSOLE_RESULT_POS}
printf '%s%s' "$CONSOLE_GREEN" "[ OK ]" "$CONSOLE_NORMAL"
echo
}
warn()
{
echo -n "* $@ "
[ $CONSOLE_COLORS -eq 1 ] && tput hpa ${CONSOLE_RESULT_POS}
printf '%s%s' "$CONSOLE_YELLOW" "[WARN]" "$CONSOLE_NORMAL"
echo
}
# functions
generate_ssh_keys()
{
step "Generating SSH keys"
try mkdir -p ssh
try ssh-keygen -q -t rsa -b 4096 -f ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} -P "" -C "rscript@remote"
try mv ${SSH_KEYS_DIR}/${SSH_KEY_PRIV}.pub ${SSH_KEYS_DIR}/${SSH_KEY_PUB}
next
check_if_command_error
}
install_env()
{
step "Installing environment"
echo
try apt-get update
local DEBIAN_FRONTEND=noninteractive
try apt-get -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confnew" upgrade
try apt-get -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confnew" install ${DEBIAN_PACKAGES_TO_INSTALL}
try apt-get clean
next
check_if_command_error
}
install_mro()
{
step "Installing Microsoft R Open"
echo
Rscript -e "invisible(TRUE)" &> /dev/null
if [[ $? -ne 0 ]]; then
try wget ${MRO_INSTALL_URL}/mro/${MRO_VERSION}/microsoft-r-open-${MRO_VERSION}.tar.gz
try tar -xvf microsoft-r-open-${MRO_VERSION}.tar.gz
try gdebi -n microsoft-r-open/deb/microsoft-r-open-mro-${MRO_VERSION:0:3}.deb
try gdebi -n microsoft-r-open/deb/microsoft-r-open-foreachiterators-${MRO_VERSION:0:3}.deb
try gdebi -n microsoft-r-open/deb/microsoft-r-open-mkl-${MRO_VERSION:0:3}.deb
try R CMD javareconf
rm -rf microsoft-r-open*
else
echo "Microsoft R Open already installed"
fi
try Rscript -e "install.packages('knitr')"
try apt-get clean
next
check_if_command_error
}
install_r_libraries()
{
step "Installing R libraries"
echo
try mkdir -p ~/.checkpoint
try Rscript init.R # run checkpoint
next
check_if_command_error
}
dump_project_r_files()
{
step "Making project R files dump"
try tar -czf project-r-files.tar.gz *.R*
next
check_if_command_error
}
dump_r_libraries()
{
step "Making R libraries dump"
wd=`pwd`
cd ~/
try tar -czf $wd/checkpoint.tar.gz .checkpoint/*
cd $wd
next
check_if_command_error
}
hosts_push_ssh_key()
{
info "Pushing SSH keys to hosts"
for host in "${HOSTS_ARRAY[@]}"; do
step "-- ${host}"
try sshpass -p ${SSHPASS_PWD} ssh ${SSH_OPTIONS} ${SSH_USER}@${host} 'mkdir -p ~/.ssh'
try sshpass -p ${SSHPASS_PWD} scp ${SSH_OPTIONS} ${SSH_KEYS_DIR}/${SSH_KEY_PUB} ${SSH_USER}@${host}:~/.ssh
try sshpass -p ${SSHPASS_PWD} ssh ${SSH_OPTIONS} ${SSH_USER}@${host} "cat ~/.ssh/${SSH_KEY_PUB} >> ~/.ssh/authorized_keys"
try sshpass -p ${SSHPASS_PWD} ssh ${SSH_OPTIONS} ${SSH_USER}@${host} "sed -i -e 's/#PasswordAuthentication yes/PasswordAuthentication no/ig' /etc/ssh/sshd_config; service ssh restart"
next
done
check_if_command_error
}
hosts_scan_available()
{
HOSTS_SCANNED_ARRAY=()
info "Scanning available hosts"
for host in "${HOSTS_ARRAY[@]}"; do
ssh -o ConnectTimeout=2 -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o PreferredAuthentications=publickey -q -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "true"
EXIT_CODE=$?
if [[ $EXIT_CODE -ne 0 ]]; then
fail "-- ${host}"
else
success "-- ${host}"
HOSTS_SCANNED_ARRAY+=("$host")
fi
done
HOSTS_ARRAY=("${HOSTS_SCANNED_ARRAY[@]}")
if [[ ${#HOSTS_ARRAY[@]} -eq 0 ]]; then
fail "No available hosts"
exit 1
else
info "Available ${#HOSTS_ARRAY[@]} hosts"
fi
if [[ ${#HOSTS_ARRAY[@]} -lt $MIN_HOSTS ]]; then
fail "Too few hosts: ${#HOSTS_ARRAY[@]} ; min.: $MIN_HOSTS"
exit 1
fi
if [ -f ${HOSTS_SCANNED_FILE} ] ; then rm ${HOSTS_SCANNED_FILE}; fi
for host in "${HOSTS_ARRAY[@]}"; do
echo ${host} >> ${HOSTS_SCANNED_FILE}
done
}
hosts_push_r_libraries_dump()
{
info "Pushing R libraries dump to hosts"
for host in "${HOSTS_ARRAY[@]}"; do
step "-- ${host}"
try scp ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} checkpoint.tar.gz ${SSH_USER}@${host}:~/
try ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "tar -xzf checkpoint.tar.gz -C ~/; rm checkpoint.tar.gz"
next
done
check_if_command_error
}
hosts_push_project_r_files()
{
info "Pushing project R files to hosts"
for host in "${HOSTS_ARRAY[@]}"; do
step "-- ${host}"
try scp ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} project-r-files.tar.gz ${SSH_USER}@${host}:~/
try ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "tar -xzf project-r-files.tar.gz -C ~/; rm project-r-files.tar.gz"
next
done
check_if_command_error
}
hosts_push_shell_script()
{
info "Pushing shell script to hosts"
for host in "${HOSTS_ARRAY[@]}"; do
step "-- ${host}"
try scp ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SHELL_SCRIPT} ${SSH_USER}@${host}:~/
next
done
check_if_command_error
}
hosts_install()
{
case "$1" in
"env") info "Installing environment on hosts" ;;
"mro") info "Installing Microsoft R Open on hosts" ;;
"r_libraries") info "Installing R libraries on hosts" ;;
*) fail "Unknown remote install command"; report_error 1; check_if_command_error
esac
for host in "${HOSTS_ARRAY[@]}"; do
info "-- Invoking ${host}"
{ ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "bash ${SHELL_SCRIPT} install_$1 &> install_$1.log" ;
endcode=$?
if [ $endcode -eq 0 ] ; then
success "-- ${host} finished"
else
fail "-- ${host} finished"
report_error 1
fi
} &
done
last_workers=-1
while true; do
current_workers=$(jobs -rp | wc -l)
if [ $current_workers -eq 0 ] ; then break; fi
if (( $current_workers % 5 == 0)) && [ "$current_workers" -ne "$last_workers" ] ; then
info "- Waiting for $current_workers hosts"
last_workers=$current_workers
fi
sleep 1
done
check_if_command_error
}
hosts_install_env() { hosts_install env; }
hosts_install_mro() { hosts_install mro; }
hosts_install_r_libraries() { hosts_install r_libraries; }
hosts_power_off()
{
info "Power off on hosts"
for host in "${HOSTS_ARRAY[@]}"; do
step "-- ${host}"
try ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "poweroff"
next
done
check_if_command_error
}
make_remote_connection_list()
{
info "Making remote connection list:"
if [ -f ${CONNECTION_LIST_FILE} ] ; then rm ${CONNECTION_LIST_FILE}; fi
case "$1" in
"single")
step "one connection per host"
for host in "${HOSTS_ARRAY[@]}"; do
try echo ${host} >> ${CONNECTION_LIST_FILE}
done
next
;;
"nproc")
info "'number of cores' per host"
for host in "${HOSTS_ARRAY[@]}"; do
step "-- ${host}"
cornum=`try ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} '/usr/bin/Rscript -e "cat(parallel::detectCores(logical = ${REMOTE_DETECT_LOGICAL_CPUS}))"'`
regex='^[0-9]+$'
if ! [[ $cornum =~ $regex ]] ; then
try false
else
for ((i=1; i<=$cornum; i++)); do try echo ${host} >> ${CONNECTION_LIST_FILE}; done
echo -n "($cornum cores) "
fi
next
done
;;
*)
fail "unknown type"
report_error 1
esac
check_if_command_error
}
make_remote_connection_list_single() { make_remote_connection_list single; }
make_remote_connection_list_nproc() { make_remote_connection_list nproc; }
hosts_check_install_log()
{
info "Checking install log on hosts"
for host in "${HOSTS_ARRAY[@]}"; do
step "-- ${host}"
echo
try ssh ${SSH_OPTIONS/-q/} -o LogLevel=error -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "cat install_$1.log"
next
done
check_if_command_error
}
hosts_check_install_log_env() { hosts_check_install_log env; }
hosts_check_install_log_mro() { hosts_check_install_log mro; }
hosts_check_install_log_r_libraries() { hosts_check_install_log r_libraries; }
hosts_check_worker_log()
{
info "Checking worker log on hosts"
for host in "${HOSTS_ARRAY[@]}"; do
step "-- ${host}"
echo
try ssh ${SSH_OPTIONS/-q/} -o LogLevel=error -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "for f in worker-remote-*.log; do echo \$f; cat -n \$f; done"
next
done
check_if_command_error
}
hosts_clean_worker_log()
{
info "Cleaning workers logs"
for host in "${HOSTS_ARRAY[@]}"; do
step "-- ${host}"
try ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "rm -f worker-remote-*.log"
next
done
check_if_command_error
}
check_if_command_error()
{
errcode=$(< /tmp/command_error.$$)
[[ $errcode -ne 0 ]] && { warn "Stopping script execution"; rm -f /tmp/command_error.$$; exit $errcode; }
}
my_configure_hosts()
{
#generate_ssh_keys
#hosts_push_ssh_key
hosts_scan_available
hosts_push_shell_script
dump_project_r_files
hosts_push_project_r_files
hosts_install_env
hosts_install_mro
#hosts_install_r_libraries
hosts_push_r_libraries_dump
make_remote_connection_list_nproc
#make_remote_connection_list_single
}
configure_hosts()
{
generate_ssh_keys
hosts_push_ssh_key
hosts_push_shell_script
dump_project_r_files
hosts_push_project_r_files
hosts_install_env
hosts_install_mro
hosts_install_r_libraries
#hosts_push_r_libraries_dump
make_remote_connection_list_nproc
#make_remote_connection_list_single
}
# read hosts from file or stdin
if [ -t 0 ]; then
if [ ! -f "$HOSTS_FILE" ]
then
info "No hosts file, working with localhost"
HOSTS_ARRAY+=("127.0.0.1")
else
readarray -t HOSTS_ARRAY < $HOSTS_FILE
fi
else
while read -r host ; do
HOSTS_ARRAY+=("$host")
done
fi
info "Working with ${#HOSTS_ARRAY[@]} hosts"
# read arguments as commands
for i in "$@"
do
case "$i" in
"hosts_install"|"hosts_check_install_log"|"make_remote_connection_list"|"info"|"fail"|"success"|"warn"|"next"|"try"|"step") ;;
*)
if [ "$(type -t $i)" = "function" ]; then
$i
else
fail "Command $i not found"
report_error 127
fi
esac
check_if_command_error
done
rm -f /tmp/command_error.$$

41
test.R Normal file
View File

@ -0,0 +1,41 @@
# ---- init ----
source("init.R")
setup.logger(file.path(LOGGER.OUTPUT.DIR, LOGGER.OUTPUT.TEST.FILE),
LOGGER.OVERWRITE.EXISTING.FILES)
source("init-parallel.R")
# ---- foreach-test ----
flog.info("Test foreach")
fdata = foreach::foreach(i = 1:foreach::getDoParWorkers(),
.combine = c) %dopar%
{
Sys.sleep(1)
return(i)
}
print(fdata)
# ---- caret-test ----
flog.info("Test caret")
tr.control = caret::trainControl(method = "repeatedcv",
number = 10,
repeats = 10,
allowParallel = TRUE)
model = caret::train(form = mpg ~ .,
data = mtcars,
method = "svmLinear",
trControl = tr.control)
print(model)
# ---- shutdown ----
stop.cluster()

88
utils-parallel.R Normal file
View File

@ -0,0 +1,88 @@
make.psock.cluster = function(names, connection.timeout, ...)
{
if (is.numeric(names))
{
names = as.integer(names[1])
if (is.na(names) || names < 1)
{
stop.script("Numeric 'names' must be >= 1")
}
names = rep("localhost", names)
}
parallel:::.check_ncores(length(names))
options = parallel:::addClusterOptions(parallel:::defaultClusterOptions,
list(...))
cl = vector("list", length(names))
for (i in seq_along(cl))
{
flog.info(paste0("[", i, "/", length(cl), "] Connecting to ",
names[[i]], " ... "))
options.copy = parallel:::addClusterOptions(options, NULL)
options.out.file = parallel:::getClusterOption("outfile", options)
if (class(options.out.file) == "lazy")
{
options.copy = parallel:::addClusterOptions(options,
list("outfile" = lazy_eval(options.out.file,
list(worker.id = i,
worker.name = names[i]))))
}
tryCatch({
cl.node =
evalWithTimeout(parallel:::newPSOCKnode(names[[i]],
options = options.copy,
rank = i),
timeout = connection.timeout,
onTimeout = "error")
cl[[i]] = cl.node
flog.info("OK")},
error = function(e) {
if ("TimeoutException" %in% class(e))
{
flog.warn("Timeout")
} else {
stop.script(e)
}
}
)
}
cl.filtered = list()
i = 1
for (j in seq_along(cl))
{
if (!is.null(cl[[j]]))
{
cl.filtered[[i]] = cl[[j]]
i = i + 1
}
}
if (length(cl) != length(cl.filtered))
{
flog.warn(paste("Unable to connect to", length(cl) - length(cl.filtered),
"nodes"))
}
if (length(cl.filtered) == 0)
{
stop.script("No remote workers")
} else {
flog.info(paste("Working on", length(cl.filtered), "nodes"))
}
class(cl.filtered) = c("SOCKcluster", "cluster")
cl.filtered
}
stop.cluster = function(cl.to.stop = cl)
{
flog.info("Workers shut down")
foreach::registerDoSEQ()
parallel::stopCluster(cl.to.stop)
}

26
utils.R Normal file
View File

@ -0,0 +1,26 @@
# additional functions
setup.logger = function(output.file, overwrite.existing.files)
{
if (overwrite.existing.files & file.exists(output.file))
{
file.remove(output.file)
}
invisible(flog.appender(appender.tee(output.file)))
}
stop.script = function(error)
{
if (is.character(error))
{
flog.error(error)
} else {
flog.error(getMessage(error))
}
throw(error)
}