r-distributed-computing/utils-parallel.R

148 lines
5.0 KiB
R

# Build a socket cluster, attempting each worker connection under a timeout.
#
# Args:
#   names: either a number (only the first element is used; that many workers
#     are started on "localhost") or a collection of host specifications, one
#     per worker, each handed to new.psock.node() as `machine`.
#   connection.timeout: forwarded as the `timeout` argument of
#     evalWithTimeout() for every connection attempt.
#   ...: cluster options layered on top of parallel:::defaultClusterOptions.
#     When the "outfile" option is an object of class "lazy", it is evaluated
#     separately for each worker with `worker.id` and `worker.name` bound.
#
# Returns:
#   A list holding only the nodes that connected, with class
#   c("SOCKcluster", "cluster").
#
# Workers whose connection raises a "TimeoutException" are logged and
# dropped; any other error is handed to stop.script(). stop.script() is also
# called when a numeric `names` is < 1 / NA, or when no worker connected.
make.psock.cluster <- function(names, connection.timeout, ...) {
  if (is.numeric(names)) {
    names <- as.integer(names[1])
    if (is.na(names) || names < 1) {
      stop.script("Numeric 'names' must be >= 1")
    }
    names <- rep("localhost", names)
  }
  parallel:::.check_ncores(length(names))
  options <- parallel:::addClusterOptions(parallel:::defaultClusterOptions,
                                          list(...))

  # The "outfile" option is the same for every worker, so read it once.
  options.out.file <- parallel:::getClusterOption("outfile", options)
  # inherits() stays a single TRUE/FALSE even when the object carries several
  # classes; `class(x) == "lazy"` would yield a vector and break `if`.
  out.file.is.lazy <- inherits(options.out.file, "lazy")

  cl <- vector("list", length(names))
  for (i in seq_along(cl)) {
    flog.info(paste0("[", i, "/", length(cl), "] Connecting to ",
                     names[[i]], " ... "))
    options.copy <- if (out.file.is.lazy) {
      # Resolve the lazy "outfile" for this particular worker.
      parallel:::addClusterOptions(
        options,
        list("outfile" = lazy_eval(options.out.file,
                                   list(worker.id = i,
                                        worker.name = names[i]))))
    } else {
      parallel:::addClusterOptions(options, NULL)
    }
    tryCatch({
      cl.node <- evalWithTimeout(new.psock.node(names[[i]],
                                                options = options.copy,
                                                rank = i),
                                 timeout = connection.timeout,
                                 onTimeout = "error")
      # Assign inside the guarded expression so a failed attempt leaves the
      # slot as NULL (it is filtered out below).
      cl[[i]] <- cl.node
      flog.info("OK")
    },
    error = function(e) {
      if (inherits(e, "TimeoutException")) {
        flog.warn("Timeout")
      } else {
        stop.script(e)
      }
    })
  }

  # Keep only the slots that were filled by a successful connection.
  cl.filtered <- Filter(Negate(is.null), cl)
  if (length(cl) != length(cl.filtered)) {
    flog.warn(paste("Unable to connect to", length(cl) - length(cl.filtered),
                    "nodes"))
  }
  if (length(cl.filtered) == 0) {
    stop.script("No remote workers")
  } else {
    flog.info(paste("Working on", length(cl.filtered), "nodes"))
  }
  class(cl.filtered) <- c("SOCKcluster", "cluster")
  cl.filtered
}
# Start one socket worker (or print how to start it by hand) and accept its
# connection.
#
# Args:
#   machine: host name, or a list of cluster options that also contains a
#     `host` element naming the host.
#   ...: extra cluster options merged into `options`.
#   options: base cluster option set.
#   rank: value stored as the `rank` field of the returned node.
#
# Returns:
#   A list with fields `con` (the server-side socket connection), `host` and
#   `rank`, classed "SOCKnode" when the "useXDR" option is true and
#   "SOCK0node" otherwise.
new.psock.node <- function(machine = "localhost", ...,
                           options = parallel:::defaultClusterOptions, rank) {
  options <- parallel:::addClusterOptions(options, list(...))
  if (is.list(machine)) {
    # A list-valued `machine` contributes its own options and names the host.
    options <- parallel:::addClusterOptions(options, machine)
    machine <- machine$host
  }

  # Local shorthand for reading one option from the merged option set.
  opt <- function(key) parallel:::getClusterOption(key, options)

  is.local <- machine == "localhost"

  out.path <- opt("outfile")
  if (is.local) {
    master.host <- "localhost"
  } else {
    master.host <- opt("master")
  }
  listen.port <- opt("port")
  manual.start <- opt("manual")
  conn.timeout <- opt("timeout")
  with.methods <- opt("methods")
  use.xdr <- opt("useXDR")

  # Settings handed to the worker as NAME=value pairs on its command line.
  worker.env <- paste0("MASTER=", master.host, " PORT=", listen.port, " OUT=",
                       out.path, " TIMEOUT=", conn.timeout, " XDR=", use.xdr)
  worker.expr <- "parallel:::.slaveRSOCK()"

  if (opt("homogeneous")) {
    rscript.bin <- shQuote(opt("rscript"))
  } else {
    rscript.bin <- "Rscript"
  }

  extra.args <- opt("rscript_args")
  if (with.methods) {
    extra.args <- c(
      "--default-packages=datasets,utils,grDevices,graphics,stats,methods",
      extra.args)
  }

  if (length(extra.args)) {
    launch.cmd <- paste(rscript.bin, paste(extra.args, collapse = " "), "-e",
                        shQuote(worker.expr), worker.env)
  } else {
    launch.cmd <- paste(rscript.bin, "-e", shQuote(worker.expr), worker.env)
  }

  nice.level <- opt("renice")
  if (!is.na(nice.level) && nice.level) {
    launch.cmd <- sprintf("nice -%d %s", as.integer(nice.level), launch.cmd)
  }

  if (manual.start) {
    # Manual mode: only show the command; the user starts the worker.
    cat("Manually start worker on", machine, "with\n ",
        launch.cmd, "\n")
    utils::flush.console()
  } else {
    if (!is.local) {
      # Remote host: wrap the quoted command in the remote-shell invocation.
      remote.shell <- opt("rshcmd")
      remote.user <- opt("user")
      launch.cmd <- paste(remote.shell, "-l", remote.user, machine,
                          shQuote(launch.cmd))
    }
    if (.Platform$OS.type == "windows") {
      system(launch.cmd, wait = FALSE, input = "")
    } else {
      system(launch.cmd, wait = FALSE)
    }
  }

  # Listen on the configured port until the worker connects back.
  con <- socketConnection("localhost", port = listen.port, server = TRUE,
                          blocking = TRUE, open = "a+b",
                          timeout = conn.timeout)
  node.class <- if (use.xdr) "SOCKnode" else "SOCK0node"
  structure(list(con = con, host = machine, rank = rank), class = node.class)
}
# Shut a cluster down: log the shutdown, register foreach's sequential
# backend, then stop the cluster.
#
# Args:
#   cl.to.stop: the cluster to stop. When omitted, the default is a variable
#     named `cl` resolved from the scope enclosing this function.
#
# Returns:
#   Whatever parallel::stopCluster() returns.
stop.cluster <- function(cl.to.stop = cl) {
  flog.info("Workers shut down")
  # Switch foreach to the sequential backend before the workers go away.
  foreach::registerDoSEQ()
  parallel::stopCluster(cl.to.stop)
}