Skip to content

Commit ed95396

Browse files
Most types of batchtools future can now be interrupted
1 parent d19b658 commit ed95396

File tree

6 files changed

+120
-39
lines changed

6 files changed

+120
-39
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: future.batchtools
2-
Version: 0.12.2-9905
2+
Version: 0.12.2-9908
33
Depends:
44
R (>= 3.2.0),
55
parallelly,

NAMESPACE

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
S3method(add_finalizer,BatchtoolsFuture)
44
S3method(delete,BatchtoolsFuture)
5+
S3method(interruptFuture,BatchtoolsFutureBackend)
56
S3method(launchFuture,BatchtoolsFutureBackend)
67
S3method(loggedError,BatchtoolsFuture)
78
S3method(loggedOutput,BatchtoolsFuture)
@@ -66,6 +67,7 @@ importFrom(batchtools,findTemplateFile)
6667
importFrom(batchtools,getErrorMessages)
6768
importFrom(batchtools,getLog)
6869
importFrom(batchtools,getStatus)
70+
importFrom(batchtools,killJobs)
6971
importFrom(batchtools,loadResult)
7072
importFrom(batchtools,makeClusterFunctions)
7173
importFrom(batchtools,makeClusterFunctionsInteractive)
@@ -85,7 +87,9 @@ importFrom(batchtools,submitJobs)
8587
importFrom(batchtools,waitForJobs)
8688
importFrom(future,FutureBackend)
8789
importFrom(future,FutureError)
90+
importFrom(future,FutureInterruptError)
8891
importFrom(future,cancel)
92+
importFrom(future,interruptFuture)
8993
importFrom(future,launchFuture)
9094
importFrom(future,nbrOfFreeWorkers)
9195
importFrom(future,nbrOfWorkers)
@@ -98,6 +102,7 @@ importFrom(future,tweak)
98102
importFrom(parallelly,availableCores)
99103
importFrom(parallelly,availableWorkers)
100104
importFrom(parallelly,supportsMulticore)
105+
importFrom(tools,pskill)
101106
importFrom(utils,capture.output)
102107
importFrom(utils,file_test)
103108
importFrom(utils,sessionInfo)

NEWS.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@
55
* **future.batchtools** now implements the FutureBackend API
66
introduced in **future** 1.40.0.
77

8+
## New Features
9+
10+
* Most types of batchtools future can now be interrupted, including
11+
`batchtools_multicore` and all job-scheduler backends, e.g.
12+
`batchtools_sge` and `batchtools_slurm`.
13+
814

915
# Version 0.12.2 [2025-06-06]
1016

R/BatchtoolsFuture-class.R

Lines changed: 102 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,55 @@ launchFuture.BatchtoolsFutureBackend <- local({
328328
})
329329

330330

