Skip to content

Commit d17e255

Browse files
Major internal redesign - step 2:
* All of future's backends are now using evalFuture() directly. Other, existing backends will rely on getExpression() until updated. Squashed commit of the following: commit 5e5c70ec9c459385bf20a0976c2318aa43f8ae1f Author: Henrik Bengtsson <[email protected]> Date: Fri Feb 14 16:54:11 2025 -0800 Missing ) in help commit 9058390 Author: Henrik Bengtsson <[email protected]> Date: Fri Feb 14 16:03:34 2025 -0800 ClusterFuture/MultisessionFuture is now using evalFuture() directly commit f9bcf57 Author: Henrik Bengtsson <[email protected]> Date: Fri Feb 14 15:51:03 2025 -0800 MulticoreFuture is now using evalFuture() directly commit 84e3a2a Author: Henrik Bengtsson <[email protected]> Date: Fri Feb 14 12:17:19 2025 -0800 CLEANUP: Drop argument 'envir' from evalFuture() commit 92b3f09 Author: Henrik Bengtsson <[email protected]> Date: Fri Feb 14 12:15:34 2025 -0800 UniprocessFuture is now using evalFuture() directly commit 7dc6c97 Author: Henrik Bengtsson <[email protected]> Date: Fri Feb 14 11:55:48 2025 -0800 Simplify evalFuture() further commit 089608a Author: Henrik Bengtsson <[email protected]> Date: Fri Feb 14 10:09:22 2025 -0800 Regroup R/ files by renaming them commit d1a5755 Author: Henrik Bengtsson <[email protected]> Date: Thu Feb 13 19:56:42 2025 -0800 File cleanup: move utility functions to utils-*.R files commit 8e726cb Author: Henrik Bengtsson <[email protected]> Date: Thu Feb 13 12:20:55 2025 -0800 More backend cleanups commit cda1fda Author: Henrik Bengtsson <[email protected]> Date: Thu Feb 13 08:39:22 2025 -0800 Further cleanup of evalFuture() and backends commit 0511461 Author: Henrik Bengtsson <[email protected]> Date: Tue Feb 11 19:15:28 2025 -0800 Add internal getFutureCore(), getFutureCapture(), and getFutureContext() Separate future expression and backend packages
1 parent e1cc26f commit d17e255

File tree

108 files changed

+537
-373
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

108 files changed

+537
-373
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: future
2-
Version: 1.34.0-9126
2+
Version: 1.34.0-9134
33
Title: Unified Parallel and Distributed Processing in R for Everyone
44
Imports:
55
digest,

