2828# ' process one future at the time (`workers = 1L`), whereas HPC backends,
2929# ' where futures are resolved via separate jobs on a scheduler, can have
3030# ' multiple workers. In the latter, the default is `workers = NULL`, which
31- # ' will resolve to `getOption("future.batchtools.workers")`. If neither
32- # ' are specified, then the default is `100`.
31+ # ' will resolve to
32+ # ' \code{getOption("\link{future.batchtools.workers}")}.
33+ # ' If neither are specified, then the default is `100`.
3334# '
3435# ' @param finalize If TRUE, any underlying registries are
3536# ' deleted when this object is garbage collected, otherwise not.
@@ -230,6 +231,14 @@ status <- function(future, ...) {
230231 })
231232 } # # get_status()
232233
234+ # # Known to be in its final state?
235+ if (getOption(" future.batchtools.status.cache" , TRUE )) {
236+ status <- future $ .status
237+ if (identical(status , c(" defined" , " finished" , " started" , " submitted" ))) {
238+ return (status )
239+ }
240+ }
241+
233242 config <- future $ config
234243 reg <- config $ reg
235244 if (! inherits(reg , " Registry" )) return (NA_character_ )
@@ -251,6 +260,9 @@ status <- function(future, ...) {
251260 if (result_has_errors(result )) status <- unique(c(" error" , status ))
252261 }
253262
263+ # # Cache result
264+ future $ .status <- status
265+
254266 if (debug ) mdebug(" - status: " , paste(sQuote(status ), collapse = " , " ))
255267
256268 status
@@ -335,48 +347,82 @@ loggedOutput.BatchtoolsFuture <- function(future, ...) {
335347# ' @export
336348# ' @keywords internal
337349resolved.BatchtoolsFuture <- function (x , ... ) {
338- # # Has internal future state already been switched to be resolved
339- resolved <- NextMethod()
340- if (resolved ) return (TRUE )
350+ signalEarly <- import_future(" signalEarly" )
351+
352+ # # Is value already collected?
353+ if (! is.null(x $ result )) {
354+ # # Signal conditions early?
355+ signalEarly(x , ... )
356+ return (TRUE )
357+ }
358+
359+ # # Assert that the process that created the future is
360+ # # also the one that evaluates/resolves/queries it.
361+ assertOwner <- import_future(" assertOwner" )
362+ assertOwner(x )
341363
342364 # # If not, checks the batchtools registry status
343365 resolved <- finished(x )
344366 if (is.na(resolved )) return (FALSE )
345-
367+
368+ # # Signal conditions early? (happens only iff requested)
369+ if (resolved ) signalEarly(x , ... )
370+
346371 resolved
347372}
348373
349374# ' @importFrom future result
350375# ' @export
351376# ' @keywords internal
352377result.BatchtoolsFuture <- function (future , cleanup = TRUE , ... ) {
378+
379+ debug <- getOption(" future.debug" , FALSE )
380+ if (debug ) {
381+ mdebug(" result() for BatchtoolsFuture ..." )
382+ on.exit(mdebug(" result() for BatchtoolsFuture ... done" ), add = TRUE )
383+ }
384+
353385 # # Has the value already been collected?
354386 result <- future $ result
355- if (inherits(result , " FutureResult" )) return (result )
387+ if (inherits(result , " FutureResult" )) {
388+ if (debug ) mdebug(" - FutureResult already collected" )
389+ return (result )
390+ }
356391
357392 # # Has the value already been collected? - take two
358393 if (future $ state %in% c(" finished" , " failed" , " interrupted" )) {
394+ if (debug ) mdebug(" - FutureResult already collected - take 2" )
359395 return (NextMethod())
360396 }
361397
362398 if (future $ state == " created" ) {
399+ if (debug ) mdebug(" - starting future ..." )
363400 future <- run(future )
401+ if (debug ) mdebug(" - starting future ... done" )
364402 }
365403
404+ if (debug ) mdebug(" - getting batchtools status" )
366405 stat <- status(future )
367406 if (is_na(stat )) {
368407 label <- future $ label
369408 if (is.null(label )) label <- " <none>"
370409 stopf(" The result no longer exists (or never existed) for Future ('%s') of class %s" , label , paste(sQuote(class(future )), collapse = " , " )) # nolint
371410 }
372411
412+ if (debug ) mdebug(" - waiting for batchtools job to finish ..." )
373413 result <- await(future , cleanup = FALSE )
414+ if (debug ) mdebug(" - waiting for batchtools job to finish ... done" )
374415 stop_if_not(inherits(result , " FutureResult" ))
375416 future $ result <- result
376417 future $ state <- " finished"
377418
378- if (cleanup ) delete(future )
419+ if (cleanup ) {
420+ if (debug ) mdebugf(" - delete %s ..." , class(future )[1 ])
421+ delete(future )
422+ if (debug ) mdebugf(" - delete %s ... done" , class(future )[1 ])
423+ }
379424
425+ if (debug ) mdebug(" - NextMethod()" )
380426 NextMethod()
381427}
382428
@@ -454,7 +500,9 @@ run.BatchtoolsFuture <- function(future, ...) {
454500 # # will have the same state of (loaded, attached) packages.
455501
456502 reg $ packages <- packages
457- saveRegistry(reg = reg )
503+ with_stealth_rng({
504+ saveRegistry(reg = reg )
505+ })
458506
459507 mdebugf(" Attaching %d packages (%s) ... DONE" ,
460508 length(packages ), hpaste(sQuote(packages )))
@@ -538,6 +586,11 @@ run.BatchtoolsFuture <- function(future, ...) {
538586 # # 6. Rerserve worker for future
539587 registerFuture(future )
540588
589+ # # 7. Trigger early signalling
590+ if (inherits(future , " BatchtoolsUniprocessFuture" )) {
591+ resolved(future )
592+ }
593+
541594 invisible (future )
542595} # # run()
543596
@@ -553,6 +606,7 @@ await <- function(future, cleanup = TRUE,
553606 stop_if_not(is.finite(alpha ), alpha > 0 )
554607
555608 debug <- getOption(" future.debug" , FALSE )
609+ if (debug ) mdebug(" future.batchtools:::await() ..." )
556610
557611 expr <- future $ expr
558612 config <- future $ config
@@ -571,10 +625,12 @@ await <- function(future, cleanup = TRUE,
571625
572626 res <- waitForJobs(ids = jobid , timeout = timeout , sleep = sleep_fcn ,
573627 stop.on.error = FALSE , reg = reg )
574- mdebugf(" - batchtools::waitForJobs(): %s" , res )
628+ if ( debug ) mdebugf(" - batchtools::waitForJobs(): %s" , res )
575629 stat <- status(future )
576- mdebugf(" - status(): %s" , paste(sQuote(stat ), collapse = " , " ))
577- mdebug(" batchtools::waitForJobs() ... done" )
630+ if (debug ) {
631+ mdebugf(" - status(): %s" , paste(sQuote(stat ), collapse = " , " ))
632+ mdebug(" batchtools::waitForJobs() ... done" )
633+ }
578634
579635 finished <- is_na(stat ) || any(c(" finished" , " error" , " expired" ) %in% stat )
580636
@@ -587,20 +643,23 @@ await <- function(future, cleanup = TRUE,
587643 label <- future $ label
588644 if (is.null(label )) label <- " <none>"
589645 if (" finished" %in% stat ) {
590- mdebug(" - batchtools::loadResult() ..." )
646+ if ( debug ) mdebug(" - batchtools::loadResult() ..." )
591647 result <- loadResult(reg = reg , id = jobid )
592- mdebug(" - batchtools::loadResult() ... done" )
648+ if (debug ) mdebug(" - batchtools::loadResult() ... done" )
649+
593650 if (inherits(result , " FutureResult" )) {
594651 prototype_fields <- c(prototype_fields , " batchtools_log" )
595- result [[" batchtools_log" ]] <- try({
596- mdebug(" - batchtools::getLog() ..." )
597- on.exit(mdebug(" - batchtools::getLog() ... done" ))
652+ result [[" batchtools_log" ]] <- try(local({
653+ if (debug ) {
654+ mdebug(" - batchtools::getLog() ..." )
655+ on.exit(mdebug(" - batchtools::getLog() ... done" ))
656+ }
598657 # # Since we're already collected the results, the log file
599658 # # should already exist, if it exists. Because of this,
600659 # # only poll for the log file for a second before giving up.
601660 reg $ cluster.functions $ fs.latency <- 1.0
602661 getLog(id = jobid , reg = reg )
603- }, silent = TRUE )
662+ }) , silent = TRUE )
604663 if (result_has_errors(result )) cleanup <- FALSE
605664 }
606665 } else if (" error" %in% stat ) {
@@ -644,6 +703,8 @@ await <- function(future, cleanup = TRUE,
644703 delete(future , delta = 0.5 * delta , ... )
645704 }
646705
706+ if (debug ) mdebug(" future.batchtools:::await() ... done" )
707+
647708 result
648709} # await()
649710
@@ -768,6 +829,10 @@ delete.BatchtoolsFuture <- function(future,
768829 with_stealth_rng({
769830 interval <- delta
770831 for (kk in seq_len(times )) {
832+ try(unlink(path , recursive = TRUE ), silent = FALSE )
833+ if (! file_test(" -d" , path )) break
834+ try(removeRegistry(wait = 0.0 , reg = reg ), silent = FALSE )
835+ if (! file_test(" -d" , path )) break
771836 try(clearRegistry(reg = reg ), silent = TRUE )
772837 try(removeRegistry(wait = 0.0 , reg = reg ), silent = FALSE )
773838 if (! file_test(" -d" , path )) break
0 commit comments