Skip to content

Commit 3886998

Browse files
First prototype of ClusterFutureBackend and MultisessionFutureBackend classes
1 parent ecc8c15 commit 3886998

11 files changed

+191
-96
lines changed

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ S3method(journal,Future)
2121
S3method(journal,FutureJournal)
2222
S3method(journal,FutureJournalCondition)
2323
S3method(journal,list)
24+
S3method(launchFuture,ClusterFutureBackend)
2425
S3method(launchFuture,SequentialFutureBackend)
2526
S3method(mandelbrot,matrix)
2627
S3method(mandelbrot,numeric)

R/backend_api-ClusterFuture-class.R

Lines changed: 153 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ as_ClusterFuture <- function(future, workers = NULL, ...) {
7272
## futures' values.
7373
## workers <- add_cluster_session_info(workers)
7474

75-
backend <- ClusterFutureBackend(workers, persistent =isTRUE(future[["persistent"]]))
75+
backend <- ClusterFutureBackend0(workers, persistent = isTRUE(future[["persistent"]]))
7676

7777
future[["backend"]] <- backend
7878

@@ -87,11 +87,11 @@ as_ClusterFuture <- function(future, workers = NULL, ...) {
8787

8888
#' @export
8989
run.ClusterFuture <- function(future, ...) {
90-
debug <- isTRUE(getOption("future.debug"))
91-
if (debug) {
92-
mdebug("run.ClusterFuture() ...")
93-
on.exit(mdebug("run.ClusterFuture() ... done"))
90+
if (getOption("future.backend.version", 1L) == 2L) {
91+
return(NextMethod())
9492
}
93+
94+
debug <- getOption("future.debug", FALSE)
9595

9696
if (future[["state"]] != 'created') {
9797
label <- future[["label"]]
@@ -103,31 +103,84 @@ run.ClusterFuture <- function(future, ...) {
103103
## also the one that evaluates/resolves/queries it.
104104
assertOwner(future)
105105

106-
backend <- future[["backend"]]
106+
workers <- future[["workers"]]
107+
data <- getFutureData(future)
108+
persistent <- isTRUE(future[["persistent"]])
109+
110+
## FutureRegistry to use
111+
reg <- sprintf("workers-%s", attr(workers, "name", exact = TRUE))
107112

108113
## Next available cluster node
109114
t_start <- Sys.time()
115+
node_idx <- requestNode(await = function() {
116+
FutureRegistry(reg, action = "collect-first", earlySignal = TRUE)
117+
}, workers = workers)
118+
future[["node"]] <- node_idx
110119

111-
## (1) Get a free worker. This will block until one is available
112-
backend$requestWorker(future)
113-
114-
## (2) Attach packages that needs to be attached
115-
## NOTE: Already take care of by evalFuture().
116-
## However, if we need to get an early error about missing packages,
117-
## we can get the error here before launching the future.
118-
if (future[["earlySignal"]]) {
119-
backend$requirePackages(future = future)
120+
## Cluster node to use
121+
cl <- workers[node_idx]
122+
123+
if (inherits(future[[".journal"]], "FutureJournal")) {
124+
appendToFutureJournal(future,
125+
event = "getWorker",
126+
category = "overhead",
127+
parent = "launch",
128+
start = t_start,
129+
stop = Sys.time()
130+
)
120131
}
121132

122-
## (2) Reset global environment of cluster node such that
133+
134+
## (i) Reset global environment of cluster node such that
123135
## previous futures are not affecting this one, which
124136
## may happen even if the future is evaluated inside a
125137
## local, e.g. local({ a <<- 1 }).
126-
## If the persistent = TRUE, this will be skipped.
127-
backend$eraseGlobalEnvironment(future = future)
138+
if (!persistent) {
139+
t_start <- Sys.time()
140+
cluster_call_blocking(cl, fun = grmall, future = future, when = "call grmall() on")
141+
if (inherits(future[[".journal"]], "FutureJournal")) {
142+
appendToFutureJournal(future,
143+
event = "eraseWorker",
144+
category = "overhead",
145+
parent = "launch",
146+
start = t_start,
147+
stop = Sys.time()
148+
)
149+
}
150+
}
128151

129-
## (3) Launch future
130-
backend$launchFuture(future)
152+
153+
## (ii) Attach packages that needs to be attached
154+
## NOTE: Already take care of by evalFuture().
155+
## However, if we need to get an early error about missing packages,
156+
## we can get the error here before launching the future.
157+
t_start <- Sys.time()
158+
packages <- future[["packages"]]
159+
if (future[["earlySignal"]] && length(packages) > 0) {
160+
if (debug) mdebugf("Attaching %d packages (%s) on cluster node #%d ...",
161+
length(packages), hpaste(sQuote(packages)), node_idx)
162+
163+
cluster_call_blocking(cl, fun = requirePackages, packages, future = future, when = "call requirePackages() on")
164+
165+
if (debug) mdebugf("Attaching %d packages (%s) on cluster node #%d ... DONE",
166+
length(packages), hpaste(sQuote(packages)), node_idx)
167+
}
168+
169+
if (inherits(future[[".journal"]], "FutureJournal")) {
170+
appendToFutureJournal(future,
171+
event = "attachPackages",
172+
category = "overhead",
173+
parent = "launch",
174+
start = t_start,
175+
stop = Sys.time()
176+
)
177+
}
178+
179+
## Add to registry
180+
FutureRegistry(reg, action = "add", future = future, earlySignal = FALSE)
181+
182+
## (iv) Launch future
183+
node_call_nonblocking(cl[[1L]], fun = evalFuture, args = list(data), when = "launch future on")
131184

132185
future[["state"]] <- 'running'
133186

@@ -318,7 +371,7 @@ receiveMessageFromWorker <- function(future, ...) {
318371
if (future[["gc"]]) {
319372
if (debug) mdebug("- Garbage collecting worker ...")
320373
## Cleanup global environment while at it
321-
if (!future[["persistent"]]) {
374+
if (!isTRUE(future[["persistent"]])) {
322375
## Blocking cluster-node call
323376
cluster_call_blocking(cl[1], fun = grmall, future = future, when = "call grmall() on")
324377
}
@@ -517,6 +570,12 @@ post_mortem_cluster_failure <- function(ex, when, node, future) {
517570

518571
## (4) POST-MORTEM ANALYSIS:
519572
postmortem <- list()
573+
574+
## (a) Inspect the 'reason' for known clues
575+
if (grepl("ignoring SIGPIPE signal", reason)) {
576+
postmortem$sigpipe <- "The SIGPIPE error suggests that the R socket connection to the parallel worker broke, which can happen for different reasons, e.g. the parallel worker crashed"
577+
}
578+
520579
## (a) Did the worker process terminate?
521580
if (!is.null(host) && is.numeric(pid)) {
522581
if (localhost) {
@@ -646,7 +705,7 @@ assertValidConnection <- function(future) {
646705

647706

648707

649-
ClusterFutureBackend <- local({
708+
ClusterFutureBackend0 <- local({
650709
indexOf <- function(futures, future) {
651710
for (ii in seq_along(futures)) {
652711
if (identical(future, futures[[ii]])) return(ii)
@@ -915,6 +974,77 @@ ClusterFutureBackend <- local({
915974
}
916975
)
917976
}
918-
}) ## ClusterFutureBackend()
977+
}) ## ClusterFutureBackend0()
919978

920979

980+
981+
982+
#' @export
983+
launchFuture.ClusterFutureBackend <- function(backend, future, ...) {
984+
debug <- isTRUE(getOption("future.debug"))
985+
if (debug) {
986+
mdebug("launchFuture() for ClusterFutureBackend ...")
987+
on.exit(mdebug("launchFuture() for ClusterFutureBackend ... done"))
988+
}
989+
990+
## Coerce Future to ClusterFuture
991+
args <- list(
992+
future,
993+
workers = backend[["workers"]]
994+
)
995+
future <- do.call(as_ClusterFuture, args = args)
996+
class(future) <- unique(c(backend$futureClasses, class(future)))
997+
998+
backend0 <- future[["backend"]]
999+
1000+
## Next available cluster node
1001+
t_start <- Sys.time()
1002+
1003+
## (1) Get a free worker. This will block until one is available
1004+
backend0$requestWorker(future)
1005+
1006+
## (2) Attach packages that needs to be attached
1007+
## NOTE: Already take care of by evalFuture().
1008+
## However, if we need to get an early error about missing packages,
1009+
## we can get the error here before launching the future.
1010+
if (future[["earlySignal"]]) {
1011+
backend0$requirePackages(future = future)
1012+
}
1013+
1014+
## (2) Reset global environment of cluster node such that
1015+
## previous futures are not affecting this one, which
1016+
## may happen even if the future is evaluated inside a
1017+
## local, e.g. local({ a <<- 1 }).
1018+
## If the persistent = TRUE, this will be skipped.
1019+
backend0$eraseGlobalEnvironment(future = future)
1020+
1021+
## (3) Launch future
1022+
backend0$launchFuture(future)
1023+
1024+
future[["state"]] <- "running"
1025+
1026+
if (debug) mdebugf("%s started", class(future)[1])
1027+
1028+
invisible(future)
1029+
}
1030+
1031+
1032+
ClusterFutureBackend <- function(workers, persistent = FALSE, ...) {
1033+
core <- new.env(parent = emptyenv())
1034+
1035+
## Record future plan tweaks, if any
1036+
args <- list(workers = workers, persistent = persistent, ...)
1037+
for (name in names(args)) {
1038+
core[[name]] <- args[[name]]
1039+
}
1040+
core$futureClasses <- c("ClusterFuture", "Future")
1041+
core <- structure(core, class = c("ClusterFutureBackend", "FutureBackend", class(core)))
1042+
core
1043+
}
1044+
1045+
MultisessionFutureBackend <- function(workers, ...) {
1046+
core <- ClusterFutureBackend(workers = workers, ...)
1047+
core$futureClasses <- c("MultisessionFuture", core$futureClasses)
1048+
core <- structure(core, class = c("MultisessionFutureBackend", class(core)))
1049+
core
1050+
}

R/backend_api-Future-class.R

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -395,9 +395,9 @@ assertOwner <- local({
395395
run.Future <- function(future, ...) {
396396
debug <- isTRUE(getOption("future.debug"))
397397
if (debug) {
398-
mdebug("run() for ", sQuote(class(future)[1]), " ...")
398+
mdebugf("run() for Future (%s) ...", sQuote(class(future)[1]))
399399
mdebug("- state: ", sQuote(future[["state"]]))
400-
on.exit(mdebug("run() for ", sQuote(class(future)[1]), " ... done"), add = TRUE)
400+
on.exit(mdebugf("run() for Future (%s) ... done", sQuote(class(future)[1])), add = TRUE)
401401
}
402402

403403
if (future[["state"]] != "created") {
@@ -408,7 +408,7 @@ run.Future <- function(future, ...) {
408408
}
409409

410410
## Sanity check: This method should only called for lazy futures
411-
stop_if_not(future[["lazy"]])
411+
# stop_if_not(future[["lazy"]])
412412

413413
if (is.null(future[["owner"]])) {
414414
future[["owner"]] <- session_uuid()
@@ -436,10 +436,19 @@ run.Future <- function(future, ...) {
436436
mdebug("- state: ", sQuote(future[["state"]]))
437437
on.exit(mdebug("run() for ", sQuote(class(future)[1]), " ... done"), add = TRUE)
438438

439+
if (debug) mprint(backend)
440+
439441
## Apply future plan tweaks
440442
args <- attr(makeFuture, "tweaks")
441443
if (is.null(args)) args <- list()
444+
args2 <- formals(makeFuture)
445+
args2[["..."]] <- NULL
446+
args2[["envir"]] <- NULL
447+
for (name in names(args2)) {
448+
args[[name]] <- args2[[name]]
449+
}
442450
backend <- do.call(backend, args = args)
451+
if (debug) mdebug(" - FutureBackend: ", commaq(class(backend)))
443452
stop_if_not(inherits(backend, "FutureBackend"))
444453

445454

R/backend_api-UniprocessFuture-class.R

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ launchFuture.SequentialFutureBackend <- function(backend, future, ...) {
162162
future
163163
}
164164

165-
SequentialFutureBackend <- function(...) {
165+
FutureBackend <- function(...) {
166166
core <- new.env(parent = emptyenv())
167167

168168
## Record future plan tweaks, if any
@@ -171,8 +171,15 @@ SequentialFutureBackend <- function(...) {
171171
core[[name]] <- args[[name]]
172172
}
173173

174-
core <- structure(core, class = c("SequentialFutureBackend", "FutureBackend", class(core)))
175-
174+
core$futureClasses <- c("FutureBackend")
175+
core <- structure(core, class = c("FutureBackend", class(core)))
176+
}
177+
178+
179+
SequentialFutureBackend <- function(...) {
180+
core <- FutureBackend(...)
181+
core$futureClasses <- c("SequentialFuture", "UniprocessFuture", core$futureClasses)
182+
core <- structure(core, class = c("SequentialFutureBackend", class(core)))
176183
core
177184
}
178185

R/utils-prune_pkg_code.R

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ prune_fcn <- function(name, envir) {
2626
body0 <- body(fcn)
2727
body <- walkAST(body0, call = prune_fcns)
2828
if (!identical(body, body0)) {
29+
attrs <- attributes(fcn)
2930
body(fcn) <- body
31+
attributes(fcn) <- attrs ## attributes are lost if body is changed
3032
assign(name, fcn, envir = envir, inherits = FALSE)
3133
return(TRUE)
3234
}

R/utils_api-tweak.R

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ tweak.future <- function(strategy, ..., penvir = parent.frame()) {
134134
## Restore attributes including class
135135
attributes(strategy2) <- attrs
136136

137-
## Record what tweaks were made
137+
## Append whatever tweaks were made
138+
args <- c(attr(strategy, "tweaks"), args)
138139
attr(strategy2, "tweaks") <- args
139140

140141
## Flag that it is tweaked

R/zzz.R

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## Update FutureBackend:s
44
attr(sequential, "backend") <- SequentialFutureBackend
5+
attr(cluster, "backend") <- ClusterFutureBackend
6+
attr(multisession, "backend") <- MultisessionFutureBackend
57

68

79
## covr: skip=all
@@ -13,7 +15,7 @@ attr(sequential, "backend") <- SequentialFutureBackend
1315
if (isTRUE(as.logical(Sys.getenv("R_FUTURE_PRUNE_PKG_CODE", "FALSE")))) {
1416
prune_pkg_code()
1517
}
16-
18+
1719
update_package_option("future.debug", mode = "logical")
1820
debug <- isTRUE(getOption("future.debug"))
1921

@@ -31,7 +33,6 @@ attr(sequential, "backend") <- SequentialFutureBackend
3133
## .GlobalEnv$.Random.seed.
3234
session_uuid(attributes = FALSE)
3335

34-
3536
## Report on future plan, if set
3637
strategy <- getOption("future.plan")
3738
if (!is.null(strategy)) {
@@ -44,7 +45,6 @@ attr(sequential, "backend") <- SequentialFutureBackend
4445
}
4546
}
4647

47-
4848
args <- parseCmdArgs()
4949
p <- args$p
5050
if (!is.null(p)) {

0 commit comments

Comments
 (0)