55# ' @param envir The environment in which global environment
66# ' should be located.
77# '
8- # ' @param substitute Controls whether \code{ expr} should be
9- # ' \code{ substitute()} :d or not.
8+ # ' @param substitute Controls whether ` expr` should be
9+ # ' ` substitute()` :d or not.
1010# '
1111# ' @param globals (optional) a logical, a character vector, a named list, or
12- # ' a \link[globals]{ Globals} object. If TRUE, globals are identified by code
13- # ' inspection based on \code{ expr} and \code{ tweak} searching from environment
14- # ' \code{ envir} . If FALSE, no globals are used. If a character vector, then
15- # ' globals are identified by lookup based their names \code{ globals} searching
16- # ' from environment \code{ envir} . If a named list or a Globals object, the
12+ # ' a [Globals][globals:: Globals] object. If TRUE, globals are identified by code
13+ # ' inspection based on ` expr` and ` tweak` searching from environment
14+ # ' ` envir` . If FALSE, no globals are used. If a character vector, then
15+ # ' globals are identified by lookup based their names ` globals` searching
16+ # ' from environment ` envir` . If a named list or a Globals object, the
1717# ' globals are used as is.
1818# '
1919# ' @param label (optional) Label of the future (where applicable, becomes the
2020# ' job name for most job schedulers).
2121# '
2222# ' @param conf A batchtools configuration environment.
2323# '
24- # ' @param cluster.functions A batchtools \link[batchtools]{ ClusterFunctions}
24+ # ' @param cluster.functions A batchtools [ClusterFunctions][batchtools:: ClusterFunctions]
2525# ' object.
2626# '
2727# ' @param resources A named list passed to the batchtools template (available
28- # ' as variable \code{ resources} ).
28+ # ' as variable ` resources` ).
2929# '
30- # ' @param workers (optional) Additional specification for the batchtools
31- # ' backend.
30+ # ' @param workers (optional) The maximum number of workers the batchtools
31+ # ' backend may use at any time. Interactive and "local" backends can only
32+ # ' process one future at the time, whereas HPC backends where futures are
33+ # ' resolved via separate jobs on a scheduler, the default is to assume an
34+ # ' infinite number of workers.
3235# '
3336# ' @param finalize If TRUE, any underlying registries are
3437# ' deleted when this object is garbage collected, otherwise not.
3538# '
36- # ' @param \ldots Additional arguments passed to \code{\link [future]{ Future}()} .
39+ # ' @param \ldots Additional arguments passed to [future:: Future()] .
3740# '
3841# ' @return A BatchtoolsFuture object
3942# '
4043# ' @export
41- # ' @importFrom future Future
44+ # ' @importFrom future Future getGlobalsAndPackages
4245# ' @importFrom batchtools submitJobs
4346# ' @keywords internal
4447BatchtoolsFuture <- function (expr = NULL , envir = parent.frame(),
@@ -53,29 +56,27 @@ BatchtoolsFuture <- function(expr = NULL, envir = parent.frame(),
5356 if (! is.null(label )) label <- as.character(label )
5457
5558 if (! is.null(cluster.functions )) {
56- stopifnot (is.list(cluster.functions ))
59+ stop_if_not (is.list(cluster.functions ))
5760 }
5861
5962 if (! is.null(workers )) {
60- stopifnot (length(workers ) > = 1 )
63+ stop_if_not (length(workers ) > = 1 )
6164 if (is.numeric(workers )) {
62- stopifnot (! anyNA(workers ), all(workers > = 1 ))
65+ stop_if_not (! anyNA(workers ), all(workers > = 1 ))
6366 } else if (is.character(workers )) {
6467 } else {
65- stopifnot (" Argument 'workers' should be either numeric or character: " ,
68+ stop_if_not (" Argument 'workers' should be either numeric or character: " ,
6669 mode(workers ))
6770 }
6871 }
6972
70- stopifnot (is.list(resources ))
73+ stop_if_not (is.list(resources ))
7174
7275 # # Record globals
73- getGlobalsAndPackages <- import_future(" getGlobalsAndPackages" )
7476 gp <- getGlobalsAndPackages(expr , envir = envir , globals = globals )
7577
76- # # Create BatchtoolsFuture object
7778 future <- Future(expr = gp $ expr , envir = envir , substitute = FALSE ,
78- workers = workers , label = label , ... )
79+ workers = workers , label = label , version = " 1.8 " , ... )
7980
8081 future $ globals <- gp $ globals
8182 future $ packages <- unique(c(packages , gp $ packages ))
@@ -131,6 +132,8 @@ print.BatchtoolsFuture <- function(x, ...) {
131132 } else {
132133 printf(" batchtools Registry:\n " )
133134 print(reg )
135+ printf(" File dir exists: %s\n " , file_test(" -d" , reg $ file.dir ))
136+ printf(" Work dir exists: %s\n " , file_test(" -d" , reg $ work.dir ))
134137 }
135138
136139 invisible (x )
@@ -189,6 +192,13 @@ status.BatchtoolsFuture <- function(future, ...) {
189192 status <- status [status ]
190193 status <- sort(names(status ))
191194 status <- setdiff(status , c(" n" ))
195+
196+ result <- future $ result
197+ if (inherits(result , " FutureResult" )) {
198+ condition <- result $ condition
199+ if (inherits(condition , " error" )) status <- c(" error" , status )
200+ }
201+
192202 status
193203}
194204
@@ -272,7 +282,7 @@ value.BatchtoolsFuture <- function(future, signal = TRUE,
272282 onMissing = c(" default" , " error" ),
273283 default = NULL , cleanup = TRUE , ... ) {
274284 # # Has the value already been collected?
275- if (future $ state %in% c(" finished " , " failed" , " interrupted" )) {
285+ if (future $ state %in% c(" done " , " failed" , " interrupted" )) {
276286 return (NextMethod(" value" ))
277287 }
278288
@@ -289,14 +299,11 @@ value.BatchtoolsFuture <- function(future, signal = TRUE,
289299 stop(sprintf(" The value no longer exists (or never existed) for Future ('%s') of class %s" , label , paste(sQuote(class(future )), collapse = " , " ))) # nolint
290300 }
291301
292- tryCatch({
293- future $ value <- await(future , cleanup = FALSE )
294- future $ state <- " finished"
295- if (cleanup ) delete(future , ... )
296- }, simpleError = function (ex ) {
297- future $ state <- " failed"
298- future $ value <- ex
299- })
302+ result <- await(future , cleanup = FALSE )
303+ stop_if_not(inherits(result , " FutureResult" ))
304+ future $ result <- result
305+ future $ state <- " finished"
306+ if (cleanup ) delete(future , ... )
300307
301308 NextMethod(" value" )
302309} # value()
@@ -338,7 +345,7 @@ run.BatchtoolsFuture <- function(future, ...) {
338345 expr <- substitute(local(expr ), list (expr = expr ))
339346
340347 reg <- future $ config $ reg
341- stopifnot (inherits(reg , " Registry" ))
348+ stop_if_not (inherits(reg , " Registry" ))
342349
343350 # # (ii) Attach packages that needs to be attached
344351 packages <- future $ packages
@@ -383,9 +390,9 @@ run.BatchtoolsFuture <- function(future, ...) {
383390 future $ config $ jobid <- jobid
384391 mdebug(" Created %s future #%d" , class(future )[1 ], jobid $ job.id )
385392
386- # # WORKAROUND: (For multicore and OS X only)
393+ # # WORKAROUND: (For multicore and macOS only)
387394 if (reg $ cluster.functions $ name == " Multicore" ) {
388- # # On some OS X systems, a system call to 'ps' may output an error message
395+ # # On some macOS systems, a system call to 'ps' may output an error message
389396 # # "dyld: DYLD_ environment variables being ignored because main executable
390397 # # (/bin/ps) is setuid or setgid" to standard error that is picked up by
391398 # # batchtools which incorrectly tries to parse it. By unsetting all DYLD_*
@@ -428,14 +435,14 @@ await <- function(...) UseMethod("await")
428435# ' @param timeout Total time (in seconds) waiting before generating an error.
429436# ' @param delta The number of seconds to wait between each poll.
430437# ' @param alpha A factor to scale up the waiting time in each iteration such
431- # ' that the waiting time in the k:th iteration is \code{ alpha ^ k * delta} .
438+ # ' that the waiting time in the k:th iteration is ` alpha ^ k * delta` .
432439# ' @param \ldots Not used.
433440# '
434441# ' @return The value of the evaluated expression.
435442# ' If an error occurs, an informative Exception is thrown.
436443# '
437444# ' @details
438- # ' Note that \code{ await()} should only be called once, because
445+ # ' Note that ` await()` should only be called once, because
439446# ' after being called the actual asynchronous future may be removed
440447# ' and will no longer available in subsequent calls. If called
441448# ' again, an error may be thrown.
@@ -452,8 +459,8 @@ await.BatchtoolsFuture <- function(future, cleanup = TRUE,
452459 alpha = getOption(" future.wait.alpha" , 1.01 ),
453460 ... ) {
454461 mdebug <- import_future(" mdebug" )
455- stopifnot (is.finite(timeout ), timeout > = 0 )
456- stopifnot (is.finite(alpha ), alpha > 0 )
462+ stop_if_not (is.finite(timeout ), timeout > = 0 )
463+ stop_if_not (is.finite(alpha ), alpha > 0 )
457464
458465 debug <- getOption(" future.debug" , FALSE )
459466
@@ -480,13 +487,23 @@ await.BatchtoolsFuture <- function(future, cleanup = TRUE,
480487
481488 finished <- is_na(stat ) || any(c(" done" , " error" , " expired" ) %in% stat )
482489
483- res <- NULL
490+ # # PROTOTYPE RESULTS BELOW:
491+ prototype_fields <- NULL
492+
493+ result <- NULL
484494 if (finished ) {
485495 mdebug(" Results:" )
486496 label <- future $ label
487497 if (is.null(label )) label <- " <none>"
488498 if (" done" %in% stat ) {
489- res <- loadResult(reg = reg , id = jobid )
499+ result <- loadResult(reg = reg , id = jobid )
500+ if (inherits(result , " FutureResult" )) {
501+ prototype_fields <- c(prototype_fields , " stdout" )
502+ result $ stdout <- getLog(id = jobid , reg = reg )
503+ if (inherits(result $ condition , " error" )) {
504+ cleanup <- FALSE
505+ }
506+ }
490507 } else if (" error" %in% stat ) {
491508 cleanup <- FALSE
492509 msg <- sprintf(" BatchtoolsError in %s ('%s'): %s" ,
@@ -512,19 +529,23 @@ await.BatchtoolsFuture <- function(future, cleanup = TRUE,
512529 msg <- sprintf(" BatchtoolsDeleted: Cannot retrieve value. Future ('%s') deleted: %s" , label , reg $ file.dir ) # nolint
513530 stop(BatchtoolsFutureError(msg , future = future ))
514531 }
515- if (debug ) { mstr(res ) }
532+ if (debug ) { mstr(result ) }
516533 } else {
517534 cleanup <- FALSE
518535 msg <- sprintf(" AsyncNotReadyError: Polled for results for %s seconds every %g seconds, but asynchronous evaluation for future ('%s') is still running: %s" , timeout , delta , label , reg $ file.dir ) # nolint
519536 stop(BatchtoolsFutureError(msg , future = future ))
520537 }
521538
539+ if (length(prototype_fields ) > 0 ) {
540+ result $ PROTOTYPE_WARNING <- sprintf(" WARNING: The fields %s should be considered internal and experimental for now, that is, until the Future API for these additional features has been settled. For more information, please see https://github.com/HenrikBengtsson/future/issues/172" , hpaste(sQuote(prototype_fields ), max_head = Inf , collapse = " , " , last_collapse = " and " ))
541+ }
542+
522543 # # Cleanup?
523544 if (cleanup ) {
524545 delete(future , delta = 0.5 * delta , ... )
525546 }
526547
527- res
548+ result
528549} # await()
529550
530551
@@ -600,13 +621,27 @@ delete.BatchtoolsFuture <- function(future,
600621 }
601622 }
602623
624+ # # FIXME: Make sure to collect the results before deleting
625+ # # the internal batchtools registry
626+ result <- future $ result
627+ if (is.null(result )) {
628+ value(future , signal = FALSE )
629+ result <- future $ result
630+ }
631+ stop_if_not(inherits(result , " FutureResult" ))
603632
604633 # # To simplify post mortem troubleshooting in non-interactive sessions,
605634 # # should the batchtools registry files be removed or not?
606635 mdebug(" delete(): Option 'future.delete = %s" ,
607636 sQuote(getOption(" future.delete" , " <NULL>" )))
608637 if (! getOption(" future.delete" , interactive())) {
609638 status <- status(future )
639+ res <- future $ result
640+ if (inherits(res , " FutureResult" )) {
641+ if (inherits(res $ condition , " error" )) status <- " error"
642+ }
643+ mdebug(" delete(): status(<future>) = %s" ,
644+ paste(sQuote(status ), collapse = " , " ))
610645 if (any(c(" error" , " expired" ) %in% status )) {
611646 msg <- sprintf(" Will not remove batchtools registry, because the status of the batchtools was %s and option 'future.delete' is FALSE or running in an interactive session: %s" , paste(sQuote(status ), collapse = " , " ), sQuote(path )) # nolint
612647 mdebug(" delete(): %s" , msg )
0 commit comments