Skip to content

Commit 7d93ae1

Browse files
First prototype of MulticoreFutureBackend
1 parent 4ff28fc commit 7d93ae1

File tree

4 files changed

+85
-4
lines changed

4 files changed

+85
-4
lines changed

R/backend_api-ClusterFuture-class.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1009,7 +1009,7 @@ launchFuture.ClusterFutureBackend <- function(backend, future, ...) {
10091009
workers = backend[["workers"]]
10101010
)
10111011
future <- do.call(as_ClusterFuture, args = args)
1012-
class(future) <- unique(c(backend$futureClasses, class(future)))
1012+
future <- coerceFuture(backend, future)
10131013

10141014
backend0 <- future[["backend"]]
10151015

R/backend_api-MulticoreFuture-class.R

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,3 +291,75 @@ result.MulticoreFuture <- function(future, ...) {
291291

292292
result
293293
}
294+
295+
296+
297+
#' @export
298+
launchFuture.MulticoreFutureBackend <- function(backend, future, ...) {
299+
debug <- isTRUE(getOption("future.debug"))
300+
301+
if (future[["state"]] != "created") {
302+
label <- future[["label"]]
303+
if (is.null(label)) label <- "<none>"
304+
stop(FutureError(sprintf("A future ('%s') can only be launched once", label), future = future))
305+
}
306+
307+
## Assert that the process that created the future is
308+
## also the one that evaluates/resolves/queries it.
309+
assertOwner(future)
310+
311+
mcparallel <- importParallel("mcparallel")
312+
313+
future <- coerceFuture(backend, future)
314+
315+
data <- getFutureData(future, debug = debug)
316+
317+
t_start <- Sys.time()
318+
319+
## Get a free worker
320+
reg <- sprintf("multicore-%s", session_uuid())
321+
requestCore(
322+
await = function() FutureRegistry(reg, action = "collect-first", earlySignal = TRUE),
323+
workers = backend[["workers"]]
324+
)
325+
326+
if (inherits(future[[".journal"]], "FutureJournal")) {
327+
appendToFutureJournal(future,
328+
event = "getWorker",
329+
category = "other",
330+
parent = "launch",
331+
start = t_start,
332+
stop = Sys.time()
333+
)
334+
}
335+
336+
## Add to registry
337+
FutureRegistry(reg, action = "add", future = future, earlySignal = TRUE)
338+
339+
job <- local({
340+
oopts <- options(mc.cores = NULL)
341+
on.exit(options(oopts))
342+
mcparallel(evalFuture(data))
343+
})
344+
345+
future[["job"]] <- job
346+
future[["state"]] <- "running"
347+
348+
if (debug) mdebugf("%s started", class(future)[1])
349+
350+
invisible(future)
351+
}
352+
353+
354+
MulticoreFutureBackend <- function(workers, persistent = FALSE, ...) {
355+
core <- new.env(parent = emptyenv())
356+
357+
## Record future plan tweaks, if any
358+
args <- list(workers = workers, persistent = persistent, ...)
359+
for (name in names(args)) {
360+
core[[name]] <- args[[name]]
361+
}
362+
core$futureClasses <- c("MulticoreFuture", "Future")
363+
core <- structure(core, class = c("MulticoreFutureBackend", "FutureBackend", class(core)))
364+
core
365+
}

R/backend_api-UniprocessFuture-class.R

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,16 @@ SequentialFuture <- function(expr = NULL, envir = parent.frame(), substitute = T
108108

109109

110110

111+
112+
coerceFuture <- function(backend, future, ...) {
113+
UseMethod("coerceFuture")
114+
}
115+
116+
coerceFuture.FutureBackend <- function(backend, future, ...) {
117+
class(future) <- unique(c(backend$futureClasses, class(future)))
118+
future
119+
}
120+
111121
launchFuture <- function(backend, future, ...) {
112122
UseMethod("launchFuture")
113123
}
@@ -130,9 +140,7 @@ launchFuture.SequentialFutureBackend <- function(backend, future, ...) {
130140
## also the one that evaluates/resolves/queries it.
131141
assertOwner(future)
132142

133-
## Coerce to a SequentialFuture
134-
## NOTE: Has to be done before getFutureData() is called
135-
class(future) <- c("SequentialFuture", "UniprocessFuture", class(future))
143+
future <- coerceFuture(backend, future)
136144

137145
## Launch future
138146
future[["state"]] <- "running"

R/zzz.R

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
attr(sequential, "backend") <- SequentialFutureBackend
55
attr(cluster, "backend") <- ClusterFutureBackend
66
attr(multisession, "backend") <- MultisessionFutureBackend
7+
attr(multicore, "backend") <- MulticoreFutureBackend
78

89

910
## covr: skip=all

0 commit comments

Comments
 (0)