NAMESPACE

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ S3method(c,FutureGlobals)
1212
S3method(futures,environment)
1313
S3method(futures,list)
1414
S3method(futures,listenv)
15-
S3method(getExpression,ClusterFuture)
1615
S3method(getExpression,Future)
17-
S3method(getExpression,MulticoreFuture)
18-
S3method(getExpression,UniprocessFuture)
16+
S3method(getFutureBackendConfigs,ClusterFuture)
17+
S3method(getFutureBackendConfigs,Future)
18+
S3method(getFutureBackendConfigs,MulticoreFuture)
19+
S3method(getFutureBackendConfigs,UniprocessFuture)
1920
S3method(globals,Future)
2021
S3method(journal,Future)
2122
S3method(journal,FutureJournal)
Lines changed: 46 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ run.ClusterFuture <- function(future, ...) {
103103
assertOwner(future)
104104

105105
workers <- future$workers
106-
expr <- getExpression(future)
106+
data <- getFutureData(future)
107107
persistent <- isTRUE(future$persistent)
108108

109109
## FutureRegistry to use
@@ -150,7 +150,7 @@ run.ClusterFuture <- function(future, ...) {
150150

151151

152152
## (ii) Attach packages that needs to be attached
153-
## NOTE: Already take care of by getExpression() of the Future class.
153+
## NOTE: Already take care of by evalFuture().
154154
## However, if we need to get an early error about missing packages,
155155
## we can get the error here before launching the future.
156156
t_start <- Sys.time()
@@ -179,7 +179,7 @@ run.ClusterFuture <- function(future, ...) {
179179
FutureRegistry(reg, action = "add", future = future, earlySignal = FALSE)
180180

181181
## (iv) Launch future
182-
send_call(cl[[1L]], fun = geval, args = list(expr), future = future, when = "launch future on")
182+
send_call(cl[[1L]], fun = evalFuture, args = list(data), when = "launch future on")
183183

184184
future$state <- 'running'
185185

@@ -563,90 +563,6 @@ requestNode <- function(await, workers, timeout = getOption("future.wait.timeout
563563

564564

565565

566-
#' @export
567-
getExpression.ClusterFuture <- local({
568-
tmpl_expr_conditions <- future:::bquote_compile({
569-
"# future:::getExpression.ClusterFuture(): inject code for instant"
570-
"# relaying of 'immediateCondition' objects back to the parent R "
571-
"# process via the existing PSOCK channel "
572-
...future.makeSendCondition <- base::local({
573-
sendCondition <- NULL
574-
575-
function(frame = 1L) {
576-
if (is.function(sendCondition)) return(sendCondition)
577-
578-
ns <- getNamespace("parallel")
579-
if (exists("sendData", mode = "function", envir = ns)) {
580-
parallel_sendData <- get("sendData", mode = "function", envir = ns)
581-
582-
## Find the 'master' argument of the worker's {slave,work}Loop()
583-
envir <- sys.frame(frame)
584-
master <- NULL
585-
while (!identical(envir, .GlobalEnv) && !identical(envir, emptyenv())) {
586-
if (exists("master", mode = "list", envir = envir, inherits=FALSE)) {
587-
master <- get("master", mode = "list", envir = envir, inherits = FALSE)
588-
if (inherits(master, c("SOCKnode", "SOCK0node"))) {
589-
sendCondition <<- function(cond) {
590-
data <- list(type = "VALUE", value = cond, success = TRUE)
591-
parallel_sendData(master, data)
592-
}
593-
return(sendCondition)
594-
}
595-
}
596-
frame <- frame + 1L
597-
envir <- sys.frame(frame)
598-
}
599-
}
600-
601-
## Failed to locate 'master' or 'parallel:::sendData()',
602-
## so just ignore conditions
603-
sendCondition <<- function(cond) NULL
604-
}
605-
})
606-
607-
withCallingHandlers({
608-
.(expr)
609-
}, immediateCondition = function(cond) {
610-
sendCondition <- ...future.makeSendCondition()
611-
sendCondition(cond)
612-
613-
## Avoid condition from being signaled more than once
614-
## muffleCondition <- future:::muffleCondition()
615-
muffleCondition <- .(muffleCondition)
616-
muffleCondition(cond)
617-
})
618-
})
619-
620-
621-
function(future, expr = future$expr, immediateConditions = TRUE, ...) {
622-
## Assert that no arguments but the first is passed by position
623-
assert_no_positional_args_but_first()
624-
625-
## Inject code for resignaling immediateCondition:s?
626-
if (immediateConditions) {
627-
resignalImmediateConditions <- getOption("future.psock.relay.immediate", TRUE)
628-
if (resignalImmediateConditions) {
629-
immediateConditionClasses <- getOption("future.relay.immediate", "immediateCondition")
630-
if (length(immediateConditionClasses) > 0L) {
631-
## Does the cluster node communicate with a connection?
632-
## (if not, it's via MPI)
633-
workers <- future$workers
634-
## AD HOC/FIXME: Here 'future$node' is yet not assigned, so we look at
635-
## the first worker and assume the others are the same. /HB 2019-10-23
636-
cl <- workers[1L]
637-
node <- cl[[1L]]
638-
con <- node$con
639-
if (!is.null(con)) {
640-
expr <- bquote_apply(tmpl_expr_conditions)
641-
} ## if (!is.null(con))
642-
}
643-
}
644-
} ## if (resignalImmediateConditions && immediateConditions)
645-
646-
NextMethod(expr = expr, immediateConditions = immediateConditions)
647-
}
648-
})
649-
650566

651567
send_call <- function(node, ..., when = "send call to", future) {
652568
sendCall <- importParallel("sendCall")
@@ -772,3 +688,46 @@ post_mortem_cluster_failure <- function(ex, when, node, future) {
772688

773689
msg
774690
} # post_mortem_cluster_failure()
691+
692+
693+
694+
getPsockImmediateConditionHandler <- local({
695+
sendCondition <- NULL
696+
697+
function(frame = 1L) {
698+
if (is.function(sendCondition)) return(sendCondition)
699+
700+
ns <- getNamespace("parallel")
701+
if (exists("sendData", mode = "function", envir = ns)) {
702+
parallel_sendData <- get("sendData", mode = "function", envir = ns)
703+
704+
## Find the 'master' argument of the worker's {slave,work}Loop()
705+
envir <- sys.frame(frame)
706+
master <- NULL
707+
while (!identical(envir, .GlobalEnv) && !identical(envir, emptyenv())) {
708+
if (exists("master", mode = "list", envir = envir, inherits = FALSE)) {
709+
master <- get("master", mode = "list", envir = envir, inherits = FALSE)
710+
if (inherits(master, c("SOCKnode", "SOCK0node"))) {
711+
sendCondition <<- function(cond) {
712+
data <- list(type = "VALUE", value = cond, success = TRUE)
713+
parallel_sendData(master, data)
714+
}
715+
return(sendCondition)
716+
}
717+
}
718+
frame <- frame + 1L
719+
envir <- sys.frame(frame)
720+
}
721+
}
722+
723+
## Failed to locate 'master' or 'parallel:::sendData()',
724+
## so just ignore immedicate conditions
725+
sendCondition <<- function(cond) NULL
726+
}
727+
}) ## getPsockImmediateConditionHandler()
728+
729+
730+
psockImmediateConditionHandler <- function(cond) {
731+
handler <- getPsockImmediateConditionHandler()
732+
handler(cond)
733+
}

0 commit comments

Comments
 (0)