mirror of
https://github.com/andre-wojtowicz/r-distributed-computing
synced 2024-11-25 15:45:28 +01:00
89 lines
2.5 KiB
R
89 lines
2.5 KiB
R
# Create a PSOCK cluster by connecting to the workers one at a time and
# skipping every worker whose connection attempt times out.
#
# Args:
#   names:              either a vector of host names (one worker is started
#                       per element) or a number N, in which case N workers
#                       are started on "localhost". Only the first element of
#                       a numeric `names` is used.
#   connection.timeout: value passed as `timeout` to evalWithTimeout() for
#                       each single connection attempt.
#   ...:                additional cluster options; they are merged on top of
#                       parallel:::defaultClusterOptions. When the "outfile"
#                       option is an object of class "lazy", it is evaluated
#                       separately for each worker with `worker.id` and
#                       `worker.name` available to the expression.
#
# Returns:
#   A list of the successfully created nodes with class
#   c("SOCKcluster", "cluster").
#
# stop.script() is called when a numeric `names` is NA or < 1, when a
# connection attempt fails with an error other than a timeout, and when no
# worker at all could be reached.
make.psock.cluster = function(names, connection.timeout, ...)
{
    if (is.numeric(names))
    {
        names = as.integer(names[1])
        if (is.na(names) || names < 1)
        {
            stop.script("Numeric 'names' must be >= 1")
        }
        names = rep("localhost", names)
    }

    parallel:::.check_ncores(length(names))
    options = parallel:::addClusterOptions(parallel:::defaultClusterOptions,
                                           list(...))

    # The "outfile" option does not change between workers, so look it up once.
    options.out.file = parallel:::getClusterOption("outfile", options)
    # inherits() instead of `class(x) == "lazy"`: class() may return more than
    # one element, which is not a valid `if` condition.
    out.file.is.lazy = inherits(options.out.file, "lazy")

    cl = vector("list", length(names))

    for (i in seq_along(cl))
    {
        flog.info(paste0("[", i, "/", length(cl), "] Connecting to ",
                         names[[i]], " ... "))

        options.copy = parallel:::addClusterOptions(options, NULL)
        if (out.file.is.lazy)
        {
            # Resolve the per-worker output file from the lazy expression.
            options.copy = parallel:::addClusterOptions(
                options,
                list("outfile" = lazy_eval(options.out.file,
                                           list(worker.id   = i,
                                                worker.name = names[[i]]))))
        }

        # NOTE(review): evalWithTimeout() looks like the R.utils function that
        # newer R.utils releases rename to withTimeout() -- confirm against
        # the installed version before upgrading the package.
        tryCatch({
            cl.node =
                evalWithTimeout(parallel:::newPSOCKnode(names[[i]],
                                                        options = options.copy,
                                                        rank = i),
                                timeout = connection.timeout,
                                onTimeout = "error")
            cl[[i]] = cl.node
            flog.info("OK")},
            error = function(e) {
                if (inherits(e, "TimeoutException"))
                {
                    # A timed-out worker is only reported; its slot in `cl`
                    # stays NULL and is dropped below.
                    flog.warn("Timeout")
                } else {
                    stop.script(e)
                }
            }
        )
    }

    # Keep only the workers that were actually connected.
    cl.filtered = Filter(Negate(is.null), cl)

    if (length(cl) != length(cl.filtered))
    {
        flog.warn(paste("Unable to connect to", length(cl) - length(cl.filtered),
                        "nodes"))
    }

    if (length(cl.filtered) == 0)
    {
        stop.script("No remote workers")
    } else {
        flog.info(paste("Working on", length(cl.filtered), "nodes"))
    }

    class(cl.filtered) = c("SOCKcluster", "cluster")
    cl.filtered
}
|
|
|
|
# Shut down a cluster and put foreach back on its sequential backend.
#
# Args:
#   cl.to.stop: the cluster to stop. When the argument is omitted, the default
#               expression `cl` is evaluated, so a variable named `cl` must be
#               reachable from this function's enclosing environment.
#
# Returns:
#   The value of parallel::stopCluster().
stop.cluster = function(cl.to.stop = cl) {
    flog.info("Workers shut down")

    # Register the sequential foreach backend, then stop the workers.
    foreach::registerDoSEQ()
    parallel::stopCluster(cl.to.stop)
}
|