Skip to content

Commit ecc8c15

Browse files
First prototype of a SequentialFutureBackend class
1 parent 4f56b20 commit ecc8c15

File tree

7 files changed

+111
-2
lines changed

7 files changed

+111
-2
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: future
2-
Version: 1.34.0-9140
2+
Version: 1.34.0-9200
33
Title: Unified Parallel and Distributed Processing in R for Everyone
44
Imports:
55
digest,

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ S3method(journal,Future)
2121
S3method(journal,FutureJournal)
2222
S3method(journal,FutureJournalCondition)
2323
S3method(journal,list)
24+
S3method(launchFuture,SequentialFutureBackend)
2425
S3method(mandelbrot,matrix)
2526
S3method(mandelbrot,numeric)
2627
S3method(nbrOfFreeWorkers,"NULL")

R/backend_api-Future-class.R

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,34 @@ run.Future <- function(future, ...) {
427427
makeFuture <- plan("next")
428428
if (debug) mdebug("- Future backend: ", commaq(class(makeFuture)))
429429

430+
## Use new FutureBackend approach?
431+
if (getOption("future.backend.version", 1L) == 2L) {
432+
## Implements a FutureBackend?
433+
backend <- attr(makeFuture, "backend")
434+
if (is.function(backend)) {
435+
if (debug) mdebug("Using FutureBackend ...")
436+
mdebug("- state: ", sQuote(future[["state"]]))
437+
on.exit(mdebug("run() for ", sQuote(class(future)[1]), " ... done"), add = TRUE)
438+
439+
## Apply future plan tweaks
440+
args <- attr(makeFuture, "tweaks")
441+
if (is.null(args)) args <- list()
442+
backend <- do.call(backend, args = args)
443+
stop_if_not(inherits(backend, "FutureBackend"))
444+
445+
446+
if (debug) mdebug(" - Launching futures ...")
447+
future2 <- launchFuture(backend, future = future)
448+
if (debug) mdebug(" - Launching futures ... done")
449+
if (debug) mdebug(" - Future launched: ", commaq(class(future2)))
450+
stop_if_not(inherits(future2, "Future"))
451+
if (debug) mdebug("Using FutureBackend ... DONE")
452+
453+
return(future2)
454+
}
455+
}
456+
457+
430458
## AD HOC/WORKAROUND: /HB 2020-12-21
431459
args <- list(
432460
quote(future[["expr"]]),

R/backend_api-UniprocessFuture-class.R

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ UniprocessFuture <- function(expr = NULL, substitute = TRUE, envir = parent.fram
2222
#' @export
2323
run.UniprocessFuture <- function(future, ...) {
2424
debug <- isTRUE(getOption("future.debug"))
25-
25+
2626
if (future[["state"]] != 'created') {
2727
label <- future[["label"]]
2828
if (is.null(label)) label <- "<none>"
@@ -105,3 +105,74 @@ SequentialFuture <- function(expr = NULL, envir = parent.frame(), substitute = T
105105
f <- UniprocessFuture(expr = expr, envir = envir, substitute = FALSE, lazy = lazy, globals = globals, ...)
106106
structure(f, class = c("SequentialFuture", class(f)))
107107
}
108+
109+
110+
111+
launchFuture <- function(backend, future, ...) {
112+
UseMethod("launchFuture")
113+
}
114+
115+
#' @export
116+
launchFuture.SequentialFutureBackend <- function(backend, future, ...) {
117+
debug <- isTRUE(getOption("future.debug"))
118+
if (debug) {
119+
mdebugf("launchFuture() for %s ...", commaq(class(backend)))
120+
on.exit(mdebugf("launchFuture() for %s ... DONE", commaq(class(backend))))
121+
}
122+
123+
if (future[["state"]] != 'created') {
124+
label <- future[["label"]]
125+
if (is.null(label)) label <- "<none>"
126+
stop(FutureError(sprintf("A future ('%s') can only be launched once", label), future = future))
127+
}
128+
129+
## Assert that the process that created the future is
130+
## also the one that evaluates/resolves/queries it.
131+
assertOwner(future)
132+
133+
## Coerce to a SequentialFuture
134+
## NOTE: Has to be done before getFutureData() is called
135+
class(future) <- c("SequentialFuture", "UniprocessFuture", class(future))
136+
137+
## Launch future
138+
future[["state"]] <- "running"
139+
140+
## Get future
141+
data <- getFutureData(future, debug = debug)
142+
143+
## Apply backend tweaks
144+
split <- backend[["split"]]
145+
if (!is.null(split)) data$capture$split <- split
146+
earlySignal <- backend[["earlySignal"]]
147+
if (!is.null(earlySignal)) future$earlySignal <- earlySignal
148+
149+
future[["result"]] <- evalFuture(data)
150+
151+
future[["state"]] <- "finished"
152+
153+
if (debug) mdebugf("%s started (and completed)", class(future)[1])
154+
155+
## Always signal immediateCondition:s and as soon as possible.
156+
## They will always be signaled if they exist.
157+
signalImmediateConditions(future)
158+
159+
## Signal conditions early, iff specified for the given future
160+
signalEarly(future, collect = FALSE)
161+
162+
future
163+
}
164+
165+
SequentialFutureBackend <- function(...) {
166+
core <- new.env(parent = emptyenv())
167+
168+
## Record future plan tweaks, if any
169+
args <- list(...)
170+
for (name in names(args)) {
171+
core[[name]] <- args[[name]]
172+
}
173+
174+
core <- structure(core, class = c("SequentialFutureBackend", "FutureBackend", class(core)))
175+
176+
core
177+
}
178+

R/utils-options.R

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,4 +362,7 @@ update_package_options <- function(debug = FALSE) {
362362
## future 1.34.0:
363363
update_package_option("future.globals.objectSize.method", mode = "character", debug = debug)
364364
update_package_option("future.plan.cleanup.legacy", mode = "logical", debug = debug)
365+
366+
## future (>= 1.34.0-9000):
367+
update_package_option("future.backend.version", mode = "integer", default = 1L, debug = debug)
365368
}

R/utils_api-tweak.R

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ tweak.future <- function(strategy, ..., penvir = parent.frame()) {
134134
## Restore attributes including class
135135
attributes(strategy2) <- attrs
136136

137+
## Record what tweaks were made
138+
attr(strategy2, "tweaks") <- args
139+
137140
## Flag that it is tweaked
138141
class(strategy2) <- c("tweaked", class)
139142

R/zzz.R

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
.package <- new.env()
22

3+
## Update FutureBackend:s
4+
attr(sequential, "backend") <- SequentialFutureBackend
5+
36

47
## covr: skip=all
58
#' @importFrom utils packageVersion

0 commit comments

Comments
 (0)