Skip to content

Commit 4c71d9f

Browse files
run() for Future does the common validation and setup now
1 parent 6c25fd5 commit 4c71d9f

9 files changed

+93
-80
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: future
2-
Version: 1.34.0-9206
2+
Version: 1.34.0-9207
33
Title: Unified Parallel and Distributed Processing in R for Everyone
44
Imports:
55
digest,

R/backend_api-ClusterFuture-class.R

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ as_ClusterFuture <- function(future, workers = NULL, ...) {
9696

9797
#' @export
9898
run.ClusterFuture <- function(future, ...) {
99-
if (getOption("future.backend.version", 1L) == 2L) {
99+
if (getOption("future.backend.version", 2L) == 2L) {
100100
return(NextMethod())
101101
}
102102

@@ -113,8 +113,8 @@ run.ClusterFuture <- function(future, ...) {
113113
assertOwner(future)
114114

115115
backend <- future[["backend"]]
116-
workers <- backend$workers
117-
reg <- backend$reg
116+
workers <- backend[["workers"]]
117+
reg <- backend[["reg"]]
118118

119119
data <- getFutureData(future)
120120
persistent <- isTRUE(future[["persistent"]])
@@ -369,6 +369,7 @@ receiveMessageFromWorker <- function(future, ...) {
369369
recvResult <- importParallel("recvResult")
370370

371371
backend <- future[["backend"]]
372+
stop_if_not(inherits(backend, "FutureBackend"))
372373
workers <- backend$workers
373374
reg <- backend$reg
374375

R/backend_api-ClusterFutureBackend-class.R

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,26 @@ ClusterFutureBackend <- function(workers = availableWorkers(), persistent = FALS
3131
stopf("Argument 'workers' is not of class 'cluster': %s", commaq(class(workers)))
3232
}
3333
stop_if_not(length(workers) > 0)
34-
35-
core <- FutureBackend(workers = workers, persistent = persistent, ...)
34+
35+
## Attached workers' session information, unless already done.
36+
## FIXME: We cannot do this here, because it introduces a race condition
37+
## where multiple similar requests may appear at the same time bringing
38+
## the send/receive data to be out of sync and therefore corrupt the
39+
## futures' values.
40+
## workers <- add_cluster_session_info(workers)
41+
42+
## Attach name to cluster?
43+
name <- attr(workers, "name", exact = TRUE)
44+
if (is.null(name)) {
45+
name <- digest(workers)
46+
stop_if_not(length(name) > 0, nzchar(name))
47+
attr(workers, "name") <- name
48+
}
49+
50+
## Name of the FutureRegistry
51+
reg <- sprintf("workers-%s", name)
52+
53+
core <- FutureBackend(workers = workers, persistent = persistent, reg = reg, ...)
3654
core[["futureClasses"]] <- c("ClusterFuture", core[["futureClasses"]])
3755
core <- structure(core, class = c("ClusterFutureBackend", "FutureBackend", class(core)))
3856
core
@@ -47,24 +65,23 @@ launchFuture.ClusterFutureBackend <- function(backend, future, ...) {
4765
on.exit(mdebug("launchFuture() for ClusterFutureBackend ... done"))
4866
}
4967

50-
## Coerce Future to ClusterFuture
51-
args <- list(
52-
future,
53-
workers = backend[["workers"]]
54-
)
55-
future <- do.call(as_ClusterFuture, args = args)
56-
class(future) <- backend[["futureClasses"]]
68+
## Record 'backend' in future for now
69+
future[["backend"]] <- backend
70+
71+
workers <- backend[["workers"]]
72+
reg <- backend[["reg"]]
73+
if (debug) {
74+
mdebug("Workers:")
75+
mstr(workers)
76+
mdebug("FutureRegistry: ", sQuote(reg))
77+
}
5778

5879
## Next available cluster node
5980
t_start <- Sys.time()
6081

6182
## (1) Get a free worker. This will block until one is available
6283
if (debug) mdebug("requestWorker() ...")
6384

64-
## FIXME: backend[["workers"]] != future[["workers"]]
65-
workers <- future[["workers"]]
66-
reg <- sprintf("workers-%s", attr(workers, "name", exact = TRUE))
67-
6885
node_idx <- requestNode(await = function() {
6986
FutureRegistry(reg, action = "collect-first", earlySignal = TRUE)
7087
}, workers = workers)

R/backend_api-Future-class.R

Lines changed: 51 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -427,43 +427,57 @@ run.Future <- function(future, ...) {
427427
makeFuture <- plan("next")
428428
if (debug) mdebug("- Future backend: ", commaq(class(makeFuture)))
429429

430-
## Use new FutureBackend approach?
431-
if (getOption("future.backend.version", 2L) == 2L) {
432-
## Implements a FutureBackend?
433-
backend <- attr(makeFuture, "backend")
434-
if (is.function(backend)) {
435-
if (debug) mdebug("Using FutureBackend ...")
436-
mdebug("- state: ", sQuote(future[["state"]]))
437-
on.exit(mdebug("run() for ", sQuote(class(future)[1]), " ... done"), add = TRUE)
438-
439-
if (debug) mprint(backend)
440-
441-
## Apply future plan tweaks
442-
args <- attr(makeFuture, "tweaks")
443-
if (is.null(args)) args <- list()
444-
445-
args2 <- formals(makeFuture)
446-
args2$`...` <- NULL
447-
args2$envir <- NULL
448-
args2$lazy <- NULL ## bc multisession; should be removed
449-
450-
for (name in names(args2)) {
451-
args[[name]] <- args2[[name]]
452-
}
453-
backend <- do.call(backend, args = args)
454-
if (debug) mdebug(" - FutureBackend: ", commaq(class(backend)))
455-
stop_if_not(inherits(backend, "FutureBackend"))
430+
## Implements a FutureBackend?
431+
backend <- attr(makeFuture, "backend")
432+
if (is.function(backend)) {
433+
if (debug) mdebug("Using FutureBackend ...")
434+
mdebug("- state: ", sQuote(future[["state"]]))
435+
on.exit(mdebug("run() for ", sQuote(class(future)[1]), " ... done"), add = TRUE)
456436

437+
if (debug) mprint(backend)
457438

458-
if (debug) mdebug(" - Launching futures ...")
459-
future2 <- launchFuture(backend, future = future)
460-
if (debug) mdebug(" - Launching futures ... done")
461-
if (debug) mdebug(" - Future launched: ", commaq(class(future2)))
462-
stop_if_not(inherits(future2, "Future"))
463-
if (debug) mdebug("Using FutureBackend ... DONE")
464-
465-
return(future2)
439+
## Apply future plan tweaks
440+
args <- attr(makeFuture, "tweaks")
441+
if (is.null(args)) args <- list()
442+
443+
args2 <- formals(makeFuture)
444+
args2$`...` <- NULL
445+
args2$envir <- NULL
446+
args2$lazy <- NULL ## bc multisession; should be removed
447+
448+
for (name in names(args2)) {
449+
args[[name]] <- args2[[name]]
466450
}
451+
backend <- do.call(backend, args = args)
452+
if (debug) mdebug(" - FutureBackend: ", commaq(class(backend)))
453+
stop_if_not(inherits(backend, "FutureBackend"))
454+
} else {
455+
backend <- NULL
456+
}
457+
458+
## Use new FutureBackend approach?
459+
if (inherits(backend, "FutureBackend") && getOption("future.backend.version", 2L) == 2L) {
460+
if (future[["state"]] != "created") {
461+
label <- future[["label"]]
462+
if (is.null(label)) label <- "<none>"
463+
stop(FutureError(sprintf("A future ('%s') can only be launched once", label), future = future))
464+
}
465+
466+
## Assert that the process that created the future is
467+
## also the one that evaluates/resolves/queries it.
468+
assertOwner(future)
469+
470+
## Coerce to target Future class
471+
class(future) <- backend[["futureClasses"]]
472+
473+
if (debug) mdebug(" - Launching futures ...")
474+
future2 <- launchFuture(backend, future = future)
475+
if (debug) mdebug(" - Launching futures ... done")
476+
if (debug) mdebug(" - Future launched: ", commaq(class(future2)))
477+
stop_if_not(inherits(future2, "Future"))
478+
if (debug) mdebug("Using FutureBackend ... DONE")
479+
480+
return(future2)
467481
}
468482

469483

@@ -533,6 +547,9 @@ run.Future <- function(future, ...) {
533547
future <- run(future)
534548
if (debug) mdebug("- Launch lazy future ... done")
535549
}
550+
551+
## Set FutureBackend, if it exists
552+
future[["backend"]] <- backend
536553

537554
## Sanity check: This method was only called for lazy futures
538555
stop_if_not(future[["state"]] != "created", future[["lazy"]])

R/backend_api-FutureBackend-class.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ FutureBackend <- function(...) {
1717
core[[name]] <- args[[name]]
1818
}
1919

20-
core[["futureClasses"]] <- c("FutureBackend")
20+
core[["futureClasses"]] <- c("Future")
2121
core <- structure(core, class = c("FutureBackend", class(core)))
2222
core
2323
}

R/backend_api-MulticoreFutureBackend-class.R

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,9 @@ MulticoreFutureBackend <- function(workers = availableCores(constraints = "multi
3636
#' @export
3737
launchFuture.MulticoreFutureBackend <- function(backend, future, ...) {
3838
debug <- isTRUE(getOption("future.debug"))
39-
40-
if (future[["state"]] != "created") {
41-
label <- future[["label"]]
42-
if (is.null(label)) label <- "<none>"
43-
stop(FutureError(sprintf("A future ('%s') can only be launched once", label), future = future))
44-
}
45-
46-
## Assert that the process that created the future is
47-
## also the one that evaluates/resolves/queries it.
48-
assertOwner(future)
4939

5040
mcparallel <- importParallel("mcparallel")
5141

52-
class(future) <- backend[["futureClasses"]]
53-
5442
data <- getFutureData(future, debug = debug)
5543

5644
t_start <- Sys.time()

R/backend_api-SequentialFutureBackend-class.R

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,6 @@ launchFuture.SequentialFutureBackend <- function(backend, future, ...) {
2424
on.exit(mdebugf("launchFuture() for %s ... DONE", commaq(class(backend))))
2525
}
2626

27-
if (future[["state"]] != 'created') {
28-
label <- future[["label"]]
29-
if (is.null(label)) label <- "<none>"
30-
stop(FutureError(sprintf("A future ('%s') can only be launched once", label), future = future))
31-
}
32-
33-
## Assert that the process that created the future is
34-
## also the one that evaluates/resolves/queries it.
35-
assertOwner(future)
36-
37-
class(future) <- backend[["futureClasses"]]
38-
3927
## Launch future
4028
future[["state"]] <- "running"
4129

tests/invalid-owner.R

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ options(future.debug = FALSE)
33

44
## Local functions
55
usedNodes <- function(future) {
6+
backend <- future[["backend"]]
67
## Number of unresolved cluster futures
7-
workers <- future$workers
8-
reg <- sprintf("workers-%s", attr(workers, "name"))
8+
workers <- backend[["workers"]]
9+
reg <- backend[["reg"]]
910
c(used = length(future:::FutureRegistry(reg, action = "list")), total = length(workers))
1011
}
1112

tests/multisession.R

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ library("listenv")
33

44
message("*** multisession() ...")
55

6-
for (cores in 1:availCores) {
6+
#for (cores in 1:availCores) {
7+
for (cores in 2L) {
78
message(sprintf("Testing with %d cores ...", cores))
89
options(mc.cores = cores)
910

@@ -12,7 +13,7 @@ for (cores in 1:availCores) {
1213
42L
1314
})
1415
print(f)
15-
stopifnot(inherits(f, "ClusterFuture") || (inherits(f, "SequentialFuture") && f$lazy))
16+
stopifnot(inherits(f, "ClusterFuture") || inherits(f, "SequentialFuture"))
1617

1718
print(resolved(f))
1819
y <- value(f)

0 commit comments

Comments
 (0)