Skip to content

Commit becfd04

Browse files
evalFuture(): Add argument 'evalFuture' => R CMD check all OK now
1 parent 9a7988b commit becfd04

File tree

2 files changed

+127
-141
lines changed

2 files changed

+127
-141
lines changed

R/expressions.R

Lines changed: 115 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ makeExpression <- local({
55

66
tmpl_expr_evaluate2 <- future:::bquote_compile({
77
## Evaluate future
8-
future:::evalFuture(expr = quote(.(expr)), stdout = .(stdout), conditionClasses = .(conditionClasses), split = .(split), immediateConditions = .(immediateConditions), immediateConditionClasses = .(immediateConditionClasses), globals.onMissing = .(globals.onMissing), globalenv = .(globalenv), skip = .(skip), packages = .(packages), seed = .(seed), strategiesR = .(strategiesR), mc.cores = .(mc.cores))
8+
future:::evalFuture(expr = quote(.(expr)), stdout = .(stdout), conditionClasses = .(conditionClasses), split = .(split), immediateConditions = .(immediateConditions), immediateConditionClasses = .(immediateConditionClasses), globals.onMissing = .(globals.onMissing), globalenv = .(globalenv), skip = .(skip), packages = .(packages), seed = .(seed), strategiesR = .(strategiesR), forwardOptions = .(forwardOptions), mc.cores = .(mc.cores))
99
})
1010

1111

@@ -55,6 +55,25 @@ makeExpression <- local({
5555
packages <- unique(c(packages, pkgsS))
5656
}
5757

58+
forwardOptions <- list(
59+
## Assert globals when future is created (or at run time)?
60+
future.globals.onMissing = globals.onMissing,
61+
62+
## Pass down other future.* options
63+
future.globals.maxSize = getOption("future.globals.maxSize"),
64+
future.globals.method = getOption("future.globals.method"),
65+
future.globals.onReference = getOption("future.globals.onReference"),
66+
future.globals.resolve = getOption("future.globals.resolve"),
67+
future.resolve.recursive = getOption("future.resolve.recursive"),
68+
future.rng.onMisuse = getOption("future.rng.onMisuse"),
69+
future.rng.onMisuse.keepFuture = getOption("future.rng.onMisuse.keepFuture"),
70+
future.stdout.windows.reencode = getOption("future.stdout.windows.reencode"),
71+
72+
## Other options relevant to making futures behave consistently
73+
## across backends
74+
width = getOption("width")
75+
)
76+
5877
expr <- bquote_apply(tmpl_expr_evaluate2)
5978

6079
expr
@@ -63,10 +82,7 @@ makeExpression <- local({
6382

6483

6584

66-
67-
68-
69-
evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), split = FALSE, immediateConditions = NULL, immediateConditionClasses = character(0L), globals.onMissing = getOption("future.globals.onMissing", NULL), globalenv = (getOption("future.globalenv.onMisuse", "ignore") != "ignore"), skip = NULL, packages = NULL, seed = NULL, mc.cores = NULL, strategiesR = future::sequential, envir = parent.frame()) {
85+
evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), split = FALSE, immediateConditions = NULL, immediateConditionClasses = character(0L), globals.onMissing = getOption("future.globals.onMissing", NULL), globalenv = (getOption("future.globalenv.onMisuse", "ignore") != "ignore"), skip = NULL, packages = NULL, seed = NULL, mc.cores = NULL, forwardOptions = NULL, strategiesR = future::sequential, envir = parent.frame()) {
7086
stop_if_not(
7187
length(stdout) == 1L && is.logical(stdout),
7288
length(split) == 1L && is.logical(split) && !is.na(split),
@@ -86,8 +102,24 @@ evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), sp
86102

87103

88104
## -----------------------------------------------------------------
89-
## Load and attached packages
105+
## Record current state
90106
## -----------------------------------------------------------------
107+
## Current working directory
108+
...future.workdir <- getwd()
109+
110+
## mc.cores
111+
...future.mc.cores.old <- getOption("mc.cores")
112+
113+
## RNG state
114+
...future.rngkind <- RNGkind()[1]
115+
...future.rng <- globalenv()$.Random.seed
116+
117+
## Record the original future strategy set on this worker
118+
...future.plan.old <- getOption("future.plan")
119+
...future.plan.old.envvar <- Sys.getenv("R_FUTURE_PLAN", NA_character_)
120+
...future.strategy.old <- plan("list")
121+
122+
## Load and attached packages
91123
## TROUBLESHOOTING: If the package fails to load, then library()
92124
## suppress that error and generates a generic much less
93125
## informative error message. Because of this, we load the
@@ -109,84 +141,22 @@ evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), sp
109141
}
110142
}
111143

