Skip to content

Commit fd82434

Browse files
Merge branch 'develop' of github.com:HenrikBengtsson/future into develop
2 parents d527890 + 58b0660 commit fd82434

File tree

6 files changed

+247
-31
lines changed

6 files changed

+247
-31
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: future
2-
Version: 1.49.0-9040
2+
Version: 1.49.0-9042
33
Title: Unified Parallel and Distributed Processing in R for Everyone
44
Depends:
55
R (>= 3.2.0)

R/utils-options.R

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@
241241
#' future.output.windows.reencode
242242
#' future.journal
243243
#' future.globals.objectSize.method
244+
#' future.ClusterFuture.clusterEvalQ
244245
#'
245246
#' R_FUTURE_STARTUP_SCRIPT
246247
#' R_FUTURE_DEBUG
@@ -267,6 +268,7 @@
267268
#' R_FUTURE_OUTPUT_WINDOWS_REENCODE
268269
#' R_FUTURE_JOURNAL
269270
#' R_FUTURE_GLOBALS_OBJECTSIZE_METHOD
271+
#' R_FUTURE_CLUSTERFUTURE_CLUSTEREVALQ
270272
#'
271273
#' future.cmdargs
272274
#' .future.R
@@ -461,4 +463,6 @@ update_package_options <- function(debug = FALSE) {
461463
update_package_option("future.globals.method.default", mode = "character", split = ",", default = c("ordered", "dfs"), debug = debug)
462464

463465
update_package_option("future.debug.indent", mode = "character", default = " ", debug = debug)
466+
467+
update_package_option("future.ClusterFuture.clusterEvalQ", mode = "character", default = "warning", debug = debug)
464468
}

R/utils_api-makeClusterFuture.R

Lines changed: 223 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,10 @@
6363
#' `clusterEvalQ(cl[1:2], ...)`, and `clusterEvalQ(cl[2:1], ...)` in
6464
#' the above example will all give an error.
6565
#'
66-
#' That said, there will be no error produced when calling
67-
#' `clusterEvalQ(cl, { a <- 42 })`, but we can still not rely on
68-
#' variable `a` being available in following parallel tasks. Again,
69-
#' this is because each parallel task, including the above ones, may
70-
#' be processes on random or transient parallel workers.
71-
#'
72-
#' One exception to the latter limitation is `clusterSetRNGStream()`,
73-
#' which can be safely used with future clusters. See below for more
74-
#' details.
66+
#' Exceptions to the latter limitation are `clusterSetRNGStream()`
67+
#' and `clusterExport()`, which can be safely used with future clusters.
68+
#' See below for more details.
69+
#' If `clusterEvalQ()` is called, a warning is produced.
7570
#'
7671
#' @section clusterSetRNGStream:
7772
#' [parallel::clusterSetRNGStream()] distributes "L'Ecuyer-CMRG" RNG
@@ -82,6 +77,11 @@
8277
#' makes sure `clusterSetRNGStream()` has the expected effect although
8378
#' futures are stateless.
8479
#'
80+
#' @section clusterExport:
81+
#' [parallel::clusterExport()] assigns values to the cluster nodes.
82+
#' Specifically, these values are recorded and are used as globals
83+
#' for all futures created from then on.
84+
#'
8585
#' @aliases FUTURE
8686
#' @keywords internal
8787
#'
@@ -112,25 +112,56 @@ makeClusterFuture <- function(specs = nbrOfWorkers(), ...) {
112112
stop("All arguments must be named")
113113
}
114114
}
115+
if (is.null(options[["globals"]])) {
116+
options[["globals"]] <- formals(future)[["globals"]]
117+
}
115118

119+
env <- new.env(parent = emptyenv())
120+
env[["backend"]] <- backend
121+
116122
cl <- vector("list", length = n)
117123
for (kk in seq_along(cl)) {
118124
node <- new.env(parent = emptyenv())
125+
node[["index"]] <- kk
119126
node[["options"]] <- options
120127
node[["backend"]] <- backend
128+
node[["cluster_env"]] <- env
121129
class(node) <- c("FutureNode")
122130
cl[[kk]] <- node
123131
}
124-
attr(cl, "backend") <- backend
132+
attr(cl, "cluster_env") <- env
125133
class(cl) <- c("FutureCluster", "cluster")
134+
env[["cluster"]] <- cl
126135
cl
127136
}
128137

129138

