1
0
mirror of https://github.com/andre-wojtowicz/r-distributed-computing synced 2024-11-03 14:10:28 +01:00
r-distributed-computing/utils-parallel.R

89 lines
2.5 KiB
R

# Create a PSOCK cluster, connecting to the workers one at a time and
# skipping any worker whose connection attempt times out.
#
# Arguments:
#   names              Either a character vector of host names (one worker is
#                      started per element) or a number; for a number only the
#                      first element is used and that many workers are started
#                      on "localhost".
#   connection.timeout Passed as `timeout` to evalWithTimeout() around each
#                      connection attempt.
#                      NOTE(review): presumably seconds -- confirm against the
#                      package providing evalWithTimeout().
#   ...                Cluster options merged over parallel's default cluster
#                      options. If the "outfile" option inherits from class
#                      "lazy", it is evaluated per worker with lazy_eval(),
#                      with `worker.id` and `worker.name` supplied as data.
#
# Returns a list of the successfully created nodes, classed
# c("SOCKcluster", "cluster").
#
# stop.script() is called when a numeric `names` is NA or < 1, when a
# connection attempt fails with a non-timeout error, and when no worker at
# all could be reached.
#
# NOTE: relies on unexported internals of the 'parallel' package (`:::`).
make.psock.cluster = function(names, connection.timeout, ...)
{
    if (is.numeric(names))
    {
        names = as.integer(names[1])
        if (is.na(names) || names < 1)
        {
            stop.script("Numeric 'names' must be >= 1")
        }
        names = rep("localhost", names)
    }

    parallel:::.check_ncores(length(names))

    options = parallel:::addClusterOptions(parallel:::defaultClusterOptions,
                                           list(...))

    # The "outfile" option does not change between workers, so look it up once.
    # inherits() is used instead of `class(x) == "lazy"`: the latter yields a
    # vector for multi-class objects, which is an error inside `if` on
    # R >= 4.2.
    options.out.file = parallel:::getClusterOption("outfile", options)
    out.file.is.lazy = inherits(options.out.file, "lazy")

    cl = vector("list", length(names))

    for (i in seq_along(cl))
    {
        flog.info(paste0("[", i, "/", length(cl), "] Connecting to ",
                         names[[i]], " ... "))

        if (out.file.is.lazy)
        {
            # Resolve the lazy "outfile" expression for this particular worker.
            options.copy = parallel:::addClusterOptions(
                options,
                list("outfile" = lazy_eval(options.out.file,
                                           list(worker.id   = i,
                                                worker.name = names[[i]]))))
        } else {
            options.copy = options
        }

        tryCatch({
            cl.node =
                evalWithTimeout(parallel:::newPSOCKnode(names[[i]],
                                                        options = options.copy,
                                                        rank = i),
                                timeout = connection.timeout,
                                onTimeout = "error")
            cl[[i]] = cl.node
            flog.info("OK")},
            error = function(e) {
                # A timeout only skips this worker (its slot stays NULL);
                # every other error is handed to stop.script().
                if ("TimeoutException" %in% class(e))
                {
                    flog.warn("Timeout")
                } else {
                    stop.script(e)
                }
            }
        )
    }

    # Keep only the workers that were actually connected.
    cl.filtered = Filter(Negate(is.null), cl)

    if (length(cl) != length(cl.filtered))
    {
        flog.warn(paste("Unable to connect to", length(cl) - length(cl.filtered),
                        "nodes"))
    }

    if (length(cl.filtered) == 0)
    {
        stop.script("No remote workers")
    } else {
        flog.info(paste("Working on", length(cl.filtered), "nodes"))
    }

    class(cl.filtered) = c("SOCKcluster", "cluster")
    cl.filtered
}
# Shut down a worker cluster.
#
# Arguments:
#   cl.to.stop  Cluster object handed to parallel::stopCluster(). Defaults to
#               a variable named `cl` looked up from this function's enclosing
#               environment at the moment the argument is first used.
#
# Steps, in order: log the shutdown, register the sequential foreach backend,
# then stop the cluster. The value of parallel::stopCluster() is returned.
stop.cluster = function(cl.to.stop = cl)
{
    flog.info("Workers shut down")

    # Register the sequential backend before the workers are stopped.
    foreach::registerDoSEQ()

    # `cl.to.stop` is first evaluated here, after the two calls above.
    parallel::stopCluster(cl.to.stop)
}