Skip to content

Commit 52c9f90

Browse files
Now future.tests::check("mirai::daemons(2); plan(future.mirai::mirai_cluster)") works
1 parent 1d0b1b2 commit 52c9f90

File tree

8 files changed

+91
-112
lines changed

8 files changed

+91
-112
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: future.mirai
2-
Version: 0.2.2-9206
2+
Version: 0.2.2-9207
33
Depends:
44
future
55
Imports:

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,8 @@ spelling:
77
future.tests/%:
88
$(R_SCRIPT) -e "future.tests::check" --args --test-plan="future.mirai::$*"
99

10+
future.tests/mirai_cluster:
11+
$(R_SCRIPT) -e "future.tests::check" --args --test-plan="mirai::daemons(2); plan(future.mirai::mirai_cluster)"
12+
1013
future.tests: future.tests/mirai_multisession future.tests/mirai_cluster
1114

NAMESPACE

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ S3method(resolved,MiraiFuture)
1010
S3method(result,MiraiFuture)
1111
S3method(stopWorkers,MiraiFutureBackend)
1212
S3method(tweak,mirai_cluster)
13-
S3method(tweak,mirai_multisession)
1413
export(MiraiFutureBackend)
1514
export(MiraiMultisessionFutureBackend)
1615
export(mirai_cluster)

NEWS.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
## New Features
44

55
* Now 'mirai' futures can be interrupted using `interrupt()`.
6-
6+
7+
* Now `future.tests::check("mirai::daemons(2);
8+
plan(future.mirai::mirai_cluster)")` works.
9+
710

811
# Version 0.2.2
912

R/MiraiFutureBackend-class.R

