Skip to content

Commit 6811016

Browse files
Record R options future.wait.{timeout,interval,alpha} when setting up the Cluster- and MulicoreFutureBackend
1 parent f1ea180 commit 6811016

File tree

3 files changed

+37
-9
lines changed

3 files changed

+37
-9
lines changed

R/backend_api-ClusterFutureBackend-class.R

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,17 @@ ClusterFutureBackend <- function(workers = availableWorkers(), persistent = FALS
5050
## Name of the FutureRegistry
5151
reg <- sprintf("workers-%s", name)
5252

53-
core <- FutureBackend(workers = workers, persistent = persistent, reg = reg, earlySignal = earlySignal, gc = gc, ...)
53+
core <- FutureBackend(
54+
workers = workers,
55+
persistent = persistent,
56+
reg = reg,
57+
earlySignal = earlySignal,
58+
gc = gc,
59+
future.wait.timeout = getOption("future.wait.timeout", 30 * 24 * 60 * 60),
60+
future.wait.interval = getOption("future.wait.interval", 0.01),
61+
future.wait.alpha = getOption("future.wait.alpha", 1.01),
62+
...
63+
)
5464
core[["futureClasses"]] <- c("ClusterFuture", core[["futureClasses"]])
5565
core <- structure(core, class = c("ClusterFutureBackend", "FutureBackend", class(core)))
5666
core
@@ -86,9 +96,13 @@ launchFuture.ClusterFutureBackend <- function(backend, future, ...) {
8696
## (1) Get a free worker. This will block until one is available
8797
if (debug) mdebug("requestWorker() ...")
8898

99+
timeout <- backend[["future.wait.timeout"]]
100+
delta <- backend[["future.wait.interval"]]
101+
alpha <- backend[["future.wait.alpha"]]
102+
89103
node_idx <- requestNode(await = function() {
90104
FutureRegistry(reg, action = "collect-first", earlySignal = TRUE)
91-
}, workers = workers)
105+
}, workers = workers, timeout = timeout, delta = delta, alpha = alpha)
92106
future[["node"]] <- node_idx
93107

94108
if (inherits(future[[".journal"]], "FutureJournal")) {

R/backend_api-MulticoreFuture-class.R

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,13 +228,18 @@ launchFuture.MulticoreFutureBackend <- function(backend, future, ...) {
228228
data <- getFutureData(future, debug = debug)
229229

230230
t_start <- Sys.time()
231-
231+
232+
workers <- backend[["workers"]]
233+
reg <- backend[["reg"]]
234+
235+
timeout <- backend[["future.wait.timeout"]]
236+
delta <- backend[["future.wait.interval"]]
237+
alpha <- backend[["future.wait.alpha"]]
238+
232239
## Get a free worker
233-
reg <- sprintf("multicore-%s", session_uuid())
234-
requestCore(
235-
await = function() FutureRegistry(reg, action = "collect-first", earlySignal = TRUE),
236-
workers = backend[["workers"]]
237-
)
240+
requestCore(await = function() {
241+
FutureRegistry(reg, action = "collect-first", earlySignal = TRUE)
242+
}, workers = workers, timeout = timeout, delta = delta, alpha = alpha)
238243

239244
if (inherits(future[[".journal"]], "FutureJournal")) {
240245
appendToFutureJournal(future,

R/backend_api-MulticoreFutureBackend-class.R

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,16 @@ MulticoreFutureBackend <- function(workers = availableCores(constraints = "multi
2727
return(SequentialFutureBackend(...))
2828
}
2929

30-
core <- FutureBackend(workers = workers, ...)
30+
reg <- sprintf("multicore-%s", session_uuid())
31+
32+
core <- FutureBackend(
33+
workers = workers,
34+
reg = reg,
35+
future.wait.timeout = getOption("future.wait.timeout", 30 * 24 * 60 * 60),
36+
future.wait.interval = getOption("future.wait.interval", 0.01),
37+
future.wait.alpha = getOption("future.wait.alpha", 1.01),
38+
...
39+
)
3140
core[["futureClasses"]] <- c("MulticoreFuture", "MultiprocessFuture", core[["futureClasses"]])
3241
core <- structure(core, class = c("MulticoreFutureBackend", "FutureBackend", class(core)))
3342
core

0 commit comments

Comments
 (0)