|
| 1 | +#' @param \ldots Additional arguments passed to [BatchtoolsFutureBackend()]. |
| 2 | +#' |
| 3 | +#' @rdname BatchtoolsFutureBackend |
| 4 | +#' @importFrom batchtools makeClusterFunctionsInteractive |
| 5 | +#' @importFrom future SequentialFutureBackend |
| 6 | +#' @export |
| 7 | +BatchtoolsMulticoreFutureBackend <- function(workers = availableCores(constraints = "multicore"), ...) { |
| 8 | + message("BatchtoolsMulticoreFutureBackend() ...") |
| 9 | + str(list(...)) |
| 10 | + |
| 11 | + if (is.null(workers)) { |
| 12 | + workers <- availableCores(constraints = "multicore") |
| 13 | + } else if (is.function(workers)) { |
| 14 | + workers <- workers() |
| 15 | + } |
| 16 | + stop_if_not(length(workers) == 1L, is.numeric(workers), |
| 17 | + is.finite(workers), workers >= 1L) |
| 18 | + |
| 19 | +str(list(workers = workers, cores = availableCores(constraints = "multicore"))) |
| 20 | + |
| 21 | + ## Fall back to batchtools_local if multicore processing is not supported |
| 22 | + asIs <- inherits(workers, "AsIs") |
| 23 | + if (!asIs && (workers == 1L && availableCores(constraints = "multicore") == 1L) || is_os("windows") || is_os("solaris")) { |
| 24 | + ## covr: skip=1 |
| 25 | + return(SequentialFutureBackend(...)) |
| 26 | + } |
| 27 | + |
| 28 | + cluster.functions <- makeClusterFunctionsMulticore(ncpus = workers) |
| 29 | + |
| 30 | + core <- BatchtoolsFutureBackend( |
| 31 | + workers = workers, |
| 32 | + cluster.functions = cluster.functions, |
| 33 | + ... |
| 34 | + ) |
| 35 | + |
| 36 | + core[["futureClasses"]] <- c("BatchtoolsMulticoreFuture", "BatchtoolsMultiprocessFuture", "MultiprocessFuture", core[["futureClasses"]]) |
| 37 | + core <- structure(core, class = c("BatchtoolsMulticoreFutureBackend", "BatchtoolsMultiprocessFutureBackend", setdiff(class(core), "MultiprocessFutureBackend"))) |
| 38 | + core |
| 39 | +} |
| 40 | + |
| 41 | + |
| 42 | +#' @importFrom future stopWorkers |
| 43 | +#' @export |
| 44 | +stopWorkers.BatchtoolsMulticoreFutureBackend <- function(backend, ...) { |
| 45 | + TRUE |
| 46 | +} |
| 47 | + |
| 48 | + |
1 | 49 | #' batchtools multicore futures |
2 | 50 | #' |
3 | 51 | #' A batchtools multicore future is an asynchronous multiprocess |
|
27 | 75 | #' @importFrom future withPlan |
28 | 76 | #' @export |
29 | 77 | #' @keywords internal |
30 | | -batchtools_multicore <- function(expr, envir = parent.frame(), |
31 | | - substitute = TRUE, globals = TRUE, |
32 | | - label = NULL, |
33 | | - workers = availableCores(constraints = "multicore"), |
34 | | - registry = list(), ...) { |
35 | | - if (substitute) expr <- substitute(expr) |
36 | | - |
37 | | - if (is.null(workers)) { |
38 | | - workers <- availableCores(constraints = "multicore") |
39 | | - } else if (is.function(workers)) { |
40 | | - workers <- workers() |
41 | | - } |
42 | | - stop_if_not(length(workers) == 1L, is.numeric(workers), |
43 | | - is.finite(workers), workers >= 1L) |
44 | | - |
45 | | - ## Fall back to batchtools_local if multicore processing is not supported |
46 | | - if ((workers == 1L && !inherits(workers, "AsIs")) || |
47 | | - is_os("windows") || is_os("solaris") || |
48 | | - availableCores(constraints = "multicore") == 1L) { |
49 | | - ## covr: skip=1 |
50 | | - withPlan(batchtools_local, { |
51 | | - future(expr, envir = envir, substitute = FALSE, |
52 | | - globals = globals, label = label, registry = registry, ...) |
53 | | - }) |
54 | | - } |
55 | | - |
56 | | - oopts <- options(mc.cores = workers) |
57 | | - on.exit(options(oopts)) |
58 | | - |
59 | | - cf <- makeClusterFunctionsMulticore(ncpus = workers) |
60 | | - |
61 | | - future <- BatchtoolsMulticoreFuture( |
62 | | - expr = expr, envir = envir, substitute = FALSE, |
63 | | - globals = globals, |
64 | | - label = label, |
65 | | - cluster.functions = cf, |
66 | | - registry = registry, |
67 | | - ... |
68 | | - ) |
69 | | - |
70 | | - if (!future$lazy) future <- run(future) |
71 | | - |
72 | | - invisible(future) |
| 78 | +batchtools_multicore <- function(workers = availableCores(constraints = "multicore"), ...) { |
| 79 | + stop("INTERNAL ERROR: The future.batchtools::batchtools_multicore() function implements the FutureBackend and should never be called directly") |
73 | 80 | } |
74 | 81 | class(batchtools_multicore) <- c( |
75 | 82 | "batchtools_multicore", "batchtools_multiprocess", "batchtools", |
76 | 83 | "multiprocess", "future", "function" |
77 | 84 | ) |
78 | 85 | attr(batchtools_multicore, "tweakable") <- c("finalize") |
| 86 | +attr(batchtools_multicore, "factory") <- BatchtoolsMulticoreFutureBackend |
0 commit comments