Skip to content

Commit d28ead4

Browse files
evalFuture(): Add argument 'threads' => simplify getExpression() for MulticoreFuture
1 parent 15c115f commit d28ead4

File tree

2 files changed

+82
-71
lines changed

2 files changed

+82
-71
lines changed

R/MulticoreFuture-class.R

Lines changed: 6 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -299,58 +299,24 @@ result.MulticoreFuture <- function(future, ...) {
299299

300300
#' @export
301301
getExpression.MulticoreFuture <- local({
302-
tmpl_expr_disable_multithreading <- future:::bquote_compile({
303-
## Force single-threaded OpenMP, iff needed
304-
old_omp_threads <- RhpcBLASctl::omp_get_max_threads()
305-
if (old_omp_threads > 1L) {
306-
RhpcBLASctl::omp_set_num_threads(1L)
307-
base::on.exit(RhpcBLASctl::omp_set_num_threads(old_omp_threads), add = TRUE)
308-
new_omp_threads <- RhpcBLASctl::omp_get_max_threads()
309-
if (!is.numeric(new_omp_threads) || is.na(new_omp_threads) || new_omp_threads != 1L) {
310-
label <- future$label
311-
if (is.null(label)) label <- "<none>"
312-
warning(future::FutureWarning(sprintf("Failed to force a single OMP thread on this system. Number of threads used: %s", new_omp_threads), future = future))
313-
}
314-
}
315-
316-
## Tell BLAS to use a single thread(?)
317-
## NOTE: Is multi-threaded BLAS an issue? Have we got any reports on this.
318-
## FIXME: How can we get the current BLAS settings?
319-
## /HB 2020-01-09
320-
## RhpcBLASctl::blas_set_num_threads(1L)
321-
322-
## Force single-threaded RcppParallel, iff needed
323-
old_rcppparallel_threads <- Sys.getenv("RCPP_PARALLEL_NUM_THREADS", "")
324-
if (old_rcppparallel_threads != "1") {
325-
Sys.setenv(RCPP_PARALLEL_NUM_THREADS = "1")
326-
if (old_rcppparallel_threads == "") {
327-
base::on.exit(Sys.unsetenv("RCPP_PARALLEL_NUM_THREADS"), add = TRUE)
328-
} else {
329-
base::on.exit(Sys.setenv(RCPP_PARALLEL_NUM_THREADS = old_rcppparallel_threads), add = TRUE)
330-
}
331-
}
332-
333-
.(expr)
334-
})
335-
336302
function(future, expr = future$expr, mc.cores = 1L, immediateConditions = TRUE, conditionClasses = future$conditions, resignalImmediateConditions = getOption("future.multicore.relay.immediate", immediateConditions), ...) {
337303
## Assert that no arguments but the first is passed by position
338304
assert_no_positional_args_but_first()
339305

340306
debug <- getOption("future.debug", FALSE)
341307

342308
## Disable multi-threading in futures?
309+
threads <- NA_integer_
343310
multithreading <- getOption("future.fork.multithreading.enable", TRUE)
344311
if (isFALSE(multithreading)) {
345-
if (!supports_omp_threads(assert = TRUE, debug = debug)) {
312+
if (supports_omp_threads(assert = TRUE, debug = debug)) {
313+
threads <- 1L
314+
if (debug) mdebug("- Updated expression to force single-threaded (OpenMP and RcppParallel) processing")
315+
} else {
346316
warning(FutureWarning("It is not possible to disable OpenMP multi-threading on this systems", future = future))
347317
}
348-
349-
expr <- bquote_apply(tmpl_expr_disable_multithreading)
350-
if (debug) mdebug("- Updated expression to force single-threaded (OpenMP and RcppParallel) processing")
351318
}
352319

353-
354320
## Inject code for resignaling immediateCondition:s?
355321
if (resignalImmediateConditions && immediateConditions) {
356322
## Preserve condition classes to be ignored
@@ -368,6 +334,6 @@ getExpression.MulticoreFuture <- local({
368334
attr(conditionClasses, "exclude") <- exclude
369335
} ## if (resignalImmediateConditions && immediateConditions)
370336

371-
NextMethod(expr = expr, mc.cores = mc.cores, immediateConditions = immediateConditions, conditionClasses = conditionClasses)
337+
NextMethod(expr = expr, mc.cores = mc.cores, immediateConditions = immediateConditions, conditionClasses = conditionClasses, threads = threads)
372338
}
373339
})

R/expressions.R

Lines changed: 76 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
makeExpression <- local({
22
tmpl_expr_evaluate2 <- future:::bquote_compile({
33
## Evaluate future
4-
future:::evalFuture(expr = quote(.(expr)), local = .(local), stdout = .(stdout), conditionClasses = .(conditionClasses), split = .(split), immediateConditions = .(immediateConditions), immediateConditionClasses = .(immediateConditionClasses), globals = .(globals), packages = .(packages), seed = .(seed), strategiesR = .(strategiesR), forwardOptions = .(forwardOptions), cleanup = .(cleanup))
4+
future:::evalFuture(expr = quote(.(expr)), local = .(local), stdout = .(stdout), conditionClasses = .(conditionClasses), split = .(split), immediateConditions = .(immediateConditions), immediateConditionClasses = .(immediateConditionClasses), globals = .(globals), packages = .(packages), seed = .(seed), strategiesR = .(strategiesR), forwardOptions = .(forwardOptions), threads = .(threads), cleanup = .(cleanup))
55
})
66

77

8-
function(expr, local = TRUE, immediateConditions = FALSE, stdout = TRUE, conditionClasses = NULL, split = FALSE, globals = NULL, version = "1.8", packages = NULL, seed = NULL, mc.cores = NULL, cleanup = TRUE) {
8+
function(expr, local = TRUE, immediateConditions = FALSE, stdout = TRUE, conditionClasses = NULL, split = FALSE, globals = NULL, version = "1.8", packages = NULL, seed = NULL, mc.cores = NULL, threads = NA_integer_, cleanup = TRUE) {
99
if (version != "1.8") {
1010
stop(FutureError("Internal error: Non-supported future expression version: ", version))
1111
}
@@ -55,25 +55,28 @@ makeExpression <- local({
5555

5656
forwardOptions <- list(
5757
## Assert globals when future is created (or at run time)?
58-
future.globals.onMissing = getOption("future.globals.onMissing"),
58+
future.globals.onMissing = getOption("future.globals.onMissing"),
5959

6060
## Pass down other future.* options
61-
future.globals.maxSize = getOption("future.globals.maxSize"),
62-
future.globals.method = getOption("future.globals.method"),
63-
future.globals.onReference = getOption("future.globals.onReference"),
64-
future.globals.resolve = getOption("future.globals.resolve"),
65-
future.resolve.recursive = getOption("future.resolve.recursive"),
66-
future.rng.onMisuse = getOption("future.rng.onMisuse"),
67-
future.rng.onMisuse.keepFuture = getOption("future.rng.onMisuse.keepFuture"),
68-
future.stdout.windows.reencode = getOption("future.stdout.windows.reencode"),
69-
70-
future.makeExpression.skip = getOption("future.makeExpression.skip"),
71-
future.makeExpression.skip.local = getOption("future.makeExpression.skip.local"),
72-
future.globalenv.onMisuse = getOption("future.globalenv.onMisuse"),
61+
future.globals.maxSize = getOption("future.globals.maxSize"),
62+
future.globals.method = getOption("future.globals.method"),
63+
future.globals.onReference = getOption("future.globals.onReference"),
64+
future.globals.resolve = getOption("future.globals.resolve"),
65+
future.resolve.recursive = getOption("future.resolve.recursive"),
66+
future.rng.onMisuse = getOption("future.rng.onMisuse"),
67+
future.rng.onMisuse.keepFuture = getOption("future.rng.onMisuse.keepFuture"),
68+
future.stdout.windows.reencode = getOption("future.stdout.windows.reencode"),
69+
70+
future.fork.multithreading.enable = getOption("future.fork.multithreading.enable"),
71+
72+
future.globalenv.onMisuse = getOption("future.globalenv.onMisuse"),
73+
74+
future.makeExpression.skip = getOption("future.makeExpression.skip"),
75+
future.makeExpression.skip.local = getOption("future.makeExpression.skip.local"),
7376

7477
## Other options relevant to making futures behave consistently
7578
## across backends
76-
width = getOption("width")
79+
width = getOption("width")
7780
)
7881

7982
if (!is.null(mc.cores)) {
@@ -88,21 +91,13 @@ makeExpression <- local({
8891

8992

9093

91-
logme <- function(expr, envir = parent.frame()) {
92-
expr <- substitute(expr)
93-
stdout <- utils::capture.output(eval(expr, envir = envir))
94-
stdout <- sprintf("[evalFuture()] %s\n", stdout)
95-
stdout <- paste(stdout, collapse = "")
96-
cat(stdout, file = "callr.log", append = TRUE)
97-
}
98-
9994
FutureEvalError <- function(...) {
10095
ex <- FutureError(...)
10196
class(ex) <- c("FutureEvalError", class(ex))
10297
ex
10398
}
10499

105-
evalFuture <- function(expr, local = FALSE, stdout = TRUE, conditionClasses = character(0L), split = FALSE, immediateConditions = NULL, immediateConditionClasses = character(0L), globals = NULL, packages = NULL, seed = NULL, forwardOptions = NULL, strategiesR = NULL, envir = parent.frame(), cleanup = TRUE) {
100+
evalFuture <- function(expr, local = FALSE, stdout = TRUE, conditionClasses = character(0L), split = FALSE, immediateConditions = NULL, immediateConditionClasses = character(0L), globals = NULL, packages = NULL, seed = NULL, forwardOptions = NULL, strategiesR = NULL, threads = NA_integer_, envir = parent.frame(), cleanup = TRUE) {
106101
stop_if_not(
107102
length(local) == 1L && is.logical(local) && !is.na(local),
108103
length(stdout) == 1L && is.logical(stdout),
@@ -111,9 +106,29 @@ evalFuture <- function(expr, local = FALSE, stdout = TRUE, conditionClasses = ch
111106
length(immediateConditions) == 1L && is.logical(immediateConditions) && !is.na(immediateConditions),
112107
is.character(immediateConditionClasses) && !anyNA(immediateConditionClasses) && all(nzchar(immediateConditionClasses)),
113108
is.null(seed) || is_lecyer_cmrg_seed(seed) || (is.logical(seed) && !is.na(seed) || !seed),
109+
length(threads) == 1L && is.integer(threads) && (is.na(threads) || threads >= 1L),
114110
length(cleanup) == 1L && is.logical(cleanup) && !is.na(cleanup)
115111
)
116112

113+
## Is it possible to force single-threaded processing?
114+
if (!is.na(threads)) {
115+
## Setting other than single-threaded processing is currently not
116+
## supported. /HB 2024-12-30
117+
if (threads != 1L) {
118+
stop(FutureEvalError(sprintf("Non-supported value on argument 'threads': %d", threads)))
119+
}
120+
if (requireNamespace("RhpcBLASctl", quietly = TRUE)) {
121+
## If RhpcBLASctl is compiled without OpenMP support, then it
122+
## returns NA_integer_, or NULL if RhpcBLASctl (< 0.20-17)
123+
old_omp_threads <- RhpcBLASctl::omp_get_max_threads()
124+
if (is.null(old_omp_threads) || is.na(old_omp_threads)) {
125+
threads <- NA_integer_
126+
}
127+
} else {
128+
threads <- NA_integer_
129+
}
130+
}
131+
117132
if (is.function(strategiesR)) {
118133
if (!inherits(strategiesR, "future")) {
119134
stop(FutureEvalError(sprintf("Argument 'strategiesR' is a function, but does not inherit 'future': %s", paste(sQuote(class(strategiesR)), collapse = ", "))))
@@ -416,16 +431,46 @@ evalFuture <- function(expr, local = FALSE, stdout = TRUE, conditionClasses = ch
416431
## Prevent 'future.plan' / R_FUTURE_PLAN settings from being nested
417432
options(future.plan = NULL)
418433
Sys.unsetenv("R_FUTURE_PLAN")
419-
420-
# logme("future:plan() ...")
421-
# logme(utils::str(strategiesR))
422-
# logme(print(strategiesR))
434+
435+
## Prevent multithreading?
436+
if (!is.na(threads) && threads == 1L) {
437+
## Force single-threaded OpenMP, iff needed
438+
old_omp_threads <- RhpcBLASctl::omp_get_max_threads()
439+
if (old_omp_threads != 1L) {
440+
RhpcBLASctl::omp_set_num_threads(1L)
441+
if (cleanup) {
442+
on.exit(RhpcBLASctl::omp_set_num_threads(old_omp_threads), add = TRUE)
443+
}
444+
445+
new_omp_threads <- RhpcBLASctl::omp_get_max_threads()
446+
if (!is.numeric(new_omp_threads) || is.na(new_omp_threads) || new_omp_threads != 1L) {
447+
warning(FutureWarning(sprintf("Failed to force a single OMP thread on this system. Number of threads used: %s", new_omp_threads)))
448+
}
449+
}
450+
451+
## Tell BLAS to use a single thread(?)
452+
## NOTE: Is multi-threaded BLAS an issue? Have we got any reports on this.
453+
## FIXME: How can we get the current BLAS settings?
454+
## /HB 2020-01-09
455+
## RhpcBLASctl::blas_set_num_threads(1L)
456+
457+
## Force single-threaded RcppParallel, iff needed
458+
old_rcppparallel_threads <- Sys.getenv("RCPP_PARALLEL_NUM_THREADS", "")
459+
if (old_rcppparallel_threads != "1") {
460+
Sys.setenv(RCPP_PARALLEL_NUM_THREADS = "1")
461+
if (cleanup) {
462+
if (old_rcppparallel_threads == "") {
463+
on.exit(Sys.unsetenv("RCPP_PARALLEL_NUM_THREADS"), add = TRUE)
464+
} else {
465+
on.exit(Sys.setenv(RCPP_PARALLEL_NUM_THREADS = old_rcppparallel_threads), add = TRUE)
466+
}
467+
}
468+
}
469+
}
423470

424471
## Use the next-level-down ("popped") future strategy
425472
future::plan(strategiesR, .cleanup = FALSE, .init = FALSE)
426473

427-
# logme("future:plan() ... done")
428-
429474
## Set RNG seed?
430475
if (is.numeric(seed)) {
431476
genv <- globalenv()

0 commit comments

Comments
 (0)