Skip to content

Commit f094f2d

Browse files
doFuture2(): Handle interrupts and errors also during the phase where futures are launched [#85]
1 parent 6917ae4 commit f094f2d

File tree

2 files changed

+142
-114
lines changed

2 files changed

+142
-114
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: doFuture
2-
Version: 1.0.2-9016
2+
Version: 1.0.2-9017
33
Title: Use Foreach to Parallelize via the Future Framework
44
Depends:
55
foreach (>= 1.5.0),

R/doFuture2.R

Lines changed: 141 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ doFuture2 <- function(obj, expr, envir, data) { #nolint
8989

9090

9191
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
92-
## 4. Load balancing ("chunking")
92+
## 2. Load balancing ("chunking")
9393
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
9494
## (a) .options.future = list(chunk.size = <numeric>)
9595
## cf. future_lapply(..., future.chunk.size)
@@ -113,7 +113,7 @@ doFuture2 <- function(obj, expr, envir, data) { #nolint
113113

114114

115115
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
116-
## 5. Create futures
116+
## 3. Prepare for creating futures
117117
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
118118
## Relay standard output or conditions?
119119
stdout <- options[["stdout"]]
@@ -133,8 +133,6 @@ doFuture2 <- function(obj, expr, envir, data) { #nolint
133133
if (length(conditions) > 0) conditions <- structure(conditions, drop = TRUE)
134134

135135
nchunks <- length(chunks)
136-
fs <- vector("list", length = nchunks)
137-
if (debug) mdebugf("Number of futures (= number of chunks): %d", nchunks)
138136

139137
## Adjust option 'future.globals.maxSize' to account for the fact that more
140138
## than one element is processed per future. The adjustment is done by
@@ -170,7 +168,7 @@ doFuture2 <- function(obj, expr, envir, data) { #nolint
170168

171169

172170
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
173-
## Reproducible RNG (for sequential and parallel processing)
171+
## 4. Reproducible RNG
174172
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
175173
seed <- options[["seed"]]
176174
if (is.null(seed)) seed <- eval(formals(future)$seed)
@@ -215,7 +213,7 @@ doFuture2 <- function(obj, expr, envir, data) { #nolint
215213

216214

217215
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
218-
## 2. Construct future expression from %dofuture% expression
216+
## 5. Construct future expression from %dofuture% expression
219217
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
220218
if (debug) {
221219
mdebug_push("%dofuture% R expression:")
@@ -269,7 +267,7 @@ doFuture2 <- function(obj, expr, envir, data) { #nolint
269267

270268

271269
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
272-
## 3. Identify globals and packages
270+
## 6. Identify globals and packages
273271
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
274272
if (debug) mdebug_push("Identifying globals and packages ...")
275273

@@ -365,96 +363,106 @@ doFuture2 <- function(obj, expr, envir, data) { #nolint
365363
}
366364

367365
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
368-
## Creating futures
366+
## 7. Creating futures
369367
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
370368
labels <- sprintf("doFuture2-%s", seq_len(nchunks))
371-
372-
if (debug) mdebugf_push("Launching %d futures (chunks) ...", nchunks)
373-
for (ii in seq_along(chunks)) {
374-
chunk <- chunks[[ii]]
369+
fs <- tryCatch({
375370
if (debug) {
376-
mdebugf_push("Chunk #%d of %d ...", ii, length(chunks))
377-
mdebugf("Chunk indices: [n=%d] %s", length(chunk), hpaste(chunk))
371+
mdebugf_push("Launching %d futures (chunks) ...", nchunks)
372+
on.exit(mdebugf_pop())
378373
}
379-
## Subsetting outside future is more efficient
380-
globals_ii <- globals
381-
packages_ii <- packages
382-
args_list_ii <- args_list[chunk]
383-
globals_ii[["...future.x_ii"]] <- args_list_ii
384-
385-
if (debug) mdebugf_push("Finding globals in 'args_list' for chunk #%d ...", ii)
386-
## Search for globals in 'args_list_ii':
387-
gp <- getGlobalsAndPackages(args_list_ii, envir = envir, globals = TRUE)
388-
globals_X <- gp$globals
389-
packages_X <- gp$packages
390-
gp <- NULL
391374

392-
if (debug) {
393-
info <- if (length(globals_X) == 0) "" else hpaste(sQuote(names(globals_X)))
394-
mdebugf("Globals: [n=%d] %s", length(globals_X), info)
395-
info <- if (length(packages_X) == 0) "" else hpaste(sQuote(packages))
396-
mdebugf("Packages: [n=%d] %s", length(packages_X), info)
397-
}
375+
fs <- vector("list", length = nchunks)
376+
377+
for (ii in seq_along(chunks)) {
378+
chunk <- chunks[[ii]]
379+
if (debug) {
380+
mdebugf_push("Chunk #%d of %d ...", ii, length(chunks))
381+
mdebugf("Chunk indices: [n=%d] %s", length(chunk), hpaste(chunk))
382+
}
383+
## Subsetting outside future is more efficient
384+
globals_ii <- globals
385+
packages_ii <- packages
386+
args_list_ii <- args_list[chunk]
387+
globals_ii[["...future.x_ii"]] <- args_list_ii
388+
389+
if (debug) mdebugf_push("Finding globals in 'args_list' for chunk #%d ...", ii)
390+
## Search for globals in 'args_list_ii':
391+
gp <- getGlobalsAndPackages(args_list_ii, envir = envir, globals = TRUE)
392+
globals_X <- gp$globals
393+
packages_X <- gp$packages
394+
gp <- NULL
398395

399-
## Export also globals found in 'args_list_ii'
400-
if (length(globals_X) > 0L) {
401-
reserved <- intersect(c("...future.FUN", "...future.x_ii"), names(globals_X))
402-
if (length(reserved) > 0) {
403-
mdebugf_pop() ## "Finding globals in 'args_list' for chunk #%d ..."
404-
mdebugf_pop() ## "Chunk #%d of %d ..."
405-
mdebugf_pop() ## "Launching %d futures (chunks) ..."
406-
stop("Detected globals in 'args_list' using reserved variables names: ",
407-
paste(sQuote(reserved), collapse = ", "))
396+
if (debug) {
397+
info <- if (length(globals_X) == 0) "" else hpaste(sQuote(names(globals_X)))
398+
mdebugf("Globals: [n=%d] %s", length(globals_X), info)
399+
info <- if (length(packages_X) == 0) "" else hpaste(sQuote(packages))
400+
mdebugf("Packages: [n=%d] %s", length(packages_X), info)
408401
}
409-
globals_ii <- unique(c(globals_ii, globals_X))
410-
411-
## Packages needed due to globals in 'args_list_ii'?
412-
if (length(packages_X) > 0L)
413-
packages_ii <- unique(c(packages_ii, packages_X))
414-
}
415-
416-
rm(list = c("globals_X", "packages_X"))
417-
418-
if (debug) mdebug_pop() ## "Finding globals in 'args_list' for chunk #%d ..."
419-
420-
rm(list = "args_list_ii")
421402

422-
if (!is.null(globals.maxSize.adjusted)) {
423-
globals_ii <- c(globals_ii, ...future.globals.maxSize = globals.maxSize)
424-
}
425-
426-
## Using RNG seeds or not?
427-
if (is.null(seeds)) {
428-
if (debug) mdebug("seeds: <none>")
429-
} else {
430-
if (debug) mdebugf("seeds: [n=%d] <seeds>", length(chunk))
431-
globals_ii[["...future.seeds_ii"]] <- seeds[chunk]
432-
stop_if_not(length(seeds[chunk]) > 0, is.list(seeds[chunk]))
433-
}
434-
435-
fs[[ii]] <- future(
436-
expr_mapreduce, substitute = FALSE,
437-
envir = envir,
438-
globals = globals_ii,
439-
packages = packages_ii,
440-
seed = seed,
441-
stdout = stdout,
442-
conditions = conditions,
443-
label = labels[ii]
444-
)
445-
446-
## Not needed anymore
447-
rm(list = c("chunk", "globals_ii", "packages_ii"))
403+
## Export also globals found in 'args_list_ii'
404+
if (length(globals_X) > 0L) {
405+
reserved <- intersect(c("...future.FUN", "...future.x_ii"), names(globals_X))
406+
if (length(reserved) > 0) {
407+
mdebugf_pop() ## "Finding globals in 'args_list' for chunk #%d ..."
408+
mdebugf_pop() ## "Chunk #%d of %d ..."
409+
stop("Detected globals in 'args_list' using reserved variables names: ",
410+
paste(sQuote(reserved), collapse = ", "))
411+
}
412+
globals_ii <- unique(c(globals_ii, globals_X))
413+
414+
## Packages needed due to globals in 'args_list_ii'?
415+
if (length(packages_X) > 0L)
416+
packages_ii <- unique(c(packages_ii, packages_X))
417+
}
418+
419+
rm(list = c("globals_X", "packages_X"))
420+
421+
if (debug) mdebug_pop() ## "Finding globals in 'args_list' for chunk #%d ..."
422+
423+
rm(list = "args_list_ii")
424+
425+
if (!is.null(globals.maxSize.adjusted)) {
426+
globals_ii <- c(globals_ii, ...future.globals.maxSize = globals.maxSize)
427+
}
428+
429+
## Using RNG seeds or not?
430+
if (is.null(seeds)) {
431+
if (debug) mdebug("seeds: <none>")
432+
} else {
433+
if (debug) mdebugf("seeds: [n=%d] <seeds>", length(chunk))
434+
globals_ii[["...future.seeds_ii"]] <- seeds[chunk]
435+
stop_if_not(length(seeds[chunk]) > 0, is.list(seeds[chunk]))
436+
}
437+
438+
fs[[ii]] <- future(
439+
expr_mapreduce, substitute = FALSE,
440+
envir = envir,
441+
globals = globals_ii,
442+
packages = packages_ii,
443+
seed = seed,
444+
stdout = stdout,
445+
conditions = conditions,
446+
label = labels[ii]
447+
)
448+
449+
## Not needed anymore
450+
rm(list = c("chunk", "globals_ii", "packages_ii"))
451+
452+
if (debug) mdebug_pop() ## "Chunk #%d of %d ..."
453+
} ## for (ii ...)
448454

449-
if (debug) mdebug_pop() ## "Chunk #%d of %d ..."
450-
} ## for (ii ...)
455+
fs
456+
}, interrupt = identity, error = identity) ## tryCatch()
451457
rm(list = c("globals", "packages", "labels", "seeds"))
452-
if (debug) mdebug_pop() ## "Launching %d futures (chunks) ..."
458+
459+
## Handle errors and interrupts (during launching of futures)
460+
handleInterruptsAndErrors(fs, values = fs)
453461
stop_if_not(length(fs) == nchunks)
454462

455463

456464
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
457-
## 6. Resolve futures, gather their values, and reduce
465+
## 8. Resolve futures, gather their values, and reduce
458466
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
459467
## Resolve futures
460468
values <- tryCatch({
@@ -529,33 +537,9 @@ doFuture2 <- function(obj, expr, envir, data) { #nolint
529537
}, interrupt = identity, error = identity) ## tryCatch()
530538
rm(list = "chunks")
531539

532-
if (inherits(values, "interrupt") || inherits(values, "error")) {
533-
if (inherits(values, "interrupt")) {
534-
when <- Sys.time()
535-
host <- Sys.info()[["nodename"]]
536-
pid <- Sys.getpid()
537-
msg <- sprintf("'%%dofuture%%' interrupted at %s, while running on %s (pid %s)", format(when, format = "%FT%T"), sQuote(host), pid)
538-
warning(sprintf("%s. Canceling all iterations ...", msg), immediate. = TRUE, call. = FALSE)
539-
540-
## Interrupt all futures (if an error, value() already did it)
541-
fs <- cancel(fs)
542-
}
543-
544-
## Make sure all workers finish before continuing
545-
fs <- resolve(fs)
546-
547-
## Collect all results
548-
void <- lapply(fs, FUN = function(f) {
549-
tryCatch(value(f), error = identity)
550-
})
551-
552-
## Resignal error?
553-
if (inherits(values, "error")) {
554-
stop(values)
555-
}
556-
557-
stop(FutureInterruptError(msg))
558-
}
540+
## Handle errors and interrupts (during collection of futures)
541+
handleInterruptsAndErrors(fs, values = values)
542+
stop_if_not(length(values) == nchunks)
559543

560544
## Not needed anymore
561545
rm(list = "fs")
@@ -564,7 +548,6 @@ doFuture2 <- function(obj, expr, envir, data) { #nolint
564548
mdebugf("Number of value chunks collected: %d", length(values))
565549
}
566550

567-
stop_if_not(length(values) == nchunks)
568551
if (debug) mdebugf("Reducing values from %d chunks ...", nchunks)
569552

570553
if (debug) {
@@ -607,6 +590,10 @@ elements in 'X' (= %d). There were in total %d chunks and %d elements (%s)",
607590
}
608591
values <- values2 <- results <- NULL
609592

593+
594+
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
595+
## 10. Accumulate results
596+
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
610597
## Combine results (and identify errors)
611598
## NOTE: This is adopted from foreach:::doSEQ()
612599
if (debug) mdebug_push("Accumulating results ...")
@@ -633,7 +620,7 @@ elements in 'X' (= %d). There were in total %d chunks and %d elements (%s)",
633620

634621

635622
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
636-
## 7. Error handling
623+
## 11. Error handling
637624
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
638625
if (debug) mdebug_push("Handling errors ...")
639626
error_value <- getErrorValue(it)
@@ -665,7 +652,7 @@ elements in 'X' (= %d). There were in total %d chunks and %d elements (%s)",
665652

666653

667654
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
668-
## 8. Final results
655+
## 12. Final results
669656
## - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
670657
if (debug) mdebug_push("Extracting results ...")
671658
res <- getResult(it)
@@ -806,3 +793,44 @@ tmpl_expr_options <- bquote_compile({
806793
}
807794
.(expr)
808795
})
796+
797+
798+
799+
## Handle an interrupt or an error that was captured while launching or
## collecting futures.
##
## Arguments:
##   fs      The object holding the futures. When called for the launch
##           phase, this may itself be the captured condition object.
##   values  The outcome to inspect. Defaults to 'fs'. If it inherits from
##           neither "interrupt" nor "error", nothing is done.
##   debug   If TRUE, debug messages are emitted via mdebug_push(),
##           mdebug() and mdebug_pop().
##
## Value:
##   Returns 'fs' as-is when 'values' is neither an interrupt nor an error.
##   Otherwise this function never returns; it signals a condition via
##   stop(): the captured error itself, or a FutureInterruptError if the
##   captured condition was an interrupt.
handleInterruptsAndErrors <- function(fs, values = fs, debug = FALSE) {
  if (debug) {
    mdebug_push("handleInterruptsAndErrors() ...")
    mdebug(sprintf("Result: <%s>", class(values)[1]))
    on.exit(mdebug_pop(), add = TRUE)
  }

  is_interrupt <- inherits(values, "interrupt")
  is_error <- inherits(values, "error")

  ## Nothing to handle?
  if (!is_interrupt && !is_error) return(fs)

  if (is_interrupt) {
    when <- Sys.time()
    host <- Sys.info()[["nodename"]]
    pid <- Sys.getpid()
    msg <- sprintf(
      "'%%dofuture%%' interrupted at %s, while running on %s (pid %s)",
      format(when, format = "%FT%T"), sQuote(host), pid
    )
    warning(
      sprintf("%s. Canceling all iterations ...", msg),
      immediate. = TRUE, call. = FALSE
    )

    ## Interrupt all futures (if an error, value() already did it)
    fs <- cancel(fs)
  }

  ## Make sure all workers finish before continuing
  fs <- resolve(fs)

  ## Collect all results; errors from individual futures are captured and
  ## discarded here, because the condition in 'values' is what is signaled
  lapply(fs, FUN = function(f) {
    tryCatch(value(f), error = identity)
  })

  ## Signal error or interrupt?
  ## NOTE: The captured condition is in 'values', not in 'fs'. During the
  ## collection phase, 'fs' is the list of futures, so testing 'fs' here
  ## would miss the error and fall through to the interrupt branch, where
  ## 'msg' is undefined.
  if (is_error) {
    ex <- values
  } else {
    ex <- FutureInterruptError(msg)
  }
  if (debug) mdebug(sprintf("Signaling: <%s>", class(ex)[1]))

  stop(ex)
} ## handleInterruptsAndErrors()

0 commit comments

Comments
 (0)