Lines changed: 82 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -13,31 +13,37 @@
1313
#' @importFrom mirai status
1414
#' @importFrom future FutureBackend SequentialFutureBackend
1515
#' @export
16-
MiraiFutureBackend <- function(...) {
17-
status <- status()
18-
if (status[["connections"]] == 0L) {
19-
stop("Mirai futures require that at least one mirai daemon is available. mirai::status() reports:\n%s", paste(capture.output(print(status)), collapse = "\n"))
20-
}
21-
22-
## Assert that a mirai dispatcher is in place, which is
23-
## required to protect against launching too many workers
24-
dispatcher <- !is.null(status[["mirai"]])
25-
if (!dispatcher) {
26-
stop(sprintf("Mirai futures require that the mirai daemons are configured to use a dispatcher (dispatcher = TRUE). If not, there is a risk of launching an unlimited number of mirai processes. This requirement might be relaxed in future versions of the %s package. mirai::status() reports:\n%s", sQuote(.packageName), paste(capture.output(print(status)), collapse = "\n")))
16+
MiraiFutureBackend <- local({
17+
with_stealth_rng <- import_future("with_stealth_rng")
18+
19+
function(...) {
20+
status <- status()
21+
22+
if (status[["connections"]] == 0L) {
23+
stop(FutureError(sprintf("Mirai futures require that at least one mirai daemon is available. mirai::status() reports:\n%s", paste(capture.output(print(status)), collapse = "\n"))))
24+
}
25+
26+
## Assert that a mirai dispatcher is in place, which is
27+
## required to protect against launching too many workers
28+
dispatcher <- !is.null(status[["mirai"]])
29+
if (!dispatcher) {
30+
stop(sprintf("Mirai futures require that the mirai daemons are configured to use a dispatcher (dispatcher = TRUE). If not, there is a risk of launching an unlimited number of mirai processes. This requirement might be relaxed in future versions of the %s package. mirai::status() reports:\n%s", sQuote(.packageName), paste(capture.output(print(status)), collapse = "\n")))
31+
}
32+
33+
core <- FutureBackend(
34+
reg = "workers-mirai",
35+
dispatcher = dispatcher,
36+
shutdown = FALSE,
37+
...,
38+
timeout = getOption("future.wait.timeout", 30 * 24 * 60 * 60),
39+
delta = getOption("future.wait.interval", 0.2),
40+
alpha = getOption("future.wait.alpha", 1.01)
41+
)
42+
core[["futureClasses"]] <- c("MiraiFuture", "MultiprocessFuture", core[["futureClasses"]])
43+
core <- structure(core, class = c("MiraiFutureBackend", "MultiprocessFutureBackend", "FutureBackend", class(core)))
44+
core
2745
}
28-
29-
core <- FutureBackend(
30-
reg = "workers-mirai",
31-
dispatcher = dispatcher,
32-
...,
33-
timeout = getOption("future.wait.timeout", 30 * 24 * 60 * 60),
34-
delta = getOption("future.wait.interval", 0.2),
35-
alpha = getOption("future.wait.alpha", 1.01)
36-
)
37-
core[["futureClasses"]] <- c("MiraiFuture", "MultiprocessFuture", core[["futureClasses"]])
38-
core <- structure(core, class = c("MiraiFutureBackend", "MultiprocessFutureBackend", "FutureBackend", class(core)))
39-
core
40-
}
46+
})
4147

4248

4349
#' @importFrom mirai mirai
@@ -54,6 +60,8 @@ launchFuture.MiraiFutureBackend <- local({
5460
on.exit(mdebugf("launchFuture() for %s ... done", class(backend)[1], debug = debug))
5561
}
5662

63+
stop_if_not(nbrOfWorkers(backend) > 0L)
64+
5765
## Wait for a free worker
5866
waitForWorker(backend, debug = debug)
5967

@@ -74,7 +82,7 @@ launchFuture.MiraiFutureBackend <- local({
7482
if (!is.null(workers)) future[["workers"]] <- workers
7583

7684
data <- getFutureData(future)
77-
85+
7886
future[["state"]] <- "submitted"
7987
mirai <- mirai(future:::evalFuture(data), data = data)
8088
future[["mirai"]] <- mirai
@@ -92,23 +100,41 @@ launchFuture.MiraiFutureBackend <- local({
92100
#' @importFrom future stopWorkers interrupt
93101
#' @export
94102
stopWorkers.MiraiFutureBackend <- function(backend, ...) {
103+
debug <- isTRUE(getOption("future.mirai.debug"))
104+
if (debug) {
105+
mdebugf("stopWorkers() for %s ...", class(backend)[1], debug = debug)
106+
on.exit(mdebugf("stopWorkers() for %s ... done", class(backend)[1], debug = debug))
107+
}
108+
95109
reg <- backend[["reg"]]
96110
futures <- FutureRegistry(reg, action = "list", earlySignal = FALSE)
111+
if (debug) mdebugf("Number of active futures: %d", length(futures))
97112

98-
## Nothing to do?
99-
if (length(futures) == 0L) return(backend)
100-
101-
## Enable interrupts temporarily, if disabled
102-
if (!isTRUE(backend[["interrupts"]])) {
103-
backend[["interrupts"]] <- TRUE
104-
on.exit(backend[["interrupts"]] <- FALSE)
113+
if (length(futures) > 0L) {
114+
## Enable interrupts temporarily, if disabled
115+
if (!isTRUE(backend[["interrupts"]])) {
116+
backend[["interrupts"]] <- TRUE
117+
on.exit(backend[["interrupts"]] <- FALSE)
118+
}
119+
120+
## Interrupt all futures, which terminates the workers
121+
if (debug) mdebugf_push("Interrupt futures ...")
122+
futures <- lapply(futures, FUN = interrupt)
123+
if (debug) mdebugf_pop("Interrupt futures ... done")
124+
125+
## Erase registry
126+
futures <- FutureRegistry(reg, action = "reset")
105127
}
106128

107-
## Interrupt all futures, which terminates the workers
108-
futures <- lapply(futures, FUN = interrupt)
109-
110-
## Erase registry
111-
futures <- FutureRegistry(reg, action = "reset")
129+
## Stop workers?
130+
if (backend[["shutdown"]]) {
131+
if (debug) {
132+
mdebug("Mirai daemons:")
133+
mprint(mirai::status())
134+
}
135+
mirai::daemons(n = 0L)
136+
if (debug) mdebug("Mirai daemons shut down")
137+
}
112138

113139
backend
114140
}
@@ -124,14 +150,14 @@ nbrOfWorkers.MiraiFutureBackend <- function(evaluator) {
124150
workers <- res[["daemons"]]
125151
if (is_error_value(workers)) {
126152
reason <- capture.output(print(workers))
127-
stop(FutureError(sprintf("mirai::status() failed to communicate with dispatcher: %s", reason)))
153+
stop(FutureError(sprintf("Cannot infer number of mirai workers. mirai::status() failed to communicate with dispatcher: %s", reason)))
128154
}
129155

130156
if (is.character(workers)) {
131157
workers <- res[["connections"]]
132158
stop_if_not(is.numeric(workers))
133159
} else if (!is.numeric(workers)) {
134-
stop(FutureError(sprintf("Unknown type of mirai::daemons()$daemons: %s", typeof(workers))))
160+
stop(FutureError(sprintf("Cannot infer number of mirai workers. Unknown type of mirai::daemons()$daemons: %s", typeof(workers))))
135161
}
136162

137163
if (is.matrix(workers)) {
@@ -143,11 +169,12 @@ nbrOfWorkers.MiraiFutureBackend <- function(evaluator) {
143169
}
144170

145171
if (length(workers) != 1L) {
146-
stop(FutureError(sprintf("Length of mirai::daemons()$daemons is not one: %d", length(workers))))
172+
stop(FutureError(sprintf("Cannot infer number of mirai workers. Length of mirai::daemons()$daemons is not one: %d", length(workers))))
147173
}
148174

149-
if (workers == 0L) return(Inf)
150-
175+
mirai <- res[["mirai"]]
176+
if (is.null(mirai)) return(0L)
177+
151178
workers
152179
}
153180

@@ -161,7 +188,7 @@ nbrOfFreeWorkers.MiraiFutureBackend <- function(evaluator, background = FALSE, .
161188
workers <- res[["daemons"]]
162189
if (is_error_value(workers)) {
163190
reason <- capture.output(print(workers))
164-
stop(FutureError(sprintf("mirai::status() failed to communicate with dispatcher: %s", reason)))
191+
stop(FutureError(sprintf("Cannot infer number of free mirai workers. mirai::status() failed to communicate with dispatcher: %s", reason)))
165192
}
166193

167194
if (is.character(workers)) {
@@ -180,11 +207,14 @@ nbrOfFreeWorkers.MiraiFutureBackend <- function(evaluator, background = FALSE, .
180207
}
181208

182209
if (length(workers) != 1L) {
183-
stop(FutureError(sprintf("Length of mirai::daemons()$daemons is not one: %d", length(workers))))
210+
stop(FutureError(sprintf("Cannot infer number of free mirai workers. Length of mirai::daemons()$daemons is not one: %d", length(workers))))
184211
}
185212

186213
mirai <- res[["mirai"]]
187-
stop_if_not(!is.null(mirai))
214+
if (is.null(mirai)) {
215+
stop(FutureError("Cannot infer number of free mirai workers. mirai::status() reports zero daemons. Did you call mirai::daemons(0) by mistake?"))
216+
}
217+
188218
used <- mirai[["awaiting"]] + mirai[["executing"]]
189219
workers <- workers - used
190220
stop_if_not(is.numeric(workers), is.finite(workers), workers >= 0)
@@ -330,6 +360,13 @@ interruptFuture.MiraiFutureBackend <- function(backend, future, ...) {
330360
}
331361

332362

363+
#' @importFrom future tweak
364+
#' @export
365+
tweak.mirai_cluster <- function(strategy, ..., penvir = parent.frame()) {
366+
attr(strategy, "init") <- TRUE
367+
NextMethod("tweak")
368+
}
369+
333370
#' Mirai-based cluster futures
334371
#'
335372
#' @inheritParams future::Future
@@ -340,12 +377,6 @@ interruptFuture.MiraiFutureBackend <- function(backend, future, ...) {
340377
#'
341378
#' @example incl/mirai_cluster.R
342379
#'
343-
#' @details
344-
#' _WARNING_: When using this future plan, mirai workers are _not_ shut down
345-
#' when switching away from this future plan. This is because it the backend
346-
#' requires them to be launched manually before, and it therefore needs to be
347-
#' manually shutdown as well.
348-
#'
349380
#' @importFrom future Future
350381
#' @export
351382
mirai_cluster <- function(..., envir = parent.frame()) {
@@ -354,11 +385,3 @@ mirai_cluster <- function(..., envir = parent.frame()) {
354385
class(mirai_cluster) <- c("mirai_cluster", "mirai", "multiprocess", "future", "function")
355386
attr(mirai_cluster, "init") <- TRUE
356387
attr(mirai_cluster, "factory") <- MiraiFutureBackend
357-
358-
359-
#' @importFrom future tweak
360-
#' @export
361-
tweak.mirai_cluster <- function(strategy, ..., penvir = parent.frame()) {
362-
attr(strategy, "init") <- TRUE
363-
NextMethod("tweak")
364-
}

R/MiraiMultisessionFutureBackend-class.R

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ MiraiMultisessionFutureBackend <- local({
3737
workers = workers,
3838
...
3939
)
40+
core[["shutdown"]] <- TRUE
4041

4142
core[["futureClasses"]] <- c("MiraiMultisessionFuture", core[["futureClasses"]])
4243
core <- structure(core, class = c("MiraiMultisessionFutureBackend", "MiraiFutureBackend", "MultiprocessFutureBackend", "FutureBackend", class(core)))
@@ -96,14 +97,4 @@ attr(mirai_multisession, "init") <- TRUE
9697
attr(mirai_multisession, "cleanup") <- function(...) {
9798
mirai::daemons(0)
9899
}
99-
attr(mirai_multisession, "tweakable") <- "workers"
100100
attr(mirai_multisession, "factory") <- MiraiMultisessionFutureBackend
101-
102-
103-
104-
#' @importFrom future tweak
105-
#' @export
106-
tweak.mirai_multisession <- function(strategy, ..., penvir = parent.frame()) {
107-
attr(strategy, "init") <- TRUE
108-
NextMethod("tweak")
109-
}

R/utils.R

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,3 @@
1-
now <- function(x = Sys.time(), format = "[%H:%M:%OS3] ") {
2-
## format(x, format = format) ## slower
3-
format(as.POSIXlt(x, tz = ""), format = format)
4-
}
5-
6-
mdebug <- function(..., prefix = now(), debug = isTRUE(getOption("future.mirai.debug"))) {
7-
if (!debug) return()
8-
message(prefix, ...)
9-
}
10-
11-
mdebugf <- function(..., appendLF = TRUE,
12-
prefix = now(), debug = isTRUE(getOption("future.mirai.debug"))) {
13-
if (!debug) return()
14-
message(prefix, sprintf(...), appendLF = appendLF)
15-
}
16-
17-
#' @importFrom utils capture.output str
18-
mstr <- function(..., prefix = now(), debug = isTRUE(getOption("future.mirai.debug"))) {
19-
if (!debug) return()
20-
stdout <- capture.output(str(...))
21-
stdout <- paste(prefix, stdout, sep = "", collapse = "\n")
22-
message(stdout)
23-
}
24-
25-
#' @importFrom utils capture.output str
26-
mprint <- function(..., prefix = now(), debug = isTRUE(getOption("future.mirai.debug"))) {
27-
if (!debug) return()
28-
stdout <- capture.output(print(...))
29-
stdout <- paste(prefix, stdout, sep = "", collapse = "\n")
30-
message(stdout)
31-
}
32-
33-
mprintf <- function(...) message(now(), sprintf(...), appendLF = FALSE)
34-
351
stop_if_not <- function(...) {
362
res <- list(...)
373
for (ii in 1L:length(res)) {

man/mirai_cluster.Rd

Lines changed: 0 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)