112-
113-
## -----------------------------------------------------------------
114-
## Preserve RNG state
115-
## -----------------------------------------------------------------
116-
...future.rngkind <- RNGkind()[1]
117-
...future.rng <- globalenv()$.Random.seed
118-
on.exit({
119-
## Undo .Random.seed
120-
genv <- globalenv()
121-
RNGkind(...future.rngkind)
122-
if (is.null(...future.rng)) {
123-
if (exists(".Random.seed", envir = genv, inherits = FALSE)) {
124-
rm(list = ".Random.seed", envir = genv, inherits = FALSE)
125-
}
126-
} else {
127-
assign(".Random.seed", ...future.rng, envir = genv, inherits = FALSE)
128-
}
129-
}, add = TRUE)
130-
131-
132-
## -----------------------------------------------------------------
133-
## Preserve R option 'mc.cores'
134-
## -----------------------------------------------------------------
135-
...future.mc.cores.old <- getOption("mc.cores")
136-
on.exit({
137-
## Reset R option 'mc.cores'
138-
options(mc.cores = ...future.mc.cores.old)
139-
}, add = TRUE)
140-
141-
142-
## -----------------------------------------------------------------
143-
## Preserve R options
144-
## -----------------------------------------------------------------
145-
## Note, we do this _after_ loading and attaching packages, in
146-
## case they set options/env vars needed for the session, e.g.
144+
## Note, we record R options and environment variables _after_
145+
## loading and attaching packages, in case they set options/env vars
146+
## needed for the session, e.g.
147147
## https://github.com/Rdatatable/data.table/issues/5375
148+
149+
## R options
148150
...future.oldOptions <- as.list(.Options)
149-
on.exit({
150-
## (a) Reset options
151-
## WORKAROUND: Do not reset 'nwarnings' unless changed, because
152-
## that will, as documented, trigger any warnings collected
153-
## internally to be removed.
154-
## https://github.com/futureverse/future/issues/645
155-
if (identical(getOption("nwarnings"), ...future.oldOptions$nwarnings)) {
156-
...future.oldOptions$nwarnings <- NULL
157-
}
158-
options(...future.oldOptions)
159151

160-
## There might be packages that add essential R options when
161-
## loaded or attached, and if their R options are removed, some of
162-
## those packages might break. Because we don't know which these
163-
## packages are, and we cannot detect when a random packages is
164-
## loaded/attached, we cannot reliably workaround R options added
165-
## on package load/attach. For this reason, I'll relax the
166-
## resetting of R options to only be done to preexisting R options
167-
## for now. These thoughts were triggered by a related data.table
168-
## issue, cf. https://github.com/futureverse/future/issues/609
169-
## /HB 2022-04-29
170-
171-
## (b) Remove any options added
172-
## diff <- setdiff(names(.Options),
173-
## names(...future.oldOptions))
174-
## if (length(diff) > 0L) {
175-
## opts <- vector("list", length = length(diff))
176-
## names(opts) <- diff
177-
## options(opts)
178-
## }
179-
}, add = TRUE)
152+
## Environment variables
153+
...future.oldEnvVars <- Sys.getenv()
180154

