diff --git a/.gitignore b/.gitignore index fcff087..c090b7d 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,7 @@ vignettes/*.pdf # Temporary files created by R markdown *.utf8.md *.knit.md +.Rproj.user + +logs/* +*.R.user diff --git a/config-parallel.R b/config-parallel.R new file mode 100644 index 0000000..7ecd069 --- /dev/null +++ b/config-parallel.R @@ -0,0 +1,54 @@ +# ---- config-parallel ---- + +PARALLEL.USED.METHOD = "REMOTE" # LOCAL or REMOTE +PARALLEL.DISABLE.MKL.THREADS = TRUE + +PARALLEL.NO.JOBS.PER.CHUNK = 30 # no. jobs per worker in step 6-2 +PARALLEL.NO.JOBS.MULTIPLIER = 2 # no. jobs multiplier per job sequence + # to minimize worker time of waiting for + # remaining workers in step 6-2 + +# local +PARALLEL.LOCAL.METHOD = "PSOCK" + +PARALLEL.LOCAL.NODES = parallel::detectCores(logical = FALSE) +PARALLEL.LOCAL.CONNECTION.TIMEOUT = 5 +PARALLEL.LOCAL.SLAVE.OUT.FILE = + lazy(file.path(LOGGER.OUTPUT.DIR, + paste0("worker-local-", worker.name, "-", worker.id, ".log"))) + +# remote +PARALLEL.REMOTE.METHOD = "PSOCK" + +PARALLEL.REMOTE.MASTER.IP = "192.168.0.1" # ip accessible from slaves +PARALLEL.REMOTE.MASTER.PORT = 11000 +PARALLEL.REMOTE.MASTER.CONNECTION.TIMEOUT = 10 +PARALLEL.REMOTE.MASTER.SSH.PROGRAM = "ssh" # ssh, ssh.exe, etc.
+PARALLEL.REMOTE.MASTER.SSH.PRIV.KEY = file.path("ssh", "rsa-priv.key") +PARALLEL.REMOTE.MASTER.SSH.NULL.DEV = "/dev/null" +PARALLEL.REMOTE.MASTER.SSH.STRICT.HOST.KEY.CHECKING = "no" +PARALLEL.REMOTE.MASTER.SSH.SERVER.ALIVE.INTERVAL = 30 +PARALLEL.REMOTE.MASTER.SHELL.CMD = +paste0( + PARALLEL.REMOTE.MASTER.SSH.PROGRAM, + " -q", + " -o ConnectTimeout=", PARALLEL.REMOTE.MASTER.CONNECTION.TIMEOUT, + " -o UserKnownHostsFile=", PARALLEL.REMOTE.MASTER.SSH.NULL.DEV, + " -o StrictHostKeyChecking=", PARALLEL.REMOTE.MASTER.SSH.STRICT.HOST.KEY.CHECKING, + " -o ServerAliveInterval=", PARALLEL.REMOTE.MASTER.SSH.SERVER.ALIVE.INTERVAL, + " -i ", PARALLEL.REMOTE.MASTER.SSH.PRIV.KEY) + +PARALLEL.REMOTE.MASTER.SLAVES.FILE.PATH = "remote-connection-list.txt" + +PARALLEL.REMOTE.SLAVE.OUT.FILE = lazy(paste0("worker-remote-", worker.name, + "-", worker.id, ".log")) +PARALLEL.REMOTE.SLAVE.SSH.USER = "root" +PARALLEL.REMOTE.SLAVE.RSCRIPT.PATH = "/usr/bin/Rscript" +PARALLEL.REMOTE.SLAVE.HOMOGENEOUS = TRUE +PARALLEL.REMOTE.SLAVE.METHODS = TRUE +PARALLEL.REMOTE.SLAVE.USEXDR = TRUE + +# perform additional custom config + +if (file.exists("config-parallel.R.user")) + source("config-parallel.R.user") diff --git a/config.R b/config.R new file mode 100644 index 0000000..b0d5920 --- /dev/null +++ b/config.R @@ -0,0 +1,31 @@ +# ---- config ---- + +# randomization and output files + +SEED = 1337 +OVERWRITE.OUTPUT.FILES = TRUE # overwrite created datasets and classifiers + +# extra user configuration and init + +USER.CONFIG.FILE = "config.R.user" +USER.INIT.FILE = "init.R.user" + +# checkpoint library + +CHECKPOINT.QUICK.LOAD = FALSE # if TRUE then skip testing https and checking url +CHECKPOINT.MRAN.URL = "http://mran.microsoft.com/" +CHECKPOINT.SNAPSHOT.DATE = "2016-07-01" + +# logging system + +LOGGER.OUTPUT.DIR = "logs" +LOGGER.OUTPUT.TEST.FILE = "output-test.log" +LOGGER.LEVEL = 6 # futile.logger::INFO +LOGGER.OVERWRITE.EXISTING.FILES = TRUE + +# load custom config + +if 
(file.exists(USER.CONFIG.FILE)) +{ + source(USER.CONFIG.FILE) +} diff --git a/init-parallel.R b/init-parallel.R new file mode 100644 index 0000000..cb1300c --- /dev/null +++ b/init-parallel.R @@ -0,0 +1,115 @@ +# ---- init-parallel ---- + +# load setup variables + +source("config-parallel.R") +source("utils-parallel.R") + +# create cluster + +cl = if (PARALLEL.USED.METHOD == "LOCAL") +{ + if (PARALLEL.LOCAL.METHOD == "PSOCK") + { + flog.info("Creating local PSOCK cluster") + make.psock.cluster( + names = PARALLEL.LOCAL.NODES, + connection.timeout = PARALLEL.LOCAL.CONNECTION.TIMEOUT, + outfile = PARALLEL.LOCAL.SLAVE.OUT.FILE) + } else if (PARALLEL.LOCAL.METHOD == "FORK") + { + flog.info("Creating local FORK cluster") + makeForkCluster(PARALLEL.LOCAL.NODES) + } else { + stop.script(paste("Unknown local parallel cluster method:", + PARALLEL.USED.METHOD)) + } +} else if (PARALLEL.USED.METHOD == "REMOTE") +{ + if (PARALLEL.REMOTE.METHOD == "PSOCK") + { + flog.info("Creating remote PSOCK cluster") + + if (!file.exists(PARALLEL.REMOTE.MASTER.SLAVES.FILE.PATH)) + { + stop.script(paste("Unable to read list of remote hosts from", + PARALLEL.REMOTE.MASTER.SLAVES.FILE.PATH)) + } + + slaves.list = readLines(PARALLEL.REMOTE.MASTER.SLAVES.FILE.PATH) + + make.psock.cluster( + names = slaves.list, + connection.timeout = PARALLEL.REMOTE.MASTER.CONNECTION.TIMEOUT, + master = PARALLEL.REMOTE.MASTER.IP, + port = PARALLEL.REMOTE.MASTER.PORT, + rshcmd = PARALLEL.REMOTE.MASTER.SHELL.CMD, + outfile = PARALLEL.REMOTE.SLAVE.OUT.FILE, + user = PARALLEL.REMOTE.SLAVE.SSH.USER, + rscript = PARALLEL.REMOTE.SLAVE.RSCRIPT.PATH, + homogeneous = PARALLEL.REMOTE.SLAVE.HOMOGENEOUS, + methods = PARALLEL.REMOTE.SLAVE.METHODS, + useXDR = PARALLEL.REMOTE.SLAVE.USEXDR) + } + else { + stop.script(paste("Unknown remote parallel cluster method:", + PARALLEL.REMOTE.METHOD)) + } +} else { + stop.script(paste("Unknown used parallel method:", PARALLEL.USED.METHOD)) +} + + +flog.info("Exporting checkpoint 
constants") +clusterExport(cl, c("CHECKPOINT.QUICK.LOAD", "CHECKPOINT.MRAN.URL", + "CHECKPOINT.SNAPSHOT.DATE", "LOGGER.LEVEL")) + +clusterEvalQ(cl, { + library(checkpoint) + + if (CHECKPOINT.QUICK.LOAD) # approx. x10 faster checkpoint library loading + { + # assume https + options(checkpoint.mranUrl = CHECKPOINT.MRAN.URL) + # disable url checking + assignInNamespace("is.404", function(mran, warn = TRUE) { FALSE }, + "checkpoint") + } + + checkpoint(CHECKPOINT.SNAPSHOT.DATE, verbose = TRUE, scanForPackages = TRUE) + + library(futile.logger) + + flog.threshold(LOGGER.LEVEL) + + flog.info("Logging configured") +}) + +if (PARALLEL.DISABLE.MKL.THREADS) +{ + clusterEvalQ(cl, { + tryCatch({setMKLthreads(1); + flog.info("Set MKL threads to 1 on slave")}, + error = function(e) e) + }) +} + +flog.info("Setting cluster RNG kind") +clusterEvalQ(cl, { + RNGkind("L'Ecuyer-CMRG") +}) + +flog.info("Registering cluster") +registerDoParallel(cl) + +foreach::foreach(i = 1:foreach::getDoParWorkers()) %dopar% +{ + flog.info("Foreach startup test") +} + +flog.info(paste(rep("*", 25), collapse = "")) + +# perform additional custom init + +if (file.exists("init-parallel.R.user")) + source("init-parallel.R.user") diff --git a/init.R b/init.R new file mode 100644 index 0000000..15411db --- /dev/null +++ b/init.R @@ -0,0 +1,79 @@ +# ---- init ---- + +# stop cluster if it is already registered + +if ("cl" %in% ls() && !is.null(foreach::getDoParName())) +{ + if (foreach::getDoParName() != "doSEQ") + { + flog.info("Stopping already registered cluster") + stop.cluster() + } +} + +# clear environment + +rm(list = ls()) + +# load setup variables + +source("config.R") + +# load library management system + +suppressPackageStartupMessages( + library(checkpoint) +) + +if (CHECKPOINT.QUICK.LOAD) # approx.
x10 faster checkpoint library loading +{ + # assume https + options(checkpoint.mranUrl = CHECKPOINT.MRAN.URL) + # disable url checking + assignInNamespace("is.404", function(mran, warn = TRUE) { FALSE }, + "checkpoint") +} + +checkpoint(CHECKPOINT.SNAPSHOT.DATE, verbose = TRUE, scanForPackages = TRUE) + +# load logging system + +suppressPackageStartupMessages( + library(futile.logger) +) + +invisible(flog.threshold(LOGGER.LEVEL)) + +if (!dir.exists(LOGGER.OUTPUT.DIR)) +{ + dir.create(LOGGER.OUTPUT.DIR, recursive = TRUE) +} + +# load libraries + +suppressPackageStartupMessages({ + library(plyr) + library(dplyr) + library(ttutils) + library(lazyeval) + library(R.utils) + library(RCurl) + # caret - core + library(caret) + library(e1071) + # caret - classifiers + library(kernlab) # SVM + # parallel computation + library(doParallel) +}) + +# load helper functions + +source("utils.R") + +# perform additional custom init + +if (file.exists(USER.INIT.FILE)) +{ + source(USER.INIT.FILE) +} diff --git a/r-distributed-computing.Rproj b/r-distributed-computing.Rproj new file mode 100644 index 0000000..9241cc8 --- /dev/null +++ b/r-distributed-computing.Rproj @@ -0,0 +1,16 @@ +Version: 1.0 + +RestoreWorkspace: Default +SaveWorkspace: Default +AlwaysSaveHistory: Default + +EnableCodeIndexing: Yes +UseSpacesForTab: Yes +NumSpacesForTab: 4 +Encoding: UTF-8 + +RnwWeave: Sweave +LaTeX: pdfLaTeX + +AutoAppendNewline: Yes +StripTrailingWhitespace: Yes diff --git a/remote-commands.sh b/remote-commands.sh new file mode 100644 index 0000000..878e203 --- /dev/null +++ b/remote-commands.sh @@ -0,0 +1,520 @@ +#!/bin/bash + +# working with WMI Rescue - small Linux image based on +# Debian distribution; see http://rescue.wmi.amu.edu.pl + +# config + +MRO_VERSION="3.3.1" +SSH_OPTIONS="-o ConnectTimeout=5 -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -q" +SSH_USER="root" +SSHPASS_PWD="wmi" +SSH_KEYS_DIR="ssh" +SSH_KEY_PRIV="rsa-priv.key" +SSH_KEY_PUB="rsa-pub.key" 
+MRO_INSTALL_URL="https://mran.microsoft.com/install" +HOSTS_FILE="remote-hosts.txt" +CONNECTION_LIST_FILE="remote-connection-list.txt" +HOSTS_SCANNED_FILE="remote-hosts-scanned.txt" +DEBIAN_PACKAGES_TO_INSTALL="build-essential gfortran ed htop libxml2-dev ca-certificates curl libcurl4-openssl-dev gdebi-core sshpass default-jre default-jdk libpcre3-dev zlib1g-dev liblzma-dev libbz2-dev libicu-dev" +REMOTE_DETECT_LOGICAL_CPUS="FALSE" +MIN_HOSTS=100 + +SHELL_SCRIPT=$(basename $0) +LOG_STEPS="logs/${SHELL_SCRIPT%.*}".log +HOSTS_ARRAY=() + +# messaging + +report_error() +{ + echo $1 > /tmp/command_error.$$ +} + +[[ -w /tmp ]] && report_error 0 + +# https://stackoverflow.com/a/5196220 +# modified for Debian + +# Use step(), try(), and next() to perform a series of commands and print +# [ OK ] or [FAILED] at the end. The step as a whole fails if any individual +# command fails. +# +# Example: +# step "Remounting / and /boot as read-write:" +# try mount -o remount,rw / +# try mount -o remount,rw /boot +# next +step() +{ + echo -n "* $@ " + + STEP_OK=0 + [[ -w /tmp ]] && echo $STEP_OK > /tmp/step.$$ +} + +try() +{ + # skip if previous command in step failed + [[ -f /tmp/step.$$ ]] && { PREV_STEP=$(< /tmp/step.$$); } + [[ $PREV_STEP -ne 0 ]] && return 1 + + # Check for `-b' argument to run command in the background. + local BG= + + [[ $1 == -b ]] && { BG=1; shift; } + [[ $1 == -- ]] && { shift; } + + # Run the command. + if [[ -z $BG ]]; then + "$@" + else + "$@" & + fi + + # Check if command failed and update $STEP_OK if so. + local EXIT_CODE=$? + + if [[ $EXIT_CODE -ne 0 ]]; then + STEP_OK=$EXIT_CODE + [[ -w /tmp ]] && echo $STEP_OK > /tmp/step.$$ + + if [[ -n $LOG_STEPS ]]; then + local FILE=$(readlink -m "${BASH_SOURCE[1]}") + local LINE=${BASH_LINENO[0]} + + mkdir -p $( dirname $LOG_STEPS ) + + echo "$FILE: line $LINE: Command \`$*' failed with exit code $EXIT_CODE." 
>> "$LOG_STEPS" + fi + fi + + return $EXIT_CODE +} + +test -t 1 && CONSOLE_STDOUT=1 || CONSOLE_STDOUT=0 +[[ $TERM != "dumb" ]] && [[ $CONSOLE_STDOUT -eq 1 ]] && CONSOLE_COLORS=1 || CONSOLE_COLORS=0 + +[ $CONSOLE_COLORS -eq 1 ] && CONSOLE_RED=$(tput setaf 1) +[ $CONSOLE_COLORS -eq 1 ] && CONSOLE_GREEN=$(tput setaf 2) +[ $CONSOLE_COLORS -eq 1 ] && CONSOLE_YELLOW=$(tput setaf 3) +[ $CONSOLE_COLORS -eq 1 ] && CONSOLE_NORMAL=$(tput sgr0) +[ $CONSOLE_COLORS -eq 1 ] && CONSOLE_RESULT_POS=$[$(tput cols)-10] || CONSOLE_RESULT_POS=0 + +next() +{ + # https://stackoverflow.com/a/5506264 + [ $CONSOLE_COLORS -eq 1 ] && tput hpa ${CONSOLE_RESULT_POS} + + [[ -f /tmp/step.$$ ]] && { STEP_OK=$(< /tmp/step.$$); rm -f /tmp/step.$$; } + [[ $STEP_OK -eq 0 ]] && printf '%s%s' "$CONSOLE_GREEN" "[ OK ]" "$CONSOLE_NORMAL" || printf '%s%s' "$CONSOLE_RED" "[FAIL]" "$CONSOLE_NORMAL" + echo + + [[ $STEP_OK -ne 0 ]] && report_error 1 + + return $STEP_OK +} + +info() +{ + echo -n "* $@ " + [ $CONSOLE_COLORS -eq 1 ] && tput hpa ${CONSOLE_RESULT_POS} + printf '%s' "[INFO]" + echo +} + +fail() +{ + echo -n "* $@ " + [ $CONSOLE_COLORS -eq 1 ] && tput hpa ${CONSOLE_RESULT_POS} + printf '%s%s' "$CONSOLE_RED" "[FAIL]" "$CONSOLE_NORMAL" + echo +} + +success() +{ + echo -n "* $@ " + [ $CONSOLE_COLORS -eq 1 ] && tput hpa ${CONSOLE_RESULT_POS} + printf '%s%s' "$CONSOLE_GREEN" "[ OK ]" "$CONSOLE_NORMAL" + echo +} + +warn() +{ + echo -n "* $@ " + [ $CONSOLE_COLORS -eq 1 ] && tput hpa ${CONSOLE_RESULT_POS} + printf '%s%s' "$CONSOLE_YELLOW" "[WARN]" "$CONSOLE_NORMAL" + echo +} + +# functions + +generate_ssh_keys() +{ + step "Generating SSH keys" + try mkdir -p ssh + try ssh-keygen -q -t rsa -b 4096 -f ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} -P "" -C "rscript@remote" + try mv ${SSH_KEYS_DIR}/${SSH_KEY_PRIV}.pub ${SSH_KEYS_DIR}/${SSH_KEY_PUB} + next + check_if_command_error +} + +install_env() +{ + step "Installing environment" + echo + try apt-get update + local DEBIAN_FRONTEND=noninteractive + try apt-get -y -o 
Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confnew" upgrade + try apt-get -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confnew" install ${DEBIAN_PACKAGES_TO_INSTALL} + try apt-get clean + next + check_if_command_error +} + +install_mro() +{ + step "Installing Microsoft R Open" + echo + + Rscript -e "invisible(TRUE)" &> /dev/null + + if [[ $? -ne 0 ]]; then + try wget ${MRO_INSTALL_URL}/mro/${MRO_VERSION}/microsoft-r-open-${MRO_VERSION}.tar.gz + try tar -xvf microsoft-r-open-${MRO_VERSION}.tar.gz + try gdebi -n microsoft-r-open/deb/microsoft-r-open-mro-${MRO_VERSION:0:3}.deb + try gdebi -n microsoft-r-open/deb/microsoft-r-open-foreachiterators-${MRO_VERSION:0:3}.deb + try gdebi -n microsoft-r-open/deb/microsoft-r-open-mkl-${MRO_VERSION:0:3}.deb + try R CMD javareconf + rm -rf microsoft-r-open* + else + echo "Microsoft R Open already installed" + fi + + try Rscript -e "install.packages('knitr')" + + + try apt-get clean + next + check_if_command_error +} + +install_r_libraries() +{ + step "Installing R libraries" + echo + try mkdir -p ~/.checkpoint + try Rscript init.R # run checkpoint + next + check_if_command_error +} + +dump_project_r_files() +{ + step "Making project R files dump" + try tar -czf project-r-files.tar.gz *.R* + next + check_if_command_error +} + +dump_r_libraries() +{ + step "Making R libraries dump" + wd=`pwd` + cd ~/ + try tar -czf $wd/checkpoint.tar.gz .checkpoint/* + cd $wd + next + check_if_command_error +} + +hosts_push_ssh_key() +{ + info "Pushing SSH keys to hosts" + for host in "${HOSTS_ARRAY[@]}"; do + step "-- ${host}" + try sshpass -p ${SSHPASS_PWD} ssh ${SSH_OPTIONS} ${SSH_USER}@${host} 'mkdir -p ~/.ssh' + try sshpass -p ${SSHPASS_PWD} scp ${SSH_OPTIONS} ${SSH_KEYS_DIR}/${SSH_KEY_PUB} ${SSH_USER}@${host}:~/.ssh + try sshpass -p ${SSHPASS_PWD} ssh ${SSH_OPTIONS} ${SSH_USER}@${host} "cat ~/.ssh/${SSH_KEY_PUB} >> ~/.ssh/authorized_keys" + try sshpass -p ${SSHPASS_PWD} ssh ${SSH_OPTIONS} 
${SSH_USER}@${host} "sed -i -e 's/#PasswordAuthentication yes/PasswordAuthentication no/ig' /etc/ssh/sshd_config; service ssh restart" + next + done + check_if_command_error +} + +hosts_scan_available() +{ + HOSTS_SCANNED_ARRAY=() + + info "Scanning available hosts" + for host in "${HOSTS_ARRAY[@]}"; do + ssh -o ConnectTimeout=2 -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o PreferredAuthentications=publickey -q -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "true" + EXIT_CODE=$? + if [[ $EXIT_CODE -ne 0 ]]; then + fail "-- ${host}" + else + success "-- ${host}" + HOSTS_SCANNED_ARRAY+=("$host") + fi + done + + HOSTS_ARRAY=("${HOSTS_SCANNED_ARRAY[@]}") + + if [[ ${#HOSTS_ARRAY[@]} -eq 0 ]]; then + fail "No available hosts" + exit 1 + else + info "Available ${#HOSTS_ARRAY[@]} hosts" + fi + + if [[ ${#HOSTS_ARRAY[@]} -lt $MIN_HOSTS ]]; then + fail "Too few hosts: ${#HOSTS_ARRAY[@]} ; min.: $MIN_HOSTS" + exit 1 + fi + + if [ -f ${HOSTS_SCANNED_FILE} ] ; then rm ${HOSTS_SCANNED_FILE}; fi + + for host in "${HOSTS_ARRAY[@]}"; do + echo ${host} >> ${HOSTS_SCANNED_FILE} + done +} + +hosts_push_r_libraries_dump() +{ + info "Pushing R libraries dump to hosts" + for host in "${HOSTS_ARRAY[@]}"; do + step "-- ${host}" + try scp ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} checkpoint.tar.gz ${SSH_USER}@${host}:~/ + try ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "tar -xzf checkpoint.tar.gz -C ~/; rm checkpoint.tar.gz" + next + done + check_if_command_error +} + +hosts_push_project_r_files() +{ + info "Pushing project R files to hosts" + for host in "${HOSTS_ARRAY[@]}"; do + step "-- ${host}" + try scp ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} project-r-files.tar.gz ${SSH_USER}@${host}:~/ + try ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "tar -xzf project-r-files.tar.gz -C ~/; rm project-r-files.tar.gz" + next + done + check_if_command_error +} + +hosts_push_shell_script() +{ + info 
"Pushing shell script to hosts" + for host in "${HOSTS_ARRAY[@]}"; do + step "-- ${host}" + try scp ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SHELL_SCRIPT} ${SSH_USER}@${host}:~/ + next + done + check_if_command_error +} + +hosts_install() +{ + case "$1" in + "env") info "Installing environment on hosts" ;; + "mro") info "Installing Microsoft R Open on hosts" ;; + "r_libraries") info "Installing R libraries on hosts" ;; + *) fail "Unknown remote install command"; report_error 1; check_if_command_error + esac + + for host in "${HOSTS_ARRAY[@]}"; do + info "-- Invoking ${host}" + + { ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "bash ${SHELL_SCRIPT} install_$1 &> install_$1.log" ; + endcode=$? + if [ $endcode -eq 0 ] ; then + success "-- ${host} finished" + else + fail "-- ${host} finished" + report_error 1 + fi + } & + + done + + last_workers=-1 + while true; do + current_workers=$(jobs -rp | wc -l) + if [ $current_workers -eq 0 ] ; then break; fi + if (( $current_workers % 5 == 0)) && [ "$current_workers" -ne "$last_workers" ] ; then + info "- Waiting for $current_workers hosts" + last_workers=$current_workers + fi + sleep 1 + done + + check_if_command_error +} + +hosts_install_env() { hosts_install env; } +hosts_install_mro() { hosts_install mro; } +hosts_install_r_libraries() { hosts_install r_libraries; } + +hosts_power_off() +{ + info "Power off on hosts" + for host in "${HOSTS_ARRAY[@]}"; do + step "-- ${host}" + try ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "poweroff" + next + done + check_if_command_error +} + +make_remote_connection_list() +{ + info "Making remote connection list:" + if [ -f ${CONNECTION_LIST_FILE} ] ; then rm ${CONNECTION_LIST_FILE}; fi + case "$1" in + "single") + step "one connection per host" + for host in "${HOSTS_ARRAY[@]}"; do + try echo ${host} >> ${CONNECTION_LIST_FILE} + done + next + ;; + "nproc") + info "'number of cores' per host" + for host in 
"${HOSTS_ARRAY[@]}"; do + step "-- ${host}" + cornum=`try ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} '/usr/bin/Rscript -e "cat(parallel::detectCores(logical = ${REMOTE_DETECT_LOGICAL_CPUS}))"'` + + regex='^[0-9]+$' + if ! [[ $cornum =~ $regex ]] ; then + try false + else + for ((i=1; i<=$cornum; i++)); do try echo ${host} >> ${CONNECTION_LIST_FILE}; done + echo -n "($cornum cores) " + fi + next + done + ;; + *) + fail "unknown type" + report_error 1 + esac + check_if_command_error +} + +make_remote_connection_list_single() { make_remote_connection_list single; } +make_remote_connection_list_nproc() { make_remote_connection_list nproc; } + +hosts_check_install_log() +{ + info "Checking install log on hosts" + for host in "${HOSTS_ARRAY[@]}"; do + step "-- ${host}" + echo + try ssh ${SSH_OPTIONS/-q/} -o LogLevel=error -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "cat install_$1.log" + next + done + check_if_command_error +} + +hosts_check_install_log_env() { hosts_check_install_log env; } +hosts_check_install_log_mro() { hosts_check_install_log mro; } +hosts_check_install_log_r_libraries() { hosts_check_install_log r_libraries; } + +hosts_check_worker_log() +{ + info "Checking worker log on hosts" + for host in "${HOSTS_ARRAY[@]}"; do + step "-- ${host}" + echo + try ssh ${SSH_OPTIONS/-q/} -o LogLevel=error -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "for f in worker-remote-*.log; do echo \$f; cat -n \$f; done" + next + done + check_if_command_error +} + +hosts_clean_worker_log() +{ + info "Cleaning workers logs" + for host in "${HOSTS_ARRAY[@]}"; do + step "-- ${host}" + try ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "rm -f worker-remote-*.log" + next + done + check_if_command_error +} + +check_if_command_error() +{ + errcode=$(< /tmp/command_error.$$) + [[ $errcode -ne 0 ]] && { warn "Stopping script execution"; rm -f /tmp/command_error.$$; exit $errcode; } +} + +my_configure_hosts() 
+{ + #generate_ssh_keys + #hosts_push_ssh_key + hosts_scan_available + hosts_push_shell_script + dump_project_r_files + hosts_push_project_r_files + hosts_install_env + hosts_install_mro + #hosts_install_r_libraries + hosts_push_r_libraries_dump + make_remote_connection_list_nproc + #make_remote_connection_list_single +} + +configure_hosts() +{ + generate_ssh_keys + hosts_push_ssh_key + hosts_push_shell_script + dump_project_r_files + hosts_push_project_r_files + hosts_install_env + hosts_install_mro + hosts_install_r_libraries + #hosts_push_r_libraries_dump + make_remote_connection_list_nproc + #make_remote_connection_list_single +} + +# read hosts from file or stdin + +if [ -t 0 ]; then + if [ ! -f "$HOSTS_FILE" ] + then + info "No hosts file, working with localhost" + HOSTS_ARRAY+=("127.0.0.1") + else + readarray -t HOSTS_ARRAY < $HOSTS_FILE + fi +else + while read -r host ; do + HOSTS_ARRAY+=("$host") + done +fi + +info "Working with ${#HOSTS_ARRAY[@]} hosts" + +# read arguments as commands + +for i in "$@" +do + case "$i" in + "hosts_install"|"hosts_check_install_log"|"make_remote_connection_list"|"info"|"fail"|"success"|"warn"|"next"|"try"|"step") ;; + *) + if [ "$(type -t $i)" = "function" ]; then + $i + else + fail "Command $i not found" + report_error 127 + fi + esac + + check_if_command_error +done + +rm -f /tmp/command_error.$$ diff --git a/test.R b/test.R new file mode 100644 index 0000000..ad460e8 --- /dev/null +++ b/test.R @@ -0,0 +1,41 @@ +# ---- init ---- + +source("init.R") + +setup.logger(file.path(LOGGER.OUTPUT.DIR, LOGGER.OUTPUT.TEST.FILE), + LOGGER.OVERWRITE.EXISTING.FILES) + +source("init-parallel.R") + +# ---- foreach-test ---- + +flog.info("Test foreach") + +fdata = foreach::foreach(i = 1:foreach::getDoParWorkers(), + .combine = c) %dopar% +{ + Sys.sleep(1) + return(i) +} + +print(fdata) + +# ---- caret-test ---- + +flog.info("Test caret") + +tr.control = caret::trainControl(method = "repeatedcv", + number = 10, + repeats = 10, + 
allowParallel = TRUE) + +model = caret::train(form = mpg ~ ., + data = mtcars, + method = "svmLinear", + trControl = tr.control) + +print(model) + +# ---- shutdown ---- + +stop.cluster() diff --git a/utils-parallel.R b/utils-parallel.R new file mode 100644 index 0000000..d5abc06 --- /dev/null +++ b/utils-parallel.R @@ -0,0 +1,88 @@ +make.psock.cluster = function(names, connection.timeout, ...) +{ + if (is.numeric(names)) + { + names = as.integer(names[1]) + if (is.na(names) || names < 1) + { + stop.script("Numeric 'names' must be >= 1") + } + names = rep("localhost", names) + } + + parallel:::.check_ncores(length(names)) + options = parallel:::addClusterOptions(parallel:::defaultClusterOptions, + list(...)) + cl = vector("list", length(names)) + + for (i in seq_along(cl)) + { + + flog.info(paste0("[", i, "/", length(cl), "] Connecting to ", + names[[i]], " ... ")) + + options.copy = parallel:::addClusterOptions(options, NULL) + options.out.file = parallel:::getClusterOption("outfile", options) + if (class(options.out.file) == "lazy") + { + options.copy = parallel:::addClusterOptions(options, + list("outfile" = lazy_eval(options.out.file, + list(worker.id = i, + worker.name = names[i])))) + } + + tryCatch({ + cl.node = + evalWithTimeout(parallel:::newPSOCKnode(names[[i]], + options = options.copy, + rank = i), + timeout = connection.timeout, + onTimeout = "error") + cl[[i]] = cl.node + flog.info("OK")}, + error = function(e) { + if ("TimeoutException" %in% class(e)) + { + flog.warn("Timeout") + } else { + stop.script(e) + } + } + ) + } + + cl.filtered = list() + i = 1 + for (j in seq_along(cl)) + { + if (!is.null(cl[[j]])) + { + cl.filtered[[i]] = cl[[j]] + i = i + 1 + } + } + + if (length(cl) != length(cl.filtered)) + { + flog.warn(paste("Unable to connect to", length(cl) - length(cl.filtered), + "nodes")) + } + + if (length(cl.filtered) == 0) + { + stop.script("No remote workers") + } else { + flog.info(paste("Working on", length(cl.filtered), "nodes")) + } + + 
class(cl.filtered) = c("SOCKcluster", "cluster") + cl.filtered +} + +stop.cluster = function(cl.to.stop = cl) +{ + flog.info("Workers shut down") + + foreach::registerDoSEQ() + parallel::stopCluster(cl.to.stop) +} diff --git a/utils.R b/utils.R new file mode 100644 index 0000000..64ae030 --- /dev/null +++ b/utils.R @@ -0,0 +1,26 @@ + +# additional functions + +setup.logger = function(output.file, overwrite.existing.files) +{ + if (overwrite.existing.files & file.exists(output.file)) + { + file.remove(output.file) + } + + invisible(flog.appender(appender.tee(output.file))) +} + + +stop.script = function(error) +{ + if (is.character(error)) + { + flog.error(error) + } else { + flog.error(getMessage(error)) + } + + throw(error) +} +