4242# ' @importFrom batchtools submitJobs
4343# ' @keywords internal
4444BatchtoolsFuture <- function (expr = NULL , envir = parent.frame(),
45- substitute = TRUE , globals = TRUE ,
46- label = " batchtools" , cluster.functions = NULL ,
45+ substitute = TRUE ,
46+ globals = TRUE , packages = NULL ,
47+ label = NULL , cluster.functions = NULL ,
4748 resources = list (), workers = NULL ,
4849 finalize = getOption(" future.finalize" , TRUE ),
4950 ... ) {
@@ -77,7 +78,7 @@ BatchtoolsFuture <- function(expr = NULL, envir = parent.frame(),
7778 workers = workers , label = label , ... )
7879
7980 future $ globals <- gp $ globals
80- future $ packages <- gp $ packages
81+ future $ packages <- unique(c( packages , gp $ packages ))
8182
8283 # # Create batchtools registry
8384 reg <- temp_registry(label = future $ label )
@@ -131,6 +132,8 @@ print.BatchtoolsFuture <- function(x, ...) {
131132 printf(" batchtools Registry:\n " )
132133 print(reg )
133134 }
135+
136+ invisible (x )
134137}
135138
136139
@@ -237,8 +240,6 @@ loggedOutput.BatchtoolsFuture <- function(future, ...) {
237240 stop(BatchtoolsFutureError(msg , future = future ))
238241 }
239242
240- if (! " error" %in% stat ) return (NULL )
241-
242243 config <- future $ config
243244 reg <- config $ reg
244245 jobid <- config $ jobid
@@ -305,7 +306,7 @@ value.BatchtoolsFuture <- function(future, signal = TRUE,
305306run <- function (... ) UseMethod(" run" )
306307
307308# ' @importFrom future getExpression
308- # ' @importFrom batchtools batchExport batchMap saveRegistry
309+ # ' @importFrom batchtools batchExport batchMap saveRegistry setJobNames
309310run.BatchtoolsFuture <- function (future , ... ) {
310311 if (future $ state != " created" ) {
311312 label <- future $ label
@@ -372,6 +373,12 @@ run.BatchtoolsFuture <- function(future, ...) {
372373 jobid <- batchMap(fun = geval , list (expr ),
373374 more.args = list (substitute = TRUE ), reg = reg )
374375
376+ # # 1b. Set job name, if specified
377+ label <- future $ label
378+ if (! is.null(label )) {
379+ setJobNames(ids = jobid , names = label , reg = reg )
380+ }
381+
375382 # # 2. Update
376383 future $ config $ jobid <- jobid
377384 mdebug(" Created %s future #%d" , class(future )[1 ], jobid $ job.id )
@@ -420,6 +427,8 @@ await <- function(...) UseMethod("await")
420427# ' success, otherwise not.
421428# ' @param timeout Total time (in seconds) waiting before generating an error.
422429# ' @param delta The number of seconds to wait between each poll.
430+ # ' @param alpha A factor to scale up the waiting time in each iteration such
431+ # ' that the waiting time in the k:th iteration is \code{alpha ^ k * delta}.
423432# ' @param \ldots Not used.
424433# '
425434# ' @return The value of the evaluated expression.
@@ -433,16 +442,19 @@ await <- function(...) UseMethod("await")
433442# '
434443# ' @export
435444# ' @importFrom batchtools getErrorMessages loadResult waitForJobs
445+ # ' @importFrom utils tail
436446# ' @keywords internal
437447await.BatchtoolsFuture <- function (future , cleanup = TRUE ,
438448 timeout = getOption(" future.wait.timeout" ,
439449 30 * 24 * 60 * 60 ),
440450 delta = getOption(" future.wait.interval" ,
441451 1.0 ),
452+ alpha = getOption(" future.wait.alpha" , 1.01 ),
442453 ... ) {
443454 mdebug <- import_future(" mdebug" )
444455 stopifnot(is.finite(timeout ), timeout > = 0 )
445-
456+ stopifnot(is.finite(alpha ), alpha > 0 )
457+
446458 debug <- getOption(" future.debug" , FALSE )
447459
448460 expr <- future $ expr
@@ -456,7 +468,10 @@ await.BatchtoolsFuture <- function(future, cleanup = TRUE,
456468 oopts <- options(batchtools.verbose = debug )
457469 on.exit(options(oopts ))
458470
459- res <- waitForJobs(ids = jobid , timeout = timeout , sleep = delta ,
471+ # # Sleep function - increases geometrically as a function of iterations
472+ sleep_fcn <- function (i ) delta * alpha ^ (i - 1 )
473+
474+ res <- waitForJobs(ids = jobid , timeout = timeout , sleep = sleep_fcn ,
460475 stop.on.error = FALSE , reg = reg )
461476 mdebug(" - batchtools::waitForJobs(): %s" , res )
462477 stat <- status(future )
@@ -480,10 +495,19 @@ await.BatchtoolsFuture <- function(future, cleanup = TRUE,
480495 output = loggedOutput(future )))
481496 } else if (" expired" %in% stat ) {
482497 cleanup <- FALSE
483- msg <- sprintf(" BatchtoolsExpiration: Future ('%s') expired: %s" ,
484- label , reg $ file.dir )
485- stop(BatchtoolsFutureError(msg , future = future ,
486- output = loggedOutput(future )))
498+ msg <- sprintf(" BatchtoolsExpiration: Future ('%s') expired (registry path %s)." , label , reg $ file.dir )
499+ output <- loggedOutput(future )
500+ hint <- unlist(strsplit(output , split = " \n " , fixed = TRUE ))
501+ hint <- hint [nzchar(hint )]
502+ hint <- tail(hint , n = 6L )
503+ if (length(hint ) > 0 ) {
504+ hint <- paste(hint , collapse = " \n " )
505+ msg <- sprintf(" %s. The last few lines of the logged output:\n %s" ,
506+ msg , hint )
507+ } else {
508+ msg <- sprintf(" %s. No logged output exist." , msg )
509+ }
510+ stop(BatchtoolsFutureError(msg , future = future , output = output ))
487511 } else if (is_na(stat )) {
488512 msg <- sprintf(" BatchtoolsDeleted: Cannot retrieve value. Future ('%s') deleted: %s" , label , reg $ file.dir ) # nolint
489513 stop(BatchtoolsFutureError(msg , future = future ))
0 commit comments