Skip to content

Commit 869851b

Browse files
FutureBackend cleanups: Move cluster() and multisession() to use Cluster- and MultisessionFutureBackend
1 parent fe3971a commit 869851b

11 files changed

+62
-533
lines changed

NAMESPACE

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,17 @@ S3method(launchFuture,SequentialFutureBackend)
2828
S3method(mandelbrot,matrix)
2929
S3method(mandelbrot,numeric)
3030
S3method(nbrOfFreeWorkers,"NULL")
31+
S3method(nbrOfFreeWorkers,ClusterFutureBackend)
32+
S3method(nbrOfFreeWorkers,SequentialFutureBackend)
3133
S3method(nbrOfFreeWorkers,cluster)
3234
S3method(nbrOfFreeWorkers,future)
3335
S3method(nbrOfFreeWorkers,logical)
3436
S3method(nbrOfFreeWorkers,multicore)
3537
S3method(nbrOfFreeWorkers,multiprocess)
3638
S3method(nbrOfFreeWorkers,uniprocess)
3739
S3method(nbrOfWorkers,"NULL")
40+
S3method(nbrOfWorkers,ClusterFutureBackend)
41+
S3method(nbrOfWorkers,SequentialFutureBackend)
3842
S3method(nbrOfWorkers,cluster)
3943
S3method(nbrOfWorkers,future)
4044
S3method(nbrOfWorkers,multiprocess)

