Skip to content

Commit ec2446b

Browse files
resolved() on a lazy ClusterFuture:s only collects resolved futures if all slots are busy
1 parent 10ec3cc commit ec2446b

File tree

4 files changed

+39
-23
lines changed

4 files changed

+39
-23
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: future
2-
Version: 1.58.0-9002
2+
Version: 1.58.0-9003
33
Title: Unified Parallel and Distributed Processing in R for Everyone
44
Depends:
55
R (>= 3.2.0)

NEWS.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
# Version (development version)
22

3-
* ...
3+
## Performance
4+
5+
* Calling `resolved()` on a lazy `ClusterFuture` would collect the
6+
result for the first _resolved_ future in order to free up one
7+
worker slot. Now this is only done if all slots are occupied. The
8+
net benefit is that lazy cluster futures will be launched faster,
9+
unless all workers are busy.
410

511

612
# Version 1.58.0 [2025-06-05]

R/backend_api-11.ClusterFutureBackend-class.R

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -553,10 +553,11 @@ getSocketSelectTimeout <- function(future, timeout = NULL) {
553553
#' where `<type>` corresponds to the type of future, e.g. `cluster` and `multicore`.
554554
#'
555555
#' @section Behavior of cluster and multisession futures:
556-
#' `resolved()` for `ClusterFuture`, and therefore also `MultisessionFuture`,
557-
#' will always check whether any of the currently running futures are resolved,
558-
#' and if one of them has been resolved, then its result is collected. This makes
559-
#' sure to free up workers, when possible.
556+
#' `resolved()` will always attempt to launch a lazy future, if possible.
557+
#' If all worker slots are occupied, `resolved()` for `ClusterFuture` and
558+
#' `MultisessionFuture` will attempt to free one up by checking whether
559+
#' one of the futures is _resolved_. If it is, then its result is collected
560+
#' in order to free up one worker slot.
560561
#'
561562
#' `resolved()` for `ClusterFuture` may receive immediate condition objects, rather
562563
#' than a [FutureResult], when polling the worker for results. In such cases, the
@@ -579,16 +580,23 @@ resolved.ClusterFuture <- function(x, run = TRUE, timeout = NULL, ...) {
579580
## A lazy future not even launched?
580581
if (future[["state"]] == "created") {
581582
if (run) {
582-
## Can we launch it? Are there available workers?
583+
nworkers <- length(workers)
583584

584-
## Collect one resolved future, if one exists
585-
FutureRegistry(reg, action = "collect-first", earlySignal = TRUE, debug = debug)
585+
## 1. Are there available worker slots? Can we launch it immediately?
586+
futures <- FutureRegistry(reg, action = "list", earlySignal = FALSE, debug = debug)
587+
588+
## 2. No, all worker slots are occupied. Try to collect one
589+
## *resolved* future, if available, to free up one slot
590+
if (length(futures) >= nworkers) {
591+
## Collect one resolved future, if one exists
592+
FutureRegistry(reg, action = "collect-first", earlySignal = TRUE, debug = debug)
593+
futures <- FutureRegistry(reg, action = "list", earlySignal = FALSE, debug = debug)
594+
}
586595

587-
## Find which nodes are available
596+
## 3. Now, there should be available worker slots. Assert this
588597
avail <- rep(TRUE, times = length(workers))
589-
futures <- FutureRegistry(reg, action = "list", earlySignal = FALSE, debug = debug)
590-
if (length(futures) > 0) {
591-
## Get indices for all busy cluster nodes
598+
if (length(futures) > 0 && length(futures) < nworkers) {
599+
## a. Get indices for all busy cluster nodes
592600
nodes <- unlist(lapply(futures, FUN = function(f) f[["node"]]), use.names = FALSE)
593601
stop_if_not(
594602
length(nodes) == length(futures),
@@ -599,14 +607,13 @@ resolved.ClusterFuture <- function(x, run = TRUE, timeout = NULL, ...) {
599607
avail[nodes] <- FALSE
600608
}
601609
if (debug) mdebugf("avail: [n=%d] %s", length(avail), commaq(which(avail)))
602-
603-
## Sanity check
604610
stop_if_not(any(avail))
605611

606-
## If at least one is available, then launch this lazy future
607-
if (any(avail)) future <- run(future)
612+
## 4. Launch this lazy future
613+
future <- run(future)
608614
} ## if (run)
609-
615+
616+
## Consider future non-resolved
610617
return(FALSE)
611618
}
612619

@@ -1120,6 +1127,7 @@ requestNode <- function(await, backend, timeout, delta, alpha) {
11201127
clusterCall(cl = cl, function(x) x, truth)[[1]]
11211128
})
11221129
}, error = identity)
1130+
11231131
## If not working, investigate why, and relaunch a new worker
11241132
if (inherits(res, "error") || !identical(res, truth)) {
11251133
if (debug) {
@@ -1191,7 +1199,8 @@ requestNode <- function(await, backend, timeout, delta, alpha) {
11911199
mdebugf_pop()
11921200
}
11931201
}
1194-
1202+
1203+
## Try again
11951204
Sys.sleep(0.1)
11961205
} ## for (kk in maxTries:1)
11971206

man/resolved.Rd

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)