Skip to content

Commit 4e8490a

Browse files
Add internal makeFutureBackend()
1 parent 3821a68 commit 4e8490a

File tree

3 files changed

+41
-30
lines changed

3 files changed

+41
-30
lines changed

R/backend_api-ClusterFutureBackend-class.R

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ launchFuture.ClusterFutureBackend <- function(backend, future, ...) {
6969
future[["backend"]] <- backend
7070

7171
workers <- backend[["workers"]]
72+
stop_if_not(inherits(workers, "cluster"))
73+
7274
reg <- backend[["reg"]]
75+
stop_if_not(is.character(reg), length(reg) == 1L)
76+
7377
if (debug) {
7478
mdebug("Workers:")
7579
mstr(workers)
@@ -179,6 +183,7 @@ launchFuture.ClusterFutureBackend <- function(backend, future, ...) {
179183
worker >= 1L, worker <= length(workers)
180184
)
181185
if (debug) mdebugf(" - cluster node index: %d", worker)
186+
future[["workers"]] <- workers ## FIXME
182187
data <- getFutureData(future, debug = debug)
183188
node <- workers[[worker]]
184189
## Non-blocking cluster-node call

R/backend_api-Future-class.R

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -424,37 +424,15 @@ run.Future <- function(future, ...) {
424424
}
425425

426426
## Create temporary future for a specific backend, but don't launch it
427-
makeFuture <- plan("next")
428-
if (debug) mdebug("- Future backend: ", commaq(class(makeFuture)))
427+
evaluator <- plan("next")
428+
if (debug) mdebug("- Future backend: ", commaq(class(evaluator)))
429429

430430
## Implements a FutureBackend?
431-
backend <- attr(makeFuture, "backend")
432-
if (is.function(backend)) {
433-
if (debug) mdebug("Using FutureBackend ...")
434-
mdebug("- state: ", sQuote(future[["state"]]))
435-
on.exit(mdebug("run() for ", sQuote(class(future)[1]), " ... done"), add = TRUE)
436-
437-
if (debug) mprint(backend)
438-
439-
## Apply future plan tweaks
440-
args <- attr(makeFuture, "tweaks")
441-
if (is.null(args)) args <- list()
442-
443-
args2 <- formals(makeFuture)
444-
args2$`...` <- NULL
445-
args2$envir <- NULL
446-
args2$lazy <- NULL ## bc multisession; should be removed
447-
448-
for (name in names(args2)) {
449-
args[[name]] <- args2[[name]]
450-
}
451-
backend <- do.call(backend, args = args)
452-
if (debug) mdebug(" - FutureBackend: ", commaq(class(backend)))
431+
backend <- makeFutureBackend(evaluator)
432+
if (!is.null(backend)) {
453433
if (!inherits(backend, "FutureBackend")) {
454-
stop(sprintf("[INTERNAL ERROR] run.Future(): the 'backend' generated for the %s object is not a FutureBackend object: %s", class(makeFuture)[1], class(backend)[1]))
434+
stop(sprintf("[INTERNAL ERROR] run.Future(): the 'backend' generated for the %s object is not a FutureBackend object: %s", class(evaluator)[1], class(backend)[1]))
455435
}
456-
} else {
457-
backend <- NULL
458436
}
459437

460438
## Use new FutureBackend approach?
@@ -503,12 +481,12 @@ run.Future <- function(future, ...) {
503481
has_persistent <- ("persistent" %in% names(future))
504482
if (has_persistent) args$persistent <- future[["persistent"]]
505483

506-
tmpFuture <- do.call(makeFuture, args = args)
484+
tmpFuture <- do.call(evaluator, args = args)
507485

508486
## SPECIAL: 'cluster' takes argument 'persistent' for now /HB 2023-01-17
509487
if (has_persistent) {
510-
if (inherits(makeFuture, "cluster") &&
511-
!inherits(makeFuture, "multisession")) {
488+
if (inherits(evaluator, "cluster") &&
489+
!inherits(evaluator, "multisession")) {
512490
tmpFuture[["local"]] <- !tmpFuture[["persistent"]]
513491
} else {
514492
.Defunct(msg = "Future field 'persistent' is defunct and must not be set", package = .packageName)
@@ -959,10 +937,13 @@ getFutureBackendConfigs.ClusterFuture <- function(future, ..., debug = isTRUE(ge
959937
## Does the cluster node communicate with a connection?
960938
## (if not, it's via MPI)
961939
workers <- future[["workers"]]
940+
stop_if_not(inherits(workers, "cluster"))
962941
## AD HOC/FIXME: Here 'future[["node"]]' is yet not assigned, so we look at
963942
## the first worker and assume the others are the same. /HB 2019-10-23
964943
cl <- workers[1L]
944+
stop_if_not(inherits(cl, "cluster"))
965945
node <- cl[[1L]]
946+
stop_if_not(inherits(node, c("SOCK0node", "SOCKnode")))
966947
con <- node$con
967948
if (is.null(con)) return(list())
968949

R/backend_api-FutureBackend-class.R

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,28 @@ launchFuture <- function(backend, future, ...) {
4545
launchFuture.FutureBackend <- function(backend, future, ...) {
4646
stop(sprintf("No launchFuture() method implemented for %s", sQuote(class(backend)[1])))
4747
}
48+
49+
50+
makeFutureBackend <- function(evaluator, ...) {
51+
backend <- attr(evaluator, "backend")
52+
53+
## Old future strategies do not implement a FutureBackend
54+
if (is.null(backend)) return(NULL)
55+
56+
stop_if_not(is.function(backend))
57+
58+
## Apply future plan tweaks
59+
args <- attr(evaluator, "tweaks")
60+
if (is.null(args)) args <- list()
61+
args2 <- formals(evaluator)
62+
args2$`...` <- NULL
63+
args2$envir <- NULL
64+
args2$lazy <- NULL ## bc multisession; should be removed
65+
for (name in names(args2)) {
66+
args[[name]] <- args2[[name]]
67+
}
68+
backend <- do.call(backend, args = args)
69+
stop_if_not(inherits(backend, "FutureBackend"))
70+
71+
backend
72+
}

0 commit comments

Comments
 (0)