1
0
mirror of https://github.com/andre-wojtowicz/r-distributed-computing synced 2024-11-03 14:10:28 +01:00

added renice, changing passwords, setting poweroff;

fixed displaying errors
This commit is contained in:
Andrzej Wójtowicz 2016-11-17 12:26:44 +01:00
parent 3acc1457b7
commit 2db3c06d02
5 changed files with 106 additions and 8 deletions

View File

@ -3,6 +3,7 @@
PARALLEL.USED.METHOD = "REMOTE" # LOCAL or REMOTE PARALLEL.USED.METHOD = "REMOTE" # LOCAL or REMOTE
PARALLEL.DISABLE.MKL.THREADS = TRUE PARALLEL.DISABLE.MKL.THREADS = TRUE
PARALLEL.RENICE = 19 # [-20; 19] or NA
# local # local
PARALLEL.LOCAL.METHOD = "PSOCK" PARALLEL.LOCAL.METHOD = "PSOCK"

View File

@ -15,7 +15,9 @@ cl = if (PARALLEL.USED.METHOD == "LOCAL")
make.psock.cluster( make.psock.cluster(
names = PARALLEL.LOCAL.NODES, names = PARALLEL.LOCAL.NODES,
connection.timeout = PARALLEL.LOCAL.CONNECTION.TIMEOUT, connection.timeout = PARALLEL.LOCAL.CONNECTION.TIMEOUT,
outfile = PARALLEL.LOCAL.SLAVE.OUT.FILE) outfile = PARALLEL.LOCAL.SLAVE.OUT.FILE,
renice = PARALLEL.RENICE
)
} else if (PARALLEL.LOCAL.METHOD == "FORK") } else if (PARALLEL.LOCAL.METHOD == "FORK")
{ {
flog.info("Creating local FORK cluster") flog.info("Creating local FORK cluster")
@ -49,7 +51,8 @@ cl = if (PARALLEL.USED.METHOD == "LOCAL")
rscript = PARALLEL.REMOTE.SLAVE.RSCRIPT.PATH, rscript = PARALLEL.REMOTE.SLAVE.RSCRIPT.PATH,
homogeneous = PARALLEL.REMOTE.SLAVE.HOMOGENEOUS, homogeneous = PARALLEL.REMOTE.SLAVE.HOMOGENEOUS,
methods = PARALLEL.REMOTE.SLAVE.METHODS, methods = PARALLEL.REMOTE.SLAVE.METHODS,
useXDR = PARALLEL.REMOTE.SLAVE.USEXDR) useXDR = PARALLEL.REMOTE.SLAVE.USEXDR,
renice = PARALLEL.RENICE)
} }
else { else {
stop.script(paste("Unknown remote parallel cluster method:", stop.script(paste("Unknown remote parallel cluster method:",

View File

@ -16,10 +16,12 @@ MRO_INSTALL_URL="https://mran.microsoft.com/install"
HOSTS_FILE="remote-hosts.txt" HOSTS_FILE="remote-hosts.txt"
CONNECTION_LIST_FILE="remote-connection-list.txt" CONNECTION_LIST_FILE="remote-connection-list.txt"
HOSTS_SCANNED_FILE="remote-hosts-scanned.txt" HOSTS_SCANNED_FILE="remote-hosts-scanned.txt"
DEBIAN_PACKAGES_TO_INSTALL="build-essential gfortran ed htop libxml2-dev ca-certificates curl libcurl4-openssl-dev gdebi-core sshpass default-jre default-jdk libpcre3-dev zlib1g-dev liblzma-dev libbz2-dev libicu-dev" DEBIAN_PACKAGES_TO_INSTALL="build-essential gfortran ed htop libxml2-dev ca-certificates curl libcurl4-openssl-dev gdebi-core sshpass default-jre default-jdk libpcre3-dev zlib1g-dev liblzma-dev libbz2-dev libicu-dev at"
REMOTE_DETECT_LOGICAL_CPUS="FALSE" REMOTE_DETECT_LOGICAL_CPUS="FALSE"
MIN_HOSTS=1 MIN_HOSTS=1
SWAP_PART="/dev/mapper/linux-swap" SWAP_PART="/dev/mapper/linux-swap"
NEW_PASS=""
POWEROFF_TIME="7:00"
SHELL_SCRIPT=$(basename $0) SHELL_SCRIPT=$(basename $0)
LOG_STEPS="logs/${SHELL_SCRIPT%.*}".log LOG_STEPS="logs/${SHELL_SCRIPT%.*}".log
@ -243,6 +245,28 @@ hosts_push_ssh_key()
check_if_command_error check_if_command_error
} }
# Change the password of ${SSH_USER} to ${NEW_PASS} on every host listed in
# HOSTS_ARRAY by running `chpasswd` over SSH (key-based login).
#
# Globals read: HOSTS_ARRAY, SSH_OPTIONS, SSH_KEYS_DIR, SSH_KEY_PRIV,
#               SSH_USER, NEW_PASS
hosts_change_password()
{
    info "Changing user password on hosts"

    # The "user:password" pair is re-parsed by the remote shell, so it must be
    # shell-quoted; otherwise spaces, `$`, `;`, quotes or backticks in the
    # password would be mangled or executed remotely.
    local quoted_credentials
    printf -v quoted_credentials '%q' "${SSH_USER}:${NEW_PASS}"

    for host in "${HOSTS_ARRAY[@]}"; do
        step "-- ${host}"
        # SSH_OPTIONS is left unquoted on purpose: it may hold several options.
        try ssh ${SSH_OPTIONS} -i "${SSH_KEYS_DIR}/${SSH_KEY_PRIV}" "${SSH_USER}@${host}" "chpasswd <<< ${quoted_credentials}"
        next
    done

    check_if_command_error
}
# Queue a `poweroff` job with `at` for ${POWEROFF_TIME} on every host listed
# in HOSTS_ARRAY. Output of `at` on the remote side is discarded.
#
# Globals read: HOSTS_ARRAY, SSH_OPTIONS, SSH_KEYS_DIR, SSH_KEY_PRIV,
#               SSH_USER, POWEROFF_TIME
hosts_set_power_off()
{
    info "Setting power-off on hosts"

    # Command line executed by the remote shell; identical for all hosts.
    local remote_cmd="at $POWEROFF_TIME <<< poweroff &> /dev/null"

    for host in "${HOSTS_ARRAY[@]}"
    do
        step "-- ${host}"
        try ssh ${SSH_OPTIONS} -i ${SSH_KEYS_DIR}/${SSH_KEY_PRIV} ${SSH_USER}@${host} "${remote_cmd}"
        next
    done

    check_if_command_error
}
hosts_scan_available() hosts_scan_available()
{ {
HOSTS_SCANNED_ARRAY=() HOSTS_SCANNED_ARRAY=()
@ -480,11 +504,13 @@ my_configure_hosts()
#generate_ssh_keys #generate_ssh_keys
#hosts_push_ssh_key #hosts_push_ssh_key
hosts_scan_available hosts_scan_available
hosts_change_password
hosts_push_shell_script hosts_push_shell_script
dump_project_r_files dump_project_r_files
dump_r_libraries dump_r_libraries
hosts_push_project_r_files hosts_push_project_r_files
hosts_install_env hosts_install_env
hosts_set_power_off
hosts_install_mro hosts_install_mro
#hosts_install_r_libraries #hosts_install_r_libraries
hosts_push_r_libraries_dump hosts_push_r_libraries_dump
@ -496,12 +522,14 @@ configure_hosts()
{ {
generate_ssh_keys generate_ssh_keys
hosts_push_ssh_key hosts_push_ssh_key
hosts_change_password
hosts_push_shell_script hosts_push_shell_script
hosts_enable_swap hosts_enable_swap
dump_project_r_files dump_project_r_files
dump_r_libraries dump_r_libraries
hosts_push_project_r_files hosts_push_project_r_files
hosts_install_env hosts_install_env
hosts_set_power_off
hosts_install_mro hosts_install_mro
hosts_push_r_libraries_dump hosts_push_r_libraries_dump
#hosts_install_r_libraries #hosts_install_r_libraries
@ -509,6 +537,12 @@ configure_hosts()
#make_remote_connection_list_single #make_remote_connection_list_single
} }
# Warn (but carry on) when no new password has been configured.
if [[ -z "${NEW_PASS}" ]]; then
    warn "Empty new password"
fi
# read hosts from file or stdin # read hosts from file or stdin
if [ -t 0 ]; then if [ -t 0 ]; then

View File

@ -33,9 +33,9 @@ make.psock.cluster = function(names, connection.timeout, ...)
tryCatch({ tryCatch({
cl.node = cl.node =
evalWithTimeout(parallel:::newPSOCKnode(names[[i]], evalWithTimeout(new.psock.node(names[[i]],
options = options.copy, options = options.copy,
rank = i), rank = i),
timeout = connection.timeout, timeout = connection.timeout,
onTimeout = "error") onTimeout = "error")
cl[[i]] = cl.node cl[[i]] = cl.node
@ -79,6 +79,65 @@ make.psock.cluster = function(names, connection.timeout, ...)
cl.filtered cl.filtered
} }
# Launch one PSOCK worker and return the node object describing it.
#
# Builds the Rscript command line for the worker from the cluster options,
# optionally prefixes it with `nice` (option "renice"), starts it locally or
# through the remote shell command (option "rshcmd"), and then opens a server
# socket on the configured port for the worker connection.
#
# Arguments:
#   machine  host name, or a list of cluster options containing `host`
#   ...      additional cluster options merged into `options`
#   options  base cluster options; defaults to parallel's default options
#   rank     rank stored in the returned node
#
# Returns:
#   list(con, host, rank) with class "SOCKnode" when the "useXDR" option is
#   TRUE, otherwise "SOCK0node".
new.psock.node = function(machine = "localhost", ...,
                          options = parallel:::defaultClusterOptions, rank)
{
    options <- parallel:::addClusterOptions(options, list(...))
    if (is.list(machine)) {
        # per-host option list: merge it in and keep only the host name
        options <- parallel:::addClusterOptions(options, machine)
        machine <- machine$host
    }

    outfile <- parallel:::getClusterOption("outfile", options)
    master <- if (machine == "localhost")
        "localhost"
    else parallel:::getClusterOption("master", options)
    port <- parallel:::getClusterOption("port", options)
    manual <- parallel:::getClusterOption("manual", options)
    timeout <- parallel:::getClusterOption("timeout", options)
    methods <- parallel:::getClusterOption("methods", options)
    useXDR <- parallel:::getClusterOption("useXDR", options)

    # settings handed to the worker as NAME=value command-line arguments
    env <- paste0("MASTER=", master, " PORT=", port, " OUT=",
                  outfile, " TIMEOUT=", timeout, " XDR=", useXDR)
    arg <- "parallel:::.slaveRSOCK()"

    rscript <- if (parallel:::getClusterOption("homogeneous", options)) {
        shQuote(parallel:::getClusterOption("rscript", options))
    }
    else "Rscript"

    rscript_args <- parallel:::getClusterOption("rscript_args", options)
    if (methods)
        rscript_args <- c("--default-packages=datasets,utils,grDevices,graphics,stats,methods",
                          rscript_args)

    cmd <- if (length(rscript_args))
        paste(rscript, paste(rscript_args, collapse = " "), "-e",
              shQuote(arg), env)
    else paste(rscript, "-e", shQuote(arg), env)

    # Optional niceness adjustment. NA, 0 and a missing (NULL) option are all
    # ignored. `nice -n N` is the POSIX spelling; the bare `nice -N` form is
    # obsolescent and not accepted by every implementation.
    renice <- parallel:::getClusterOption("renice", options)
    if (length(renice) == 1L && !is.na(renice) && renice != 0)
        cmd <- sprintf("nice -n %d %s", as.integer(renice), cmd)

    if (manual) {
        cat("Manually start worker on", machine, "with\n ",
            cmd, "\n")
        utils::flush.console()
    }
    else {
        if (machine != "localhost") {
            # run the worker command on the remote machine via rshcmd
            rshcmd <- parallel:::getClusterOption("rshcmd", options)
            user <- parallel:::getClusterOption("user", options)
            cmd <- shQuote(cmd)
            cmd <- paste(rshcmd, "-l", user, machine, cmd)
        }
        if (.Platform$OS.type == "windows") {
            system(cmd, wait = FALSE, input = "")
        }
        else system(cmd, wait = FALSE)
    }

    # blocking server socket; the worker is given MASTER/PORT above
    con <- socketConnection("localhost", port = port, server = TRUE,
                            blocking = TRUE, open = "a+b", timeout = timeout)
    structure(list(con = con, host = machine, rank = rank), class = if (useXDR)
        "SOCKnode"
    else "SOCK0node")
}
stop.cluster = function(cl.to.stop = cl) stop.cluster = function(cl.to.stop = cl)
{ {
flog.info("Workers shut down") flog.info("Workers shut down")

View File

@ -17,10 +17,11 @@ stop.script = function(error)
if (is.character(error)) if (is.character(error))
{ {
flog.error(error) flog.error(error)
} else if ("message" %in% attributes(x)$names) {
flog.error(error$message)
} else { } else {
flog.error(getMessage(error)) try(flog.error(getMessage(error)), silent = TRUE)
} }
throw(error) throw(error)
} }