R/backend_api-ClusterFuture-class.R

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,10 @@ resolved.ClusterFuture <- function(x, run = TRUE, timeout = NULL, ...) {
247247

248248
future <- x
249249
backend <- future[["backend"]]
250+
stop_if_not(inherits(backend, "FutureBackend"))
250251
workers <- backend$workers
251252
reg <- backend$reg
252-
253+
253254
## A lazy future not even launched?
254255
if (future[["state"]] == "created") {
255256
if (run) {

R/backend_api-ClusterFutureBackend-class.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
#' @keywords internal
1414
#' @rdname FutureBackend
1515
#' @export
16-
ClusterFutureBackend <- function(workers = availableWorkers(), persistent = FALSE, ...) {
16+
ClusterFutureBackend <- function(workers = availableWorkers(), persistent = FALSE, earlySignal = TRUE, ...) {
1717
if (is.function(workers)) workers <- workers()
1818
if (is.null(workers)) {
1919
getDefaultCluster <- importParallel("getDefaultCluster")

R/backend_api-FutureBackend-class.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ makeFutureBackend <- function(evaluator, ...) {
6161
args2 <- formals(evaluator)
6262
args2$`...` <- NULL
6363
args2$envir <- NULL
64-
args2$lazy <- NULL ## bc multisession; should be removed
64+
args2$lazy <- NULL ## bc multisession; should be removed
6565
for (name in names(args2)) {
6666
args[[name]] <- args2[[name]]
6767
}

R/backend_api-MulticoreFutureBackend-class.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ MulticoreFutureBackend <- function(workers = availableCores(constraints = "multi
2727
}
2828

2929
core <- FutureBackend(workers = workers, ...)
30-
core[["futureClasses"]] <- c("MulticoreFuture", core[["futureClasses"]])
30+
core[["futureClasses"]] <- c("MulticoreFuture", "MultiprocessFuture", core[["futureClasses"]])
3131
core <- structure(core, class = c("MulticoreFutureBackend", "FutureBackend", class(core)))
3232
core
3333
}

R/backend_api-cluster.R

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@
4040
#'
4141
#' @export
4242
cluster <- function(..., persistent = FALSE, workers = availableWorkers(), envir = parent.frame()) {
43-
future <- ClusterFuture(..., persistent = persistent, workers = workers, envir = envir)
44-
if (!future[["lazy"]]) future <- run(future)
45-
invisible(future)
43+
f <- Future(..., envir = envir)
44+
f[["workers"]] <- workers
45+
f[["persistent"]] <- persistent
46+
class(f) <- c("ClusterFuture", "MultiprocessFuture", "Future")
47+
f
4648
}
4749
class(cluster) <- c("cluster", "multiprocess", "future", "function")
4850
attr(cluster, "init") <- TRUE

R/backend_api-multisession.R

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,10 @@
6565
#'
6666
#' @export
6767
multisession <- function(..., workers = availableCores(), lazy = FALSE, rscript_libs = .libPaths(), envir = parent.frame()) {
68-
if (is.function(workers)) workers <- workers()
69-
workers <- structure(as.integer(workers), class = class(workers))
70-
stop_if_not(length(workers) == 1, is.finite(workers), workers >= 1)
71-
72-
## Fall back to lazy sequential futures if only a single R session can be used,
73-
## that is, then use the current main R process.
74-
if (workers == 1L && !inherits(workers, "AsIs")) {
75-
return(sequential(..., lazy = TRUE, envir = envir))
76-
}
77-
78-
future <- MultisessionFuture(..., workers = workers, lazy = lazy, rscript_libs = rscript_libs, envir = envir)
79-
if (!future[["lazy"]]) future <- run(future)
80-
invisible(future)
68+
f <- Future(..., lazy = lazy, envir = envir)
69+
f[["workers"]] <- workers
70+
class(f) <- c("MultisessionFuture", "MultiprocessFuture", "Future")
71+
f
8172
}
8273
class(multisession) <- c("multisession", "cluster", "multiprocess", "future", "function")
8374
attr(multisession, "init") <- TRUE

R/utils_api-nbrOfWorkers.R

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,27 @@ nbrOfWorkers <- function(evaluator = NULL) {
1717

1818

1919
#' @export
20-
nbrOfWorkers.cluster <- function(evaluator) {
21-
assert_no_positional_args_but_first()
22-
23-
expr <- formals(evaluator)$workers
24-
workers <- eval(expr, enclos = baseenv())
25-
if (is.function(workers)) workers <- workers()
26-
if (is.character(workers)) {
27-
stop_if_not(!anyNA(workers))
28-
workers <- length(workers)
29-
} else if (is.numeric(workers)) {
30-
} else if (inherits(workers, "cluster")) {
31-
workers <- length(workers)
32-
} else {
33-
stopf("Unsupported type of 'workers' for evaluator of class %s: %s", commaq(class(evaluator)), class(workers)[1])
34-
}
20+
nbrOfWorkers.ClusterFutureBackend <- function(evaluator) {
21+
backend <- evaluator
22+
workers <- backend[["workers"]]
23+
stop_if_not(length(workers) > 0L, inherits(workers, "cluster"))
24+
workers <- length(workers)
3525
stop_if_not(length(workers) == 1L, !is.na(workers), workers >= 1L, is.finite(workers))
36-
3726
workers
3827
}
3928

29+
#' @export
30+
nbrOfWorkers.SequentialFutureBackend <- function(evaluator) {
31+
1L
32+
}
33+
34+
#' @export
35+
nbrOfWorkers.cluster <- function(evaluator) {
36+
assert_no_positional_args_but_first()
37+
backend <- makeFutureBackend(evaluator)
38+
nbrOfWorkers(backend)
39+
}
40+
4041
#' @export
4142
nbrOfWorkers.uniprocess <- function(evaluator) {
4243
assert_no_positional_args_but_first()
@@ -106,22 +107,14 @@ nbrOfFreeWorkers <- function(evaluator = NULL, background = FALSE, ...) {
106107

107108

108109
#' @export
109-
nbrOfFreeWorkers.cluster <- function(evaluator, background = FALSE, ...) {
110-
assert_no_positional_args_but_first()
111-
112-
workers <- nbrOfWorkers(evaluator)
113-
114-
## Create a dummy, lazy future based on the future strategy ("evaluator")
115-
f <- evaluator(NULL, lazy = TRUE)
110+
nbrOfFreeWorkers.ClusterFutureBackend <- function(evaluator, ...) {
111+
backend <- evaluator
112+
workers <- backend[["workers"]]
113+
stop_if_not(length(workers) > 0L, inherits(workers, "cluster"))
114+
workers <- length(workers)
115+
reg <- backend$reg
116+
stop_if_not(length(reg) == 1L, is.character(reg), nzchar(reg))
116117

117-
## Special case
118-
if (inherits(f, "SequentialFuture")) {
119-
return(if (isTRUE(background)) 0L else 1L)
120-
}
121-
122-
name <- attr(f$workers, "name", exact = TRUE)
123-
stop_if_not(is.character(name), length(name) == 1L)
124-
reg <- sprintf("workers-%s", name)
125118
## Number of unresolved cluster futures
126119
usedNodes <- length(FutureRegistry(reg, action = "list", earlySignal = FALSE))
127120

@@ -131,6 +124,20 @@ nbrOfFreeWorkers.cluster <- function(evaluator, background = FALSE, ...) {
131124
workers
132125
}
133126

127+
#' @export
128+
nbrOfFreeWorkers.SequentialFutureBackend <- function(evaluator, background = FALSE, ...) {
129+
assert_no_positional_args_but_first()
130+
if (isTRUE(background)) 0L else 1L
131+
}
132+
133+
134+
#' @export
135+
nbrOfFreeWorkers.cluster <- function(evaluator, background = FALSE, ...) {
136+
assert_no_positional_args_but_first()
137+
backend <- makeFutureBackend(evaluator)
138+
nbrOfFreeWorkers(backend, background = background, ...)
139+
}
140+
134141

135142
#' @export
136143
nbrOfFreeWorkers.uniprocess <- function(evaluator, background = FALSE, ...) {

man/FutureBackend.Rd

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)