139+
#' @importFrom utils str
130140
#' @rawNamespace if (getRversion() >= "4.4") S3method(print,FutureCluster)
131141
print.FutureCluster <- function(x, ...) {
132142
cat(sprintf("A %s cluster with %d node\n", sQuote(class(x)[1]), length(x)))
133-
backend <- attr(x, "backend")
143+
144+
cluster_env <- attr(x, "cluster_env")
145+
exports <- cluster_env[["exports"]]
146+
names <- names(exports)
147+
types <- vapply(exports, FUN.VALUE = NA_character_, FUN = typeof)
148+
info <- sprintf("%s (%s)", names, types)
149+
cat(sprintf("Exports: [n=%d] %s\n", length(exports), comma(info)))
150+
151+
clusterEvalQs <- cluster_env[["clusterEvalQs"]]
152+
n <- length(clusterEvalQs)
153+
if (n > 0) {
154+
cat(sprintf("clusterEvalQ() calls ignored: [n=%d]:\n", n))
155+
if (n > 3) clusterEvalQs <- clusterEvalQs[1:3]
156+
exprs <- lapply(clusterEvalQs, FUN = function(x) {
157+
expr <- x[["expression"]]
158+
attributes(expr) <- NULL
159+
expr
160+
})
161+
str(exprs)
162+
}
163+
164+
backend <- cluster_env[["backend"]]
134165
print(backend)
135166

136167
plan_backend <- plan("backend")
@@ -165,17 +196,19 @@ sendData.FutureNode <- function(node, data) {
165196
##
166197
## => sendData(con, data = list(type = "EXEC", data = list(fun = fun, args = args, return = TRUE), tag = NULL))
167198

168-
debug <- isTRUE(getOption("parallel.future.debug"))
199+
index <- node[["index"]]
200+
201+
debug <- isTRUE(getOption("future.debug"))
169202
if (debug) {
170-
message(sprintf("sendData() for %s ...", class(node)[1]))
171-
on.exit(message(sprintf("sendData() for %s ... done", class(node)[1])))
203+
message(sprintf("sendData() for %s #%d ...", class(node)[1], index))
204+
on.exit(message(sprintf("sendData() for %s %d ... done", class(node)[1], index)))
172205
}
173206

174207
type <- data[["type"]]
175208
if (debug) message(sprintf("| type: %s", sQuote(type)))
176209

177210
## Assert that future backend has not changed
178-
backend <- node[["backend"]]
211+
backend <- node[["cluster_env"]][["backend"]]
179212
plan_backend <- plan("backend")
180213
if (!identical(plan_backend[["uuid"]], backend[["uuid"]])) {
181214
stop(FutureError(
@@ -201,13 +234,84 @@ sendData.FutureNode <- function(node, data) {
201234
## parallel:::recvResult() expects element 'value'
202235
node[["future"]] <- ConstantFuture(list(value = NULL), seed = seed, substitute = FALSE)
203236
return(invisible(node))
237+
}
238+
239+
## SPECIAL CASE #2: Called via clusterExport()?
240+
if (called_via_clusterExport()) {
241+
if (debug) message("Detected: clusterExport()")
242+
args <- data[["args"]]
243+
if (debug) message(sprintf("Exports: [n=%d] %s", length(args), commaq(names(args))))
244+
cluster_env <- node[["cluster_env"]]
245+
exports <- cluster_env[["exports"]]
246+
if (is.null(exports)) exports <- list()
247+
## Append <name>=<value> to 'exports'
248+
name <- args[[1]]
249+
value <- args[[2]]
250+
exports[[name]] <- value
251+
cluster_env[["exports"]] <- exports
252+
ns <- getNamespace("future")
253+
ConstantFuture <- get("ConstantFuture", mode = "function", envir = ns, inherits = FALSE)
254+
## parallel:::recvResult() expects element 'value'
255+
node[["future"]] <- ConstantFuture(list(value = NULL), substitute = FALSE)
256+
return(invisible(node))
257+
}
258+
259+
## SPECIAL CASE #3: Called via clusterEvalQ()?
260+
if (index == 1L && called_via_clusterEvalQ()) {
261+
if (debug) message("Detected: clusterEvalQ()")
262+
args <- data[["args"]]
263+
expr <- args[[1]]
264+
calls <- sys.calls()
265+
if (debug) {
266+
message("Expression:")
267+
mprint(expr)
268+
}
269+
270+
cluster_env <- node[["cluster_env"]]
271+
272+
## Record ignored clusterEvalQ() expressions
273+
clusterEvalQs <- cluster_env[["clusterEvalQs"]]
274+
if (is.null(clusterEvalQs)) clusterEvalQs <- list()
275+
call <- list(expression = expr, calls = calls)
276+
clusterEvalQs <- c(clusterEvalQs, list(call))
277+
cluster_env[["clusterEvalQs"]] <- clusterEvalQs
278+
279+
## Warn about ignored clusterEvalQ() call?
280+
action <- getOption("future.ClusterFuture.clusterEvalQ", "warning")
281+
if (action != "ignore") {
282+
cluster <- cluster_env[["cluster"]]
283+
code <- deparse(expr)
284+
code <- paste(code, collapse = " ")
285+
code <- substring(code, first = 1L, last = 30L)
286+
code <- gsub(" +", " ", code)
287+
msg <- sprintf("parallel::clusterEvalQ() is not supported by %s clusters. Ignoring expression: %s", class(cluster)[[1]], code)
288+
if (action == "warning") {
289+
warning(FutureWarning(msg))
290+
} else if (action == "error") {
291+
stop(FutureError(msg))
292+
}
293+
}
204294
}
205295

206296
options <- node[["options"]]
207297
if ("seed" %in% names(node)) {
208298
options[["seed"]] <- node[["seed"]]
209299
}
210-
300+
301+
cluster_env <- node[["cluster_env"]]
302+
exports <- cluster_env[["exports"]]
303+
if (length(exports) > 0) {
304+
globals <- options[["globals"]]
305+
if (is.logical(globals)) {
306+
attr(globals, "add") <- c(exports, attr(globals, "add"))
307+
} else if (is.character(globals)) {
308+
attr(globals, "add") <- c(exports, attr(globals, "add"))
309+
} else if (is.list(globals)) {
310+
globals <- c(exports, globals)
311+
}
312+
options[["globals"]] <- globals
313+
}
314+
211315
node[["future"]] <- local({
212316
if (debug) {
213317
message("| Create future ...")
@@ -225,9 +329,11 @@ sendData.FutureNode <- function(node, data) {
225329
}
226330
fun <- data[["fun"]]
227331
args <- data[["args"]]
332+
228333
expr <- quote(do.call(fun, args = args))
229334
future_args <- list(expr = quote(expr), substitute = FALSE)
230335
future_args <- c(future_args, options)
336+
231337
if (debug) {
232338
out <- capture.output(str(list(args = future_args)))
233339
out <- sprintf("| : %s", out)
@@ -262,7 +368,7 @@ sendData.FutureNode <- function(node, data) {
262368
#' @rawNamespace if (getRversion() >= "4.4") importFrom(parallel,recvData)
263369
#' @rawNamespace if (getRversion() >= "4.4") S3method(recvData,FutureNode)
264370
recvData.FutureNode <- function(node) {
265-
debug <- isTRUE(getOption("parallel.future.debug"))
371+
debug <- isTRUE(getOption("future.debug"))
266372
if (debug) {
267373
message(sprintf("recvData() for %s ...", class(node)[1]))
268374
on.exit(message(sprintf("recvData() for %s ... done", class(node)[1])))
@@ -278,7 +384,7 @@ recvData.FutureNode <- function(node) {
278384
print(utils::ls.str(result))
279385
}
280386

281-
if ("seed" %in% names(node)) {
387+
if ("seed" %in% names(node) && !is.null(result[["seed"]])) {
282388
if (debug) mdebug("Updating the node's RNG state")
283389
node[["seed"]] <- result[["seed"]]
284390
}
@@ -326,4 +432,102 @@ called_via_clusterSetRNGStream <- function(calls = sys.calls()) {
326432
return(TRUE)
327433
}
328434
FALSE
329-
}
435+
} ## called_via_clusterSetRNGStream()
436+
437+
438+
439+
440+
# Dotted pair list of 6
441+
# $ : language clusterExport(cl, varlist = c("a", "b"))
442+
# $ : language clusterCall(cl, gets, name, get(name, envir = envir))
443+
# $ : language sendCall(cl[[i]], fun, list(...))
444+
# $ : language postNode(con, "EXEC", list(fun = fun, args = args, return = return, tag = tag))
445+
# $ : language sendData(con, list(type = type, data = value, tag = tag))
446+
# $ : language sendData.FutureNode(con, list(type = type, data = value, tag = tag))
447+
# Checks whether a call stack shows that we were reached via clusterExport().
#
# The stack is scanned from the innermost call outwards for the consecutive
# chain sendData() <- postNode() <- sendCall() <- clusterCall().  Calls that
# precede the first sendData() match (e.g. the S3 method and this function
# itself) are skipped; once the chain has started, any non-matching call
# aborts the search.  The call just outside clusterCall() must then be
# clusterExport() or parallel::clusterExport().
#
# @param calls A list of calls, outermost first, as returned by sys.calls().
#
# @return TRUE if the stack matches the clusterExport() pattern, otherwise
# FALSE.
called_via_clusterExport <- function(calls = sys.calls()) {
  finds <- c("sendData", "postNode", "sendCall", "clusterCall")
  nfinds <- length(finds)
  ncalls <- length(calls)

  ## Not possible?
  if (ncalls <= nfinds + 1L) return(FALSE)

  ii <- 1L
  find <- as.symbol(finds[ii])

  found <- FALSE
  ## 'ncalls' is at least nfinds + 2 here, so ncalls:1 is a proper countdown
  for (jj in ncalls:1) {
    call <- calls[[jj]][[1]]

    if (identical(call, find)) {
      if (ii == nfinds) {
        ## First passage done
        found <- TRUE
        break
      }
      ii <- ii + 1L
      find <- as.symbol(finds[ii])
    } else if (ii > 1L) {
      ## The chain was started but is broken by another call
      return(FALSE)
    }
  }
  if (!found) return(FALSE)

  ## Step out to the caller of clusterCall()
  jj <- jj - 1L

  ## clusterCall() was the outermost call, so there is no caller to
  ## inspect; without this guard calls[[0]] would signal an error
  if (jj < 1L) return(FALSE)

  call <- calls[[jj]][[1]]
  if (identical(call, as.symbol("clusterExport"))) {
    return(TRUE)
  }
  if (identical(call, quote(parallel::clusterExport))) {
    return(TRUE)
  }
  FALSE
} ## called_via_clusterExport()
486+
487+
488+
# Dotted pair list of 6
489+
# $ : language clusterEvalQ(cl, 42)
490+
# $ : language clusterCall(cl, eval, substitute(expr), envir = .GlobalEnv)
491+
# $ : language sendCall(cl[[i]], fun, list(...))
492+
# $ : language postNode(con, "EXEC", list(fun = fun, args = args, return = return, tag = tag))
493+
# $ : language sendData(con, list(type = type, data = value, tag = tag))
494+
# $ : language sendData.FutureNode(con, list(type = type, data = value, tag = tag))
495+
# Checks whether a call stack shows that we were reached via clusterEvalQ().
#
# The stack is scanned from the innermost call outwards for the consecutive
# chain sendData() <- postNode() <- sendCall() <- clusterCall().  Calls that
# precede the first sendData() match (e.g. the S3 method and this function
# itself) are skipped; once the chain has started, any non-matching call
# aborts the search.  The call just outside clusterCall() must then be
# clusterEvalQ() or parallel::clusterEvalQ().
#
# @param calls A list of calls, outermost first, as returned by sys.calls().
#
# @return TRUE if the stack matches the clusterEvalQ() pattern, otherwise
# FALSE.
called_via_clusterEvalQ <- function(calls = sys.calls()) {
  finds <- c("sendData", "postNode", "sendCall", "clusterCall")
  nfinds <- length(finds)
  ncalls <- length(calls)

  ## Not possible?
  if (ncalls <= nfinds + 1L) return(FALSE)

  ii <- 1L
  find <- as.symbol(finds[ii])

  found <- FALSE
  ## 'ncalls' is at least nfinds + 2 here, so ncalls:1 is a proper countdown
  for (jj in ncalls:1) {
    call <- calls[[jj]][[1]]

    if (identical(call, find)) {
      if (ii == nfinds) {
        ## First passage done
        found <- TRUE
        break
      }
      ii <- ii + 1L
      find <- as.symbol(finds[ii])
    } else if (ii > 1L) {
      ## The chain was started but is broken by another call
      return(FALSE)
    }
  }
  if (!found) return(FALSE)

  ## Step out to the caller of clusterCall()
  jj <- jj - 1L

  ## clusterCall() was the outermost call, so there is no caller to
  ## inspect; without this guard calls[[0]] would signal an error
  if (jj < 1L) return(FALSE)

  call <- calls[[jj]][[1]]
  if (identical(call, as.symbol("clusterEvalQ"))) {
    return(TRUE)
  }
  if (identical(call, quote(parallel::clusterEvalQ))) {
    return(TRUE)
  }
  FALSE
} ## called_via_clusterEvalQ()

inst/testme/test-makeClusterFuture.R

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
if (getRversion() >= "4.4.0") {
22
library(future)
33
library(parallel)
4-
4+
5+
a <- 42
6+
57
FUN <- function(x) {
68
message("Process ID: ", Sys.getpid())
7-
mean(rnorm(n = x))
9+
list(a = a, mean = mean(rnorm(n = x)))
810
}
911

1012
message("makeCluster():")
1113
plan(multisession)
1214
cl <- makeCluster(2)
1315
set.seed(42)
1416
clusterSetRNGStream(cl)
17+
clusterExport(cl, "a")
1518
y <- list()
1619
for (kk in 1:3) y[[kk]] <- parLapply(cl, 11:13, FUN)
1720
str(y)
@@ -23,6 +26,7 @@ if (getRversion() >= "4.4.0") {
2326
cl <- makeClusterFuture()
2427
set.seed(42)
2528
clusterSetRNGStream(cl)
29+
clusterExport(cl, "a")
2630
y <- list()
2731
for (kk in 1:3) y[[kk]] <- parLapply(cl, 11:13, FUN)
2832
str(y)

0 commit comments

Comments
 (0)