181155

182156
## -----------------------------------------------------------------
183-
## Preserve environment variables
157+
## Reset the current state on exit
184158
## -----------------------------------------------------------------
185-
## Note, we do this _after_ loading and attaching packages, in
186-
## case they set options/env vars needed for the session, e.g.
187-
## https://github.com/Rdatatable/data.table/issues/5375
188-
...future.oldEnvVars <- Sys.getenv()
189-
on.exit({
159+
on.exit({
190160
## (d) Reset environment variables
191161
if (.Platform$OS.type == "windows") {
192162
## On MS Windows, there are two special cases to consider:
@@ -265,32 +235,74 @@ evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), sp
265235
## (d) Remove any environment variables added
266236
## diff <- setdiff(names(Sys.getenv()), names(...future.oldEnvVars))
267237
## Sys.unsetenv(diff)
268-
}, add = TRUE)
269-
238+
239+
## (a) Reset options
240+
## WORKAROUND: Do not reset 'nwarnings' unless changed, because
241+
## that will, as documented, trigger any warnings collected
242+
## internally to be removed.
243+
## https://github.com/futureverse/future/issues/645
244+
if (identical(getOption("nwarnings"), ...future.oldOptions$nwarnings)) {
245+
...future.oldOptions$nwarnings <- NULL
246+
}
247+
options(...future.oldOptions)
270248

271-
## covr: skip=7
272-
options(
273-
## Prevent .future.R from being source():d when future is attached
274-
future.startup.script = FALSE,
249+
## There might be packages that add essential R options when
250+
## loaded or attached, and if their R options are removed, some of
251+
## those packages might break. Because we don't know which these
252+
## packages are, and we cannot detect when a random packages is
253+
## loaded/attached, we cannot reliably workaround R options added
254+
## on package load/attach. For this reason, I'll relax the
255+
## resetting of R options to only be done to preexisting R options
256+
## for now. These thoughts were triggered by a related data.table
257+
## issue, cf. https://github.com/futureverse/future/issues/609
258+
## /HB 2022-04-29
275259

276-
## Assert globals when future is created (or at run time)?
277-
future.globals.onMissing = globals.onMissing,
260+
## (b) Remove any options added
261+
## diff <- setdiff(names(.Options),
262+
## names(...future.oldOptions))
263+
## if (length(diff) > 0L) {
264+
## opts <- vector("list", length = length(diff))
265+
## names(opts) <- diff
266+
## options(opts)
267+
## }
268+
269+
## Revert to the original future strategy
270+
## Reset option 'future.plan' and env var 'R_FUTURE_PLAN'
271+
options(future.plan = ...future.plan.old)
272+
plan(...future.strategy.old, .cleanup = FALSE, .init = FALSE)
273+
if (is.na(...future.plan.old.envvar)) {
274+
Sys.unsetenv("R_FUTURE_PLAN")
275+
} else {
276+
Sys.setenv(R_FUTURE_PLAN = ...future.plan.old.envvar)
277+
}
278+
279+
## Undo RNG state
280+
genv <- globalenv()
281+
RNGkind(...future.rngkind)
282+
if (is.null(...future.rng)) {
283+
if (exists(".Random.seed", envir = genv, inherits = FALSE)) {
284+
rm(list = ".Random.seed", envir = genv, inherits = FALSE)
285+
}
286+
} else {
287+
assign(".Random.seed", ...future.rng, envir = genv, inherits = FALSE)
288+
}
278289

279-
## Pass down other future.* options
280-
future.globals.maxSize = getOption("future.globals.maxSize"),
281-
future.globals.method = getOption("future.globals.method"),
282-
future.globals.onReference = getOption("future.globals.onReference"),
283-
future.globals.resolve = getOption("future.globals.resolve"),
284-
future.resolve.recursive = getOption("future.resolve.recursive"),
285-
future.rng.onMisuse = getOption("future.rng.onMisuse"),
286-
future.rng.onMisuse.keepFuture = getOption("future.rng.onMisuse.keepFuture"),
287-
future.stdout.windows.reencode = getOption("future.stdout.windows.reencode"),
288-
289-
## Other options relevant to making futures behave consistently
290-
## across backends
291-
width = getOption("width")
292-
)
290+
## Reset R option 'mc.cores'
291+
options(mc.cores = ...future.mc.cores.old)
292+
293+
## Reset working directory
294+
setwd(...future.workdir)
295+
}, add = TRUE)
296+
293297

