Skip to content

Commit fe3971a

Browse files
extract local functions from plan(); easier to debug
1 parent 4e8490a commit fe3971a

File tree

1 file changed

+145
-139
lines changed

1 file changed

+145
-139
lines changed

R/utils_api-plan.R

Lines changed: 145 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,144 @@
1+
assert_no_disallowed_strategies <- function(stack) {
2+
noplans <- getOption("future.plan.disallow")
3+
if (length(noplans) == 0L) return()
4+
5+
for (kk in seq_along(stack)) {
6+
evaluator <- stack[[kk]]
7+
if (!inherits(evaluator, noplans)) next
8+
clazz <- class(evaluator)[1]
9+
if (!clazz %in% noplans) next ## <== sic!
10+
11+
stop(FutureError(sprintf("Can not use %s in the future plan because it is on the list of future strategies that are not allow per option 'future.plan.disallow': %s", sQuote(clazz), commaq(noplans))))
12+
}
13+
}
14+
15+
16+
evaluator_uses <- function(evaluator, strategy) {
17+
if (!inherits(evaluator, strategy)) return(FALSE)
18+
## NOTE: Yes, we are indeed inspecting the 'class' attribute itself
19+
class <- class(evaluator)
20+
if (class[1] == strategy) return(TRUE)
21+
if (length(class) == 1L) return(FALSE)
22+
if (class[1] == "tweaked" && class[2] == strategy) return(TRUE)
23+
## Special case for strategy == "multiprocess"
24+
if (strategy == "multiprocess" && class[length(class)] == strategy) return(TRUE)
25+
FALSE
26+
}
27+
28+
29+
warn_about_multicore <- local({
30+
.warn <- TRUE
31+
32+
function(stack) {
33+
if (!.warn) return()
34+
35+
## Is 'multicore' used despite not being supported on the current
36+
## platform?
37+
for (kk in seq_along(stack)) {
38+
if (evaluator_uses(stack[[kk]], "multicore")) {
39+
supportsMulticore(warn = TRUE)
40+
## Warn only once, if at all
41+
.warn <<- FALSE
42+
break
43+
}
44+
}
45+
}
46+
})
47+
48+
49+
equal_strategy_stacks <- function(stack, other) {
50+
stop_if_not(is.list(stack), is.list(other))
51+
stack <- lapply(stack, FUN = function(s) { attr(s, "call") <- attr(s, "init") <- NULL; s })
52+
other <- lapply(other, FUN = function(s) { attr(s, "call") <- attr(s, "init") <- NULL; s })
53+
54+
if (identical(stack, other)) return(TRUE)
55+
if (isTRUE(all.equal(stack, other))) return(TRUE)
56+
FALSE
57+
}
58+
59+
60+
plan_default_stack <- local({
61+
defaultStack <- NULL
62+
63+
function() {
64+
if (is.null(defaultStack)) {
65+
defaultStrategy <- structure(sequential,
66+
call = substitute(plan(sequential)))
67+
defaultStack <<- structure(list(defaultStrategy),
68+
class = c("FutureStrategyList", "list"))
69+
}
70+
defaultStack
71+
}
72+
}) ## plan_default_stack()
73+
74+
75+
plan_cleanup <- function(evaluator) {
76+
cleanup <- attr(evaluator, "cleanup", exact = TRUE)
77+
if (!is.null(cleanup)) {
78+
if (is.function(cleanup)) {
79+
cleanup()
80+
} else {
81+
stop(FutureError(sprintf("Unknown type of 'cleanup' attribute on current future strategy: %s", commaq(class(cleanup)))))
82+
}
83+
} else {
84+
## Backward compatibility for future (<= 1.33.2)
85+
if (isTRUE(getOption("future.plan.cleanup.legacy"))) {
86+
ClusterRegistry(action = "stop")
87+
}
88+
}
89+
} ## plan_cleanup()
90+
91+
92+
plan_init <- function(evaluator) {
93+
init <- attr(evaluator, "init", exact = TRUE)
94+
if (identical(init, TRUE)) {
95+
debug <- isTRUE(getOption("future.debug"))
96+
if (debug) {
97+
mdebugf("plan(): plan_init() of %s ...",
98+
commaq(class(evaluator)))
99+
mprint(evaluator)
100+
}
101+
102+
## IMPORANT: Initiate only once. This avoids an infinite
103+
## recursive loop caused by other plan() calls.
104+
attr(evaluator, "init") <- "done"
105+
106+
## Create dummy future to trigger setup (minimum overhead)
107+
f <- evaluator(NA, label = "future-plan-test",
108+
globals = FALSE, lazy = FALSE)
109+
110+
## Cleanup, by resolving it
111+
## (otherwise the garbage collector would have to do it)
112+
res <- tryCatch({
113+
value(f)
114+
}, FutureError = identity)
115+
if (inherits(res, "FutureError")) {
116+
res$message <- paste0(
117+
"Initialization of plan() failed, because the test future used for validation failed. The reason was: ", conditionMessage(res))
118+
stop(res)
119+
}
120+
121+
if (!identical(res, NA)) {
122+
res <- if (is.null(res)) {
123+
"NULL"
124+
} else {
125+
commaq(res)
126+
}
127+
stop(FutureError(sprintf("Initialization of plan() failed, because the value of the test future is not NA as expected: %s", res)))
128+
}
129+
130+
if (debug) {
131+
mdebugf("plan(): plan_init() of %s ... DONE",
132+
commaq(class(evaluator)))
133+
}
134+
}
135+
136+
evaluator
137+
} ## plan_init()
138+
139+
140+
141+
1142
#' Plan how to resolve a future
2143
#'
3144
#' This function allows _the user_ to plan the future, more specifically,
@@ -126,128 +267,6 @@ plan <- local({
126267
## Stack of type of futures to use
127268
stack <- NULL
128269

129-
assert_no_disallowed_strategies <- function(stack) {
130-
noplans <- getOption("future.plan.disallow")
131-
if (length(noplans) == 0L) return()
132-
133-
for (kk in seq_along(stack)) {
134-
evaluator <- stack[[kk]]
135-
if (!inherits(evaluator, noplans)) next
136-
clazz <- class(evaluator)[1]
137-
if (!clazz %in% noplans) next ## <== sic!
138-
139-
stop(FutureError(sprintf("Can not use %s in the future plan because it is on the list of future strategies that are not allow per option 'future.plan.disallow': %s", sQuote(clazz), commaq(noplans))))
140-
}
141-
}
142-
143-
evaluator_uses <- function(evaluator, strategy) {
144-
if (!inherits(evaluator, strategy)) return(FALSE)
145-
## NOTE: Yes, we are indeed inspecting the 'class' attribute itself
146-
class <- class(evaluator)
147-
if (class[1] == strategy) return(TRUE)
148-
if (length(class) == 1L) return(FALSE)
149-
if (class[1] == "tweaked" && class[2] == strategy) return(TRUE)
150-
## Special case for strategy == "multiprocess"
151-
if (strategy == "multiprocess" && class[length(class)] == strategy) return(TRUE)
152-
FALSE
153-
}
154-
155-
warn_about_multicore <- local({
156-
.warn <- TRUE
157-
158-
function(stack) {
159-
if (!.warn) return()
160-
161-
## Is 'multicore' used despite not being supported on the current
162-
## platform?
163-
for (kk in seq_along(stack)) {
164-
if (evaluator_uses(stack[[kk]], "multicore")) {
165-
supportsMulticore(warn = TRUE)
166-
## Warn only once, if at all
167-
.warn <<- FALSE
168-
break
169-
}
170-
}
171-
}
172-
})
173-
174-
plan_cleanup <- function() {
175-
evaluator <- stack[[1L]]
176-
177-
cleanup <- attr(evaluator, "cleanup", exact = TRUE)
178-
if (!is.null(cleanup)) {
179-
if (is.function(cleanup)) {
180-
cleanup()
181-
} else {
182-
stop(FutureError(sprintf("Unknown type of 'cleanup' attribute on current future strategy: %s", commaq(class(cleanup)))))
183-
}
184-
} else {
185-
## Backward compatibility for future (<= 1.33.2)
186-
if (isTRUE(getOption("future.plan.cleanup.legacy"))) {
187-
ClusterRegistry(action = "stop")
188-
}
189-
}
190-
} ## plan_cleanup()
191-
192-
plan_init <- function() {
193-
evaluator <- stack[[1L]]
194-
195-
init <- attr(evaluator, "init", exact = TRUE)
196-
if (identical(init, TRUE)) {
197-
debug <- isTRUE(getOption("future.debug"))
198-
if (debug) {
199-
mdebugf("plan(): plan_init() of %s ...",
200-
commaq(class(evaluator)))
201-
mprint(evaluator)
202-
}
203-
204-
## IMPORANT: Initiate only once. This avoids an infinite
205-
## recursive loop caused by other plan() calls.
206-
attr(evaluator, "init") <- "done"
207-
stack[[1L]] <<- evaluator
208-
209-
## Create dummy future to trigger setup (minimum overhead)
210-
f <- evaluator(NA, label = "future-plan-test",
211-
globals = FALSE, lazy = FALSE)
212-
213-
## Cleanup, by resolving it
214-
## (otherwise the garbage collector would have to do it)
215-
res <- tryCatch({
216-
value(f)
217-
}, FutureError = identity)
218-
if (inherits(res, "FutureError")) {
219-
res$message <- paste0(
220-
"Initialization of plan() failed, because the test future used for validation failed. The reason was: ", conditionMessage(res))
221-
stop(res)
222-
}
223-
224-
if (!identical(res, NA)) {
225-
res <- if (is.null(res)) {
226-
"NULL"
227-
} else {
228-
commaq(res)
229-
}
230-
stop(FutureError(sprintf("Initialization of plan() failed, because the value of the test future is not NA as expected: %s", res)))
231-
}
232-
233-
if (debug) {
234-
mdebugf("plan(): plan_init() of %s ... DONE",
235-
commaq(class(evaluator)))
236-
}
237-
}
238-
} ## plan_init()
239-
240-
241-
equal_strategy_stacks <- function(stack, other) {
242-
stop_if_not(is.list(stack), is.list(other))
243-
stack <- lapply(stack, FUN = function(s) { attr(s, "call") <- attr(s, "init") <- NULL; s })
244-
other <- lapply(other, FUN = function(s) { attr(s, "call") <- attr(s, "init") <- NULL; s })
245-
246-
if (identical(stack, other)) return(TRUE)
247-
if (isTRUE(all.equal(stack, other))) return(TRUE)
248-
FALSE
249-
}
250-
251270
plan_set <- function(newStack, skip = TRUE, cleanup = TRUE, init = TRUE) {
252271
stop_if_not(!is.null(newStack), is.list(newStack), length(newStack) >= 1L)
253272

@@ -276,12 +295,12 @@ plan <- local({
276295
warn_about_multicore(newStack)
277296

278297
## Stop/cleanup any previously registered backends?
279-
if (cleanup) plan_cleanup()
298+
if (cleanup) plan_cleanup(stack[[1L]])
280299

281300
stack <<- newStack
282301

283302
## Initiate future workers?
284-
if (init) plan_init()
303+
if (init) stack[[1]] <<- plan_init(stack[[1]])
285304

286305
## Sanity checks
287306
with_assert({
@@ -299,20 +318,6 @@ plan <- local({
299318
invisible(oldStack)
300319
} ## plan_set()
301320

302-
plan_default_stack <- local({
303-
defaultStack <- NULL
304-
305-
function() {
306-
if (is.null(defaultStack)) {
307-
defaultStrategy <- structure(sequential,
308-
call = substitute(plan(sequential)))
309-
defaultStack <<- structure(list(defaultStrategy),
310-
class = c("FutureStrategyList", "list"))
311-
}
312-
defaultStack
313-
}
314-
}) ## plan_default_stack()
315-
316321

317322
## Main function
318323
function(strategy = NULL, ..., substitute = TRUE, .skip = FALSE, .call = TRUE,
@@ -344,7 +349,7 @@ plan <- local({
344349
return(stack)
345350
} else if (identical(strategy, "reset")) {
346351
## Stop/cleanup any previously registered backends?
347-
if (.cleanup) plan_cleanup()
352+
if (.cleanup) plan_cleanup(stack[[1]])
348353
## Reset stack of future strategies?
349354
stack <<- plan_default_stack()
350355
return(stack)
@@ -487,6 +492,7 @@ plan <- local({
487492
}) # plan()
488493

489494

495+
490496
supportedStrategies <- function(strategies = c("sequential", "multicore",
491497
"multisession", "cluster")) {
492498
if (!supportsMulticore()) strategies <- setdiff(strategies, "multicore")

0 commit comments

Comments
 (0)