331+
#' @importFrom batchtools killJobs
332+
#' @importFrom future interruptFuture
333+
#' @export
334+
interruptFuture.BatchtoolsFutureBackend <- function(backend, future, ...) {
335+
job.id <- NULL ## To please R CMD check
336+
337+
debug <- isTRUE(getOption("future.debug"))
338+
if (debug) {
339+
mdebugf_push("interruptFuture() for %s ...", class(backend)[1])
340+
mdebugf("Future state before: %s", sQuote(future[["state"]]))
341+
on.exit({
342+
mdebugf("Future state after: %s", sQuote(future[["state"]]))
343+
mdebug_pop()
344+
})
345+
}
346+
347+
config <- future[["config"]]
348+
reg <- config[["reg"]]
349+
350+
## Does the backend support terminating jobs?
351+
cluster.functions <- reg[["cluster.functions"]]
352+
if (is.null(cluster.functions$killJob)) {
353+
if (debug) mdebug("Cannot interrupt, because the registered cluster functions does not define a killJob() function")
354+
return(future)
355+
}
356+
357+
jobid <- config[["jobid"]]$job.id
358+
if (debug) mdebugf("Job ID: %s", jobid)
359+
360+
res <- killJobs(ids = jobid, reg = reg)
361+
if (debug) {
362+
mdebug("killJobs() result:")
363+
mprint(res)
364+
}
365+
366+
if (nrow(res) == 0L) {
367+
future[["state"]] <- "interrupted"
368+
} else {
369+
res <- subset(res, job.id == jobid)
370+
if (nrow(res) == 1L && isTRUE(res$killed)) {
371+
future[["state"]] <- "interrupted"
372+
}
373+
}
374+
375+
invisible(future)
376+
}
377+
378+
379+
331380
#' Prints a batchtools future
332381
#'
333382
#' @param x An BatchtoolsFuture object
@@ -556,17 +605,14 @@ result.BatchtoolsFuture <- function(future, cleanup = TRUE, ...) {
556605

557606
## Has the value already been collected?
558607
result <- future$result
559-
if (inherits(result, "FutureResult")) {
608+
if (!is.null(result)) {
560609
if (debug) mdebug("FutureResult already collected")
610+
if (inherits(result, "FutureError")) {
611+
stop(result)
612+
}
561613
return(result)
562614
}
563615

564-
## Has the value already been collected? - take two
565-
if (future$state %in% c("finished", "failed", "interrupted")) {
566-
if (debug) mdebug("FutureResult already collected - take 2")
567-
return(NextMethod())
568-
}
569-
570616
if (future$state == "created") {
571617
future <- local({
572618
if (debug) {
@@ -585,15 +631,24 @@ result.BatchtoolsFuture <- function(future, cleanup = TRUE, ...) {
585631
}
586632

587633
result <- local({
634+
res <- NULL
588635
if (debug) {
589636
mdebug_push("Waiting for batchtools job to finish ...")
590-
on.exit(mdebug_pop())
637+
on.exit({
638+
mdebugf("Result: <%s>", class(res)[1])
639+
mdebug_pop()
640+
})
591641
}
592-
await(future, cleanup = FALSE)
642+
res <- await(future, cleanup = FALSE)
643+
res
593644
})
594-
stop_if_not(inherits(result, "FutureResult"))
595-
future$result <- result
596-
future$state <- "finished"
645+
stop_if_not(inherits(result, c("FutureResult", "FutureError")))
646+
future[["result"]] <- result
647+
if (inherits(result, "FutureInterruptError")) {
648+
future[["state"]] <- "interrupted"
649+
} else {
650+
future[["state"]] <- "finished"
651+
}
597652

598653
if (cleanup) {
599654
local({
@@ -605,20 +660,22 @@ result.BatchtoolsFuture <- function(future, cleanup = TRUE, ...) {
605660
})
606661
}
607662

608-
if (debug) mdebug("NextMethod()")
609-
NextMethod()
610-
}
663+
if (inherits(result, "FutureError")) {
664+
stop(result)
665+
}
666+
667+
result
668+
} ## result()
611669

612670

671+
#' @importFrom future FutureInterruptError
613672
#' @importFrom batchtools loadResult waitForJobs
614673
#' @importFrom utils tail
615-
await <- function(future, cleanup = TRUE,
616-
timeout = getOption("future.wait.timeout", 30 * 24 * 60 * 60),
617-
delta = getOption("future.wait.interval", 1.0),
618-
alpha = getOption("future.wait.alpha", 1.01),
619-
...) {
620-
stop_if_not(is.finite(timeout), timeout >= 0)
621-
stop_if_not(is.finite(alpha), alpha > 0)
674+
await <- function(future, cleanup = TRUE, ...) {
675+
backend <- future[["backend"]]
676+
timeout <- backend[["future.wait.timeout"]]
677+
delta <- backend[["future.wait.interval"]]
678+
alpha <- backend[["future.wait.alpha"]]
622679

623680
debug <- isTRUE(getOption("future.debug"))
624681
if (debug) {
@@ -704,12 +761,21 @@ await <- function(future, cleanup = TRUE,
704761
msg <- sprintf("%s. No logged output exist.", msg)
705762
}
706763
stop(BatchtoolsFutureError(msg, future = future))
764+
} else if (future[["state"]] %in% c("canceled", "interrupted")) {
765+
label <- sQuoteLabel(future)
766+
msg <- sprintf("Future (%s) of class %s was %s", label, class(future)[1], future[["state"]])
767+
result <- FutureInterruptError(msg, future = future)
707768
} else if (is_na(stat)) {
708769
msg <- sprintf("BatchtoolsDeleted: Cannot retrieve value. Future ('%s') deleted: %s", label, reg$file.dir) #nolint
709770
stop(BatchtoolsFutureError(msg, future = future))
710771
}
711-
if (debug) { mstr(result) }
772+
if (debug) { mstr(result) }
773+
} else if (future[["state"]] %in% c("canceled", "interrupted")) {
774+
label <- sQuoteLabel(future)
775+
msg <- sprintf("Future (%s) of class %s was %s", label, class(future)[1], future[["state"]])
776+
result <- FutureInterruptError(msg, future = future)
712777
} else {
778+
label <- sQuoteLabel(future)
713779
cleanup <- FALSE
714780
msg <- sprintf("AsyncNotReadyError: Polled for results for %s seconds every %g seconds, but asynchronous evaluation for future ('%s') is still running: %s", timeout, delta, label, reg$file.dir) #nolint
715781
stop(BatchtoolsFutureError(msg, future = future))
@@ -737,9 +803,6 @@ delete <- function(...) UseMethod("delete")
737803
#' @param onFailure Action if failing to delete future.
738804
#' @param onMissing Action if future does not exist.
739805
#' @param times The number of tries before giving up.
740-
#' @param delta The delay interval (in seconds) between retries.
741-
#' @param alpha A multiplicative penalty increasing the delay
742-
#' for each failed try.
743806
#' @param \ldots Not used.
744807
#'
745808
#' @return (invisibly) TRUE if deleted and FALSE otherwise.
@@ -753,14 +816,20 @@ delete.BatchtoolsFuture <- function(future,
753816
onFailure = c("error", "warning", "ignore"),
754817
onMissing = c("ignore", "warning", "error"),
755818
times = 10L,
756-
delta = getOption("future.wait.interval", 1.0),
757-
alpha = getOption("future.wait.alpha", 1.01),
758819
...) {
759820
onRunning <- match.arg(onRunning)
760821
onMissing <- match.arg(onMissing)
761822
onFailure <- match.arg(onFailure)
762823

763824
debug <- isTRUE(getOption("future.debug"))
825+
if (debug) {
826+
mdebugf_push("delete() for %s ...", class(future)[1])
827+
on.exit(mdebugf_pop())
828+
}
829+
830+
backend <- future[["backend"]]
831+
delta <- backend[["future.wait.interval"]]
832+
alpha <- backend[["future.wait.alpha"]]
764833

765834
## Identify registry
766835
config <- future$config
@@ -786,7 +855,7 @@ delete.BatchtoolsFuture <- function(future,
786855
}
787856

788857

789-
## Is the future still not resolved? If so, then...
858+
## Is the future still not resolved? If so, then...
790859
if (!resolved(future)) {
791860
if (onRunning == "skip") return(invisible(TRUE))
792861
status <- status(future)
@@ -803,8 +872,10 @@ delete.BatchtoolsFuture <- function(future,
803872

804873
## Make sure to collect the results before deleting
805874
## the internal batchtools registry
806-
result <- result(future, cleanup = FALSE)
807-
stop_if_not(inherits(result, "FutureResult"))
875+
result <- future[["result"]]
876+
if (is.null(result)) {
877+
result <- result(future, cleanup = FALSE)
878+
}
808879

809880
## Free up worker
810881
unregisterFuture(future)

R/batchtools_multicore.R

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#'
2626
#' @importFrom batchtools makeClusterFunctionsMulticore
2727
#' @importFrom parallelly availableCores supportsMulticore
28+
#' @importFrom tools pskill
2829
#' @keywords internal
2930
#' @export
3031
BatchtoolsMulticoreFutureBackend <- function(workers = availableCores(constraints = "multicore"), ...) {
@@ -49,6 +50,11 @@ BatchtoolsMulticoreFutureBackend <- function(workers = availableCores(constraint
4950
on.exit(options(oopts))
5051

5152
cluster.functions <- makeClusterFunctionsMulticore(ncpus = workers)
53+
cluster.functions$killJob <- function(reg, batch.id) {
54+
pid <- as.integer(batch.id)
55+
if (is.na(pid) || pid <= 0) return(FALSE)
56+
pskill(pid)
57+
}
5258

5359
core <- BatchtoolsMultiprocessFutureBackend(
5460
workers = workers,

man/delete.BatchtoolsFuture.Rd

Lines changed: 0 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)