298+
## Prevent .future.R from being source():d when future is attached
299+
options(future.startup.script = FALSE)
300+
301+
## Options forwarded from parent process
302+
if (length(forwardOptions) > 0) {
303+
stopifnot(!is.null(names(forwardOptions)))
304+
do.call(options, args = forwardOptions)
305+
}
294306

295307
## -----------------------------------------------------------------
296308
## Preserve future options added
@@ -306,35 +318,6 @@ evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), sp
306318
}, add = TRUE)
307319

308320

309-
## -----------------------------------------------------------------
310-
## Preserve working directory
311-
## -----------------------------------------------------------------
312-
...future.workdir <- getwd()
313-
on.exit({
314-
setwd(...future.workdir)
315-
}, add = TRUE)
316-
317-
318-
## -----------------------------------------------------------------
319-
## Preserve future strategy
320-
## -----------------------------------------------------------------
321-
## Record the original future strategy set on this worker
322-
...future.plan.old <- getOption("future.plan")
323-
...future.plan.old.envvar <- Sys.getenv("R_FUTURE_PLAN", NA_character_)
324-
...future.strategy.old <- plan("list")
325-
on.exit({
326-
## Revert to the original future strategy
327-
## Reset option 'future.plan' and env var 'R_FUTURE_PLAN'
328-
options(future.plan = ...future.plan.old)
329-
plan(...future.strategy.old, .cleanup = FALSE, .init = FALSE)
330-
if (is.na(...future.plan.old.envvar)) {
331-
Sys.unsetenv("R_FUTURE_PLAN")
332-
} else {
333-
Sys.setenv(R_FUTURE_PLAN = ...future.plan.old.envvar)
334-
}
335-
}, add = TRUE)
336-
337-
338321
## -----------------------------------------------------------------
339322
## Evaluate future in the correct context
340323
## -----------------------------------------------------------------
@@ -370,7 +353,10 @@ evalFuture <- function(expr, stdout = TRUE, conditionClasses = character(0L), sp
370353
## Prevent 'future.plan' / R_FUTURE_PLAN settings from being nested
371354
options(future.plan = NULL)
372355
Sys.unsetenv("R_FUTURE_PLAN")
373-
356+
357+
## Use the next-level-down ("popped") future strategy
358+
future::plan(strategiesR, .cleanup = FALSE, .init = FALSE)
359+
374360
## Temporarily set R option 'mc.cores'?
375361
if (!is.null(mc.cores)) {
376362
options(mc.cores = mc.cores)

tests/nested_futures,mc.cores.R

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,23 @@ for (mc in 1:2) {
4141
a %<-% {
4242
b1 %<-% Sys.getpid()
4343
b2 %<-% Sys.getpid()
44-
list(pid = Sys.getpid(), cores = availableCores(), pid1 = b1, pid2 = b2)
44+
list(pid = Sys.getpid(), mc.cores = getOption("mc.cores"), cores = availableCores(), plan = plan("list"), pid1 = b1, pid2 = b2)
4545
}
46-
print(a)
46+
utils::str(a)
4747
stopifnot(a$pid == pid)
4848
stopifnot((mc2 <= 1 && a$pid1 == pid) || (a$pid1 != pid))
4949
stopifnot((mc2 <= 1 && a$pid2 == pid) || (a$pid2 != pid))
5050
stopifnot(((mc2 <= 1 || a$cores <= 2) && a$pid2 == a$pid1) || (a$pid2 != a$pid1))
5151

5252
if (mc == 1L) {
5353
message(sprintf("plan(list('sequential', '%s':2)):", strategy))
54-
plan(list('sequential', tweak(strategy, workers = 2)))
54+
plan(list('sequential', tweak(strategy, workers = I(2))))
5555
a %<-% {
5656
b1 %<-% Sys.getpid()
5757
b2 %<-% Sys.getpid()
58-
list(pid = Sys.getpid(), cores = availableCores(), pid1 = b1, pid2 = b2)
58+
list(pid = Sys.getpid(), mc.cores = getOption("mc.cores"), cores = availableCores(), plan = plan("list"), pid1 = b1, pid2 = b2)
5959
}
60-
print(a)
60+
utils::str(a)
6161
stopifnot(a$pid == pid)
6262
stopifnot((mc2 <= 1 && a$pid1 == pid) || (a$pid1 != pid))
6363
stopifnot((mc2 <= 1 && a$pid2 == pid) || (a$pid2 != pid))
@@ -69,9 +69,9 @@ for (mc in 1:2) {
6969
a %<-% {
7070
b1 %<-% Sys.getpid()
7171
b2 %<-% Sys.getpid()
72-
list(pid = Sys.getpid(), cores = availableCores(), pid1 = b1, pid2 = b2)
72+
list(pid = Sys.getpid(), mc.cores = getOption("mc.cores"), cores = availableCores(), plan = plan("list"), pid1 = b1, pid2 = b2)
7373
}
74-
print(a)
74+
utils::str(a)
7575
stopifnot((mc2 <= 1 && a$pid == pid) || (a$pid != pid))
7676
stopifnot((mc2 <= 1 && a$pid1 == pid) || (a$pid1 != pid))
7777
stopifnot((mc2 <= 1 && a$pid2 == pid) || (a$pid2 != pid))
@@ -82,23 +82,23 @@ for (mc in 1:2) {
8282
a %<-% {
8383
b1 %<-% { Sys.sleep(0.2); Sys.getpid() }
8484
b2 %<-% Sys.getpid()
85-
list(pid = Sys.getpid(), cores = availableCores(), pid1 = b1, pid2 = b2)
85+
list(pid = Sys.getpid(), mc.cores = getOption("mc.cores"), cores = availableCores(), plan = plan("list"), pid1 = b1, pid2 = b2)
8686
}
87-
print(a)
87+
utils::str(a)
8888
stopifnot((mc2 <= 1 && a$pid == pid) || (a$pid != pid))
8989
stopifnot((mc2 <= 1 && a$pid1 == pid) || (a$pid1 != pid))
9090
stopifnot((mc2 <= 1 && a$pid2 == pid) || (a$pid2 != pid))
9191
stopifnot(a$pid2 == a$pid1)
9292

9393
if (mc == 1L && !winWorkaround) {
9494
message(sprintf("plan(list('%s':2, '%s':2)):", strategy, strategy))
95-
plan(list(tweak(strategy, workers = 2), tweak(strategy, workers = 2)))
95+
plan(list(tweak(strategy, workers = I(2)), tweak(strategy, workers = I(2))))
9696
a %<-% {
9797
b1 %<-% Sys.getpid() ## This stalls
9898
b2 %<-% Sys.getpid()
99-
list(pid = Sys.getpid(), cores = availableCores(), pid1 = b1, pid2 = b2)
99+
list(pid = Sys.getpid(), mc.cores = getOption("mc.cores"), cores = availableCores(), plan = plan("list"), pid1 = b1, pid2 = b2)
100100
}
101-
print(a)
101+
utils::str(a)
102102
stopifnot(a$pid != pid)
103103
stopifnot(a$pid1 != pid)
104104
stopifnot(a$pid2 != pid)

0 commit comments

Comments
 (0)