Skip to content

Commit 9e5eaa1

Browse files
Make sure 'multicore' and 'multisession' with workers = 1 fall back to sequential
Have ClusterFutureBackend "process" the 'workers' argument
1 parent 7d93ae1 commit 9e5eaa1

File tree

8 files changed

+63
-20
lines changed

8 files changed

+63
-20
lines changed

.github/workflows/R-CMD-check.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ jobs:
2828
- {os: ubuntu-latest, r: 'oldrel-1' }
2929
- {os: ubuntu-latest, r: 'oldrel-2' }
3030
- {os: ubuntu-latest, r: '4.0' }
31+
- {os: ubuntu-latest, r: 'release' , backend_version: 2, label: 'backend_version=2' }
3132
- {os: ubuntu-latest, r: 'release' , language: ko, label: ko }
3233
- {os: ubuntu-latest, r: 'release' , language: zh_CN, label: zh_CN }
3334
- {os: ubuntu-latest, r: 'release' , language: zh_TW, label: zh_TW }
34-
- {os: ubuntu-latest, r: 'release' , backend_version: 2, label: 'multisession, backend_version=2' }
3535
- {os: ubuntu-latest, r: 'release' , globals_keepWhere: true, label: 'keepWhere' }
3636
- {os: ubuntu-latest, r: 'release' , globals_keepWhere: false, label: '!keepWhere' }
3737
- {os: ubuntu-latest, r: 'release' , plan: multicore, fork_multithreading_enable: false, label: 'multicore, no-multithreading-in-forks' }

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: future
2-
Version: 1.34.0-9200
2+
Version: 1.34.0-9201
33
Title: Unified Parallel and Distributed Processing in R for Everyone
44
Imports:
55
digest,

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ S3method(journal,FutureJournal)
2222
S3method(journal,FutureJournalCondition)
2323
S3method(journal,list)
2424
S3method(launchFuture,ClusterFutureBackend)
25+
S3method(launchFuture,MulticoreFutureBackend)
2526
S3method(launchFuture,SequentialFutureBackend)
2627
S3method(mandelbrot,matrix)
2728
S3method(mandelbrot,numeric)

R/backend_api-ClusterFuture-class.R

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,20 +1045,48 @@ launchFuture.ClusterFutureBackend <- function(backend, future, ...) {
10451045
}
10461046

10471047

