mirror of
https://github.com/andre-wojtowicz/r-distributed-computing
synced 2024-11-03 14:10:28 +01:00
148 lines
5.0 KiB
R
148 lines
5.0 KiB
R
# Build a PSOCK cluster by connecting to each requested node in turn,
# skipping nodes that do not connect within `connection.timeout`.
#
# names              - character vector of host names, or a number n meaning
#                      n workers on "localhost" (only the first element of a
#                      numeric vector is used)
# connection.timeout - per-node time limit handed to evalWithTimeout()
# ...                - extra cluster options layered over parallel's default
#                      cluster options
#
# Returns an object of class c("SOCKcluster", "cluster") holding only the
# nodes that connected. Each node keeps the rank of its position in `names`,
# so ranks may have gaps when some nodes timed out. Calls stop.script() when
# the numeric `names` is < 1, when a node fails with a non-timeout error, or
# when no node connected at all.
make.psock.cluster = function(names, connection.timeout, ...)
{
    if (is.numeric(names))
    {
        names = as.integer(names[1])
        if (is.na(names) || names < 1)
        {
            stop.script("Numeric 'names' must be >= 1")
        }
        names = rep("localhost", names)
    }

    parallel:::.check_ncores(length(names))
    options = parallel:::addClusterOptions(parallel:::defaultClusterOptions,
                                           list(...))
    cl = vector("list", length(names))

    for (i in seq_along(cl))
    {
        flog.info(paste0("[", i, "/", length(cl), "] Connecting to ",
                         names[[i]], " ... "))

        # The "outfile" option may be a lazyeval "lazy" expression, so that
        # each worker can get its own output file. Evaluate it per node with
        # `worker.id` and `worker.name` in scope, and layer the result over
        # the shared options without modifying them.
        node.options = options
        options.out.file = parallel:::getClusterOption("outfile", options)
        if (inherits(options.out.file, "lazy"))
        {
            node.options = parallel:::addClusterOptions(options,
                list("outfile" = lazy_eval(options.out.file,
                                           list(worker.id = i,
                                                worker.name = names[i]))))
        }

        tryCatch({
            cl.node =
                evalWithTimeout(new.psock.node(names[[i]],
                                               options = node.options,
                                               rank = i),
                                timeout = connection.timeout,
                                onTimeout = "error")
            cl[[i]] = cl.node
            flog.info("OK")
        },
        error = function(e) {
            # A timeout only drops this node (its slot stays NULL). Any other
            # failure aborts the whole script.
            if (inherits(e, "TimeoutException"))
            {
                flog.warn("Timeout")
            } else {
                stop.script(e)
            }
        })
    }

    # Keep only the nodes that actually connected.
    cl.filtered = Filter(Negate(is.null), cl)

    if (length(cl) != length(cl.filtered))
    {
        flog.warn(paste("Unable to connect to", length(cl) - length(cl.filtered),
                        "nodes"))
    }

    if (length(cl.filtered) == 0)
    {
        stop.script("No remote workers")
    } else {
        flog.info(paste("Working on", length(cl.filtered), "nodes"))
    }

    class(cl.filtered) = c("SOCKcluster", "cluster")
    cl.filtered
}
|
|
|
|
# Launch a single PSOCK worker and wait for it to connect back to us.
#
# Adapted from parallel's internal PSOCK node constructor. It builds an
# Rscript command line that runs parallel:::.slaveRSOCK(), with the
# connection settings passed as MASTER/PORT/OUT/TIMEOUT/XDR arguments. Then:
#   - if the "manual" option is set, it prints the command for the user to run;
#   - otherwise it starts the command with system(). "localhost" runs it
#     directly; any other host runs it through the configured rshcmd as
#     `user`.
# Finally it opens a server socket on `port` and blocks, up to the "timeout"
# option, until the worker connects.
#
# machine - host name, or a list of cluster options that includes `host`
# ...     - extra cluster options layered over `options`
# options - environment of cluster options (parallel's defaults by default)
# rank    - value stored as the node's rank
#
# Returns list(con, host, rank) with class "SOCKnode" when the "useXDR"
# option is TRUE, and class "SOCK0node" otherwise.
new.psock.node = function(machine = "localhost", ...,
                          options = parallel:::defaultClusterOptions, rank)
{
    opts <- parallel:::addClusterOptions(options, list(...))
    if (is.list(machine)) {
        # A list spec carries per-host options as well as the host name.
        opts <- parallel:::addClusterOptions(opts, machine)
        machine <- machine$host
    }
    opt <- function(name) parallel:::getClusterOption(name, opts)

    is.local <- machine == "localhost"
    # A local worker connects back to localhost; a remote worker uses the
    # configured master address.
    master <- if (is.local) "localhost" else opt("master")
    outfile <- opt("outfile")
    port <- opt("port")
    manual <- opt("manual")
    timeout <- opt("timeout")
    load.methods <- opt("methods")
    useXDR <- opt("useXDR")

    env <- paste0("MASTER=", master, " PORT=", port, " OUT=",
                  outfile, " TIMEOUT=", timeout, " XDR=", useXDR)
    worker.call <- shQuote("parallel:::.slaveRSOCK()")

    if (opt("homogeneous")) {
        rscript <- shQuote(opt("rscript"))
    } else {
        rscript <- "Rscript"
    }

    rscript.args <- opt("rscript_args")
    if (load.methods) {
        rscript.args <- c("--default-packages=datasets,utils,grDevices,graphics,stats,methods",
                          rscript.args)
    }
    if (length(rscript.args) == 0) {
        cmd <- paste(rscript, "-e", worker.call, env)
    } else {
        cmd <- paste(rscript, paste(rscript.args, collapse = " "), "-e",
                     worker.call, env)
    }

    renice <- opt("renice")
    # A renice value of NA or 0 leaves the worker's priority unchanged.
    if (!is.na(renice) && renice) {
        cmd <- sprintf("nice -%d %s", as.integer(renice), cmd)
    }

    if (!manual) {
        if (!is.local) {
            # Wrap the whole worker command as one quoted remote-shell argument.
            cmd <- paste(opt("rshcmd"), "-l", opt("user"), machine,
                         shQuote(cmd))
        }
        if (.Platform$OS.type == "windows") {
            system(cmd, wait = FALSE, input = "")
        } else {
            system(cmd, wait = FALSE)
        }
    } else {
        cat("Manually start worker on", machine, "with\n ",
            cmd, "\n")
        utils::flush.console()
    }

    # Blocks until the worker connects or the socket timeout expires.
    con <- socketConnection("localhost", port = port, server = TRUE,
                            blocking = TRUE, open = "a+b", timeout = timeout)
    node <- list(con = con, host = machine, rank = rank)
    class(node) <- if (useXDR) "SOCKnode" else "SOCK0node"
    node
}
|
|
|
|
# Shut down a cluster and reset foreach to sequential execution.
#
# cl.to.stop - the cluster to stop. It defaults to an object named `cl`,
#              found by normal lookup from this function's environment
#              (presumably a global cluster handle; confirm with callers).
#
# foreach::registerDoSEQ() runs first, so foreach stops using the parallel
# backend, and then parallel::stopCluster() shuts down the workers. The log
# line is written only after stopCluster() returns, so no "shut down" message
# is emitted when stopping fails.
#
# Returns NULL invisibly.
stop.cluster = function(cl.to.stop = cl)
{
    foreach::registerDoSEQ()
    parallel::stopCluster(cl.to.stop)

    flog.info("Workers shut down")
    invisible(NULL)
}
|