|
1 | | -#' A cluster future is a future whose value will be resolved asynchronously in a parallel process |
2 | | -#' |
3 | | -#' @inheritParams MultiprocessFuture-class |
4 | | -#' @inheritParams Future-class |
5 | | -#' |
6 | | -#' @param persistent If FALSE, the evaluation environment is cleared |
7 | | -#' from objects prior to the evaluation of the future. |
8 | | -#' |
9 | | -#' @param workers A \code{\link[parallel:makeCluster]{cluster}} object, |
10 | | -#' a character vector of host names, a positive numeric scalar, |
11 | | -#' or a function. |
12 | | -#' If a character vector or a numeric scalar, a `cluster` object |
13 | | -#' is created using \code{\link[parallelly:makeClusterPSOCK]{makeClusterPSOCK}(workers)}. |
14 | | -#' If a function, it is called without arguments _when the future |
15 | | -#' is created_ and its value is used to configure the workers. |
16 | | -#' The function should return any of the above types. |
17 | | -#' |
18 | | -#' @return |
19 | | -#' `ClusterFuture()` returns an object of class `ClusterFuture`. |
20 | | -#' |
21 | | -#' @section Usage: |
22 | | -#' To use 'cluster' futures, use `plan(cluster, ...)`, cf. [cluster]. |
23 | | -#' |
24 | | -#' @aliases MultisessionFuture MultisessionFuture-class |
25 | | -#' @export |
26 | | -#' @export MultisessionFuture |
27 | | -#' @importFrom digest digest |
28 | | -#' @name ClusterFuture-class |
29 | | -#' @keywords internal |
30 | | -ClusterFuture <- function(expr = NULL, substitute = TRUE, envir = parent.frame(), persistent = FALSE, workers = NULL, ...) { |
31 | | - if (substitute) expr <- substitute(expr) |
32 | | - |
33 | | - stop_if_not(is.logical(persistent), length(persistent) == 1L, |
34 | | - !is.na(persistent)) |
35 | | - |
36 | | - args <- list(...) |
37 | | - |
38 | | - ## Which '...' arguments should be passed to Future() and |
39 | | - ## which should be passed to makeClusterPSOCK()? |
40 | | - future_args <- !is.element(names(args), makeClusterPSOCK_args()) |
41 | | - |
42 | | - future <- do.call(MultiprocessFuture, args = c(list(expr = quote(expr), substitute = FALSE, envir = envir, persistent = persistent, node = NA_integer_), args[future_args]), quote = FALSE) |
43 | | - |
44 | | - future <- do.call(as_ClusterFuture, args = c(list(future, workers = workers), args[!future_args]), quote = TRUE) |
45 | | - |
46 | | - future |
47 | | -} |
48 | | - |
49 | | - |
50 | | -as_ClusterFuture <- local({ |
51 | | - getDefaultCluster <- import_parallel_fcn("getDefaultCluster") |
52 | | - |
53 | | - function(future, workers = NULL, ...) { |
54 | | - if (is.function(workers)) workers <- workers() |
55 | | - if (is.null(workers)) { |
56 | | - workers <- getDefaultCluster() |
57 | | - workers <- addCovrLibPath(workers) |
58 | | - } else if (is.character(workers) || is.numeric(workers)) { |
59 | | - workers <- ClusterRegistry("start", workers = workers, ...) |
60 | | - } else { |
61 | | - workers <- as.cluster(workers) |
62 | | - workers <- addCovrLibPath(workers) |
63 | | - } |
64 | | - if (!inherits(workers, "cluster")) { |
65 | | - stopf("Argument 'workers' is not of class 'cluster': %s", commaq(class(workers))) |
66 | | - } |
67 | | - stop_if_not(length(workers) > 0) |
68 | | - |
69 | | - |
70 | | - ## Attached workers' session information, unless already done. |
71 | | - ## FIXME: We cannot do this here, because it introduces a race condition |
72 | | - ## where multiple similar requests may appear at the same time bringing |
73 | | - ## the send/receive data to be out of sync and therefore corrupt the |
74 | | - ## futures' values. |
75 | | - ## workers <- add_cluster_session_info(workers) |
76 | | - |
77 | | - ## Attach name to cluster? |
78 | | - name <- attr(workers, "name", exact = TRUE) |
79 | | - if (is.null(name)) { |
80 | | - name <- digest(workers) |
81 | | - stop_if_not(length(name) > 0, nzchar(name)) |
82 | | - attr(workers, "name") <- name |
83 | | - } |
84 | | - |
85 | | - future[["backend"]] <- list( |
86 | | - workers = workers, |
87 | | - reg = sprintf("workers-%s", attr(workers, "name", exact = TRUE)) |
88 | | - ) |
89 | | - |
90 | | - ## FIXME: To be cleaned out /HB 2025-02-19 |
91 | | - future[["workers"]] <- workers |
92 | | - |
93 | | - future <- structure(future, class = c("ClusterFuture", class(future))) |
94 | | - |
95 | | - future |
96 | | - } |
97 | | -}) |
98 | | - |
99 | | - |
100 | 1 | getSocketSelectTimeout <- function(future, timeout = NULL) { |
101 | 2 | if (!is.null(timeout)) return(timeout) |
102 | 3 |
|
|
0 commit comments