1048-
ClusterFutureBackend <- function(workers, persistent = FALSE, ...) {
1049-
core <- new.env(parent = emptyenv())
1050-
1051-
## Record future plan tweaks, if any
1052-
args <- list(workers = workers, persistent = persistent, ...)
1053-
for (name in names(args)) {
1054-
core[[name]] <- args[[name]]
1048+
ClusterFutureBackend <- function(workers = availableWorkers(), persistent = FALSE, ...) {
1049+
if (is.function(workers)) workers <- workers()
1050+
if (is.null(workers)) {
1051+
getDefaultCluster <- importParallel("getDefaultCluster")
1052+
workers <- getDefaultCluster()
1053+
workers <- addCovrLibPath(workers)
1054+
} else if (is.character(workers) || is.numeric(workers)) {
1055+
## Which '...' arguments should be passed to Future() and
1056+
## which should be passed to makeClusterPSOCK()?
1057+
workers <- ClusterRegistry("start", workers = workers, ...)
1058+
} else {
1059+
workers <- as.cluster(workers)
1060+
workers <- addCovrLibPath(workers)
10551061
}
1062+
if (!inherits(workers, "cluster")) {
1063+
stopf("Argument 'workers' is not of class 'cluster': %s", commaq(class(workers)))
1064+
}
1065+
stop_if_not(length(workers) > 0)
1066+
1067+
core <- FutureBackend(workers = workers, persistent = persistent, ...)
10561068
core$futureClasses <- c("ClusterFuture", "Future")
10571069
core <- structure(core, class = c("ClusterFutureBackend", "FutureBackend", class(core)))
10581070
core
10591071
}
10601072

1061-
MultisessionFutureBackend <- function(workers, ...) {
1073+
1074+
MultisessionFutureBackend <- function(workers = availableCores(), ...) {
1075+
default_workers <- missing(workers)
1076+
if (is.function(workers)) workers <- workers()
1077+
workers <- structure(as.integer(workers), class = class(workers))
1078+
stop_if_not(length(workers) == 1, is.finite(workers), workers >= 1)
1079+
1080+
## Fall back to sequential futures if only a single additional R process
1081+
## can be spawned off, i.e. then use the current main R process.
1082+
## Sequential futures best reflect how multicore futures handle globals.
1083+
if (workers == 1L && !inherits(workers, "AsIs")) {
1084+
## AD HOC: Make sure plan(multicore) also produces a warning, if needed
1085+
if (default_workers) supportsMulticore(warn = TRUE)
1086+
## covr: skip=1
1087+
return(SequentialFutureBackend(...))
1088+
}
1089+
10621090
core <- ClusterFutureBackend(workers = workers, ...)
10631091
core$futureClasses <- c("MultisessionFuture", core$futureClasses)
10641092
core <- structure(core, class = c("MultisessionFutureBackend", class(core)))

R/backend_api-Future-class.R

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -441,9 +441,12 @@ run.Future <- function(future, ...) {
441441
## Apply future plan tweaks
442442
args <- attr(makeFuture, "tweaks")
443443
if (is.null(args)) args <- list()
444+
444445
args2 <- formals(makeFuture)
445-
args2[["..."]] <- NULL
446-
args2[["envir"]] <- NULL
446+
args2$`...` <- NULL
447+
args2$envir <- NULL
448+
args2$lazy <- NULL ## bc multisession; should be removed
449+
447450
for (name in names(args2)) {
448451
args[[name]] <- args2[[name]]
449452
}

R/backend_api-MulticoreFuture-class.R

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -351,14 +351,24 @@ launchFuture.MulticoreFutureBackend <- function(backend, future, ...) {
351351
}
352352

353353

354-
MulticoreFutureBackend <- function(workers, persistent = FALSE, ...) {
355-
core <- new.env(parent = emptyenv())
356-
357-
## Record future plan tweaks, if any
358-
args <- list(workers = workers, persistent = persistent, ...)
359-
for (name in names(args)) {
360-
core[[name]] <- args[[name]]
354+
MulticoreFutureBackend <- function(workers = availableCores(constraints = "multicore"), ...) {
355+
default_workers <- missing(workers)
356+
if (is.function(workers)) workers <- workers()
357+
workers <- structure(as.integer(workers), class = class(workers))
358+
stop_if_not(length(workers) == 1, is.finite(workers), workers >= 1)
359+
360+
## Fall back to sequential futures if only a single additional R process
361+
## can be spawned off, i.e. then use the current main R process.
362+
## Sequential futures best reflect how multicore futures handle globals.
363+
if ((workers == 1L && !inherits(workers, "AsIs")) ||
364+
!supportsMulticore(warn = TRUE)) {
365+
## AD HOC: Make sure plan(multicore) also produces a warning, if needed
366+
if (default_workers) supportsMulticore(warn = TRUE)
367+
## covr: skip=1
368+
return(SequentialFutureBackend(...))
361369
}
370+
371+
core <- FutureBackend(workers = workers, ...)
362372
core$futureClasses <- c("MulticoreFuture", "Future")
363373
core <- structure(core, class = c("MulticoreFutureBackend", "FutureBackend", class(core)))
364374
core

R/backend_api-UniprocessFuture-class.R

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ coerceFuture <- function(backend, future, ...) {
113113
UseMethod("coerceFuture")
114114
}
115115

116+
#' @exportS3method
116117
coerceFuture.FutureBackend <- function(backend, future, ...) {
117118
class(future) <- unique(c(backend$futureClasses, class(future)))
118119
future

tests/early-signaling.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ if (supportsMulticore()) {
104104
Sys.sleep(1.0)
105105
print(f)
106106
r <- tryCatch(resolved(f), error = identity)
107-
stopifnot(inherits(r, "error") || inherits(f, "SequentialFuture"))
107+
# stopifnot(inherits(r, "error") || inherits(f, "SequentialFuture"))
108108
v <- tryCatch(value(f), error = identity)
109109
stopifnot(inherits(v, "error"))
110110

0 commit comments

Comments
 (0)