Skip to content

Commit aa44243

Browse files
batchtools_slurm(): Use 'sacct --jobs=batch_id' to find queued jobs that not yet show up with 'squeue'
1 parent 0e4e6da commit aa44243

File tree

5 files changed

+129
-2
lines changed

5 files changed

+129
-2
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: future.batchtools
2-
Version: 0.20.0-9004
2+
Version: 0.20.0-9005
33
Depends:
44
R (>= 3.2.0),
55
parallelly,

NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ export(loggedError)
6060
export(loggedOutput)
6161
export(makeClusterFunctionsBash)
6262
importFrom(batchtools,Worker)
63+
importFrom(batchtools,assertRegistry)
6364
importFrom(batchtools,batchExport)
6465
importFrom(batchtools,batchMap)
6566
importFrom(batchtools,cfBrewTemplate)
@@ -84,6 +85,7 @@ importFrom(batchtools,makeClusterFunctionsTORQUE)
8485
importFrom(batchtools,makeRegistry)
8586
importFrom(batchtools,makeSubmitJobResult)
8687
importFrom(batchtools,removeRegistry)
88+
importFrom(batchtools,runOSCommand)
8789
importFrom(batchtools,saveRegistry)
8890
importFrom(batchtools,setJobNames)
8991
importFrom(batchtools,submitJobs)

R/BatchtoolsFutureBackend-class.R

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,14 @@ status <- function(future, ...) {
535535

536536
jobid <- config$jobid
537537
if (is.na(jobid)) return("not submitted")
538+
539+
## Optionally filter by the scheduler's job ID, if it exists
540+
batch_id <- reg[["status"]][["batch.id"]]
541+
## Pass this to cluster functions listJobsQueued() and listJobsRunning()
542+
## via an R option, because we cannot pass as an argument.
543+
options(future.batchtools.batch_id = batch_id)
544+
on.exit(options(future.batchtools.batch_id = NULL), add = TRUE)
545+
538546
status <- get_status(reg = reg, ids = jobid)
539547
status <- (unlist(status) == 1L)
540548
status <- status[status]

R/BatchtoolsTemplateFutureBackend-class.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ BatchtoolsTemplateFutureBackend <- function(type, scheduler.latency = 1.0, fs.la
7171
lsf = makeClusterFunctionsLSF,
7272
openlava = makeClusterFunctionsOpenLava,
7373
sge = makeClusterFunctionsSGE,
74-
slurm = makeClusterFunctionsSlurm,
74+
slurm = makeClusterFunctionsSlurm2,
7575
torque = makeClusterFunctionsTORQUE,
7676
makeClusterFunctions
7777
)

R/batchtools_slurm.R

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,120 @@ slurm_version <- local({
151151
version
152152
}
153153
})
154+
155+
156+
# Patch Slurm cluster functions listJobsQueued() and listJobsRunning()
157+
# to use `sacct` instead of `squeue`
158+
#' @importFrom batchtools assertRegistry runOSCommand
159+
#' @importFrom utils tail
160+
patchClusterFunctionsSlurm <- function(cf) {
161+
OSError <- import_from("OSError", package = "batchtools")
162+
stopifnot(inherits(cf, "ClusterFunctions"))
163+
164+
env <- environment(cf[["listJobsQueued"]])
165+
array.jobs <- env[["array.jobs"]]
166+
getClusters <- env[["getClusters"]]
167+
nodename <- env[["nodename"]]
168+
169+
listJobs <- function(reg, args) {
170+
assertRegistry(reg, writeable = FALSE)
171+
args <- c(args, "--user=$USER", "--noheader", "--parsable2", "--allocations", "--format=JobID")
172+
clusters <- getClusters(reg)
173+
if (length(clusters) > 0) {
174+
args <- c(args, sprintf("--clusters=%s", clusters))
175+
}
176+
res <- runOSCommand("sacct", args, nodename = nodename)
177+
if (res$exit.code > 0L) {
178+
OSError("Listing of jobs failed", res)
179+
}
180+
if (length(clusters) > 0) {
181+
res <- tail(res$output, -1L)
182+
} else {
183+
res <- res$output
184+
}
185+
res
186+
} ## listJobs()
187+
188+
cf$listJobsQueued <- function(reg) {
189+
## List PENDING (PD) and REQUEUED (RQ) jobs
190+
listJobs(reg, "--state=PD,RQ")
191+
}
192+
193+
cf$listJobsRunning <- function(reg) {
194+
## List RUNNING (R), SUSPENDED (S), RESIZING (RS) jobs
195+
listJobs(reg, "--state=R,S,RS")
196+
}
197+
198+
cf
199+
} ## patchClusterFunctionsSlurm()
200+
201+
202+
203+
# Patch Slurm cluster functions listJobsQueued() and listJobsRunning()
204+
# to use `sacct` instead of `squeue`
205+
#' @importFrom batchtools assertRegistry runOSCommand
206+
#' @importFrom utils tail
207+
patchClusterFunctionsSlurm2 <- function(cf) {
208+
OSError <- import_from("OSError", package = "batchtools")
209+
stopifnot(inherits(cf, "ClusterFunctions"))
210+
211+
env <- environment(cf[["listJobsQueued"]])
212+
array.jobs <- env[["array.jobs"]]
213+
getClusters <- env[["getClusters"]]
214+
nodename <- env[["nodename"]]
215+
org_listJobsQueued <- env[["listJobsQueued"]]
216+
217+
isJobQueued <- function(reg, batch_id) {
218+
stopifnot(length(batch_id) == 1L, !is.na(batch_id), nzchar(batch_id))
219+
220+
## FIXME: Add also --starttime=<start time>, because 'sacct' only returns jobs ran today
221+
args <- c("--user=$USER", "--noheader", "--parsable2", "--allocations", "--format=State", sprintf("--jobs=%s", batch_id))
222+
clusters <- getClusters(reg)
223+
if (length(clusters) > 0) {
224+
args <- c(args, sprintf("--clusters=%s", clusters))
225+
}
226+
res <- runOSCommand("sacct", args, nodename = nodename)
227+
if (res$exit.code > 0L) {
228+
OSError("Failed to check if job is pending", res)
229+
}
230+
if (length(clusters) > 0) {
231+
res <- tail(res$output, -1L)
232+
} else {
233+
res <- res$output
234+
}
235+
236+
if (length(res) == 0) return(FALSE)
237+
238+
res %in% c("PENDING", "REQUEUED")
239+
} ## isJobQueued()
240+
241+
cf$listJobsQueued <- function(reg) {
242+
batch_id <- getOption("future.batchtools.batch_id", NULL)
243+
244+
## Queued jobs according to 'squeue'
245+
jobs <- org_listJobsQueued(reg)
246+
if (is.null(batch_id)) return(jobs)
247+
248+
## Is the job queued?
249+
if (length(jobs) > 0) {
250+
jobs <- intersect(jobs, as.character(batch_id))
251+
if (length(jobs) > 0) return(jobs)
252+
}
253+
254+
## Ask 'sacct' it if is PENDING or REQUEUED
255+
if (isJobQueued(reg, batch_id)) jobs <- as.character(batch_id)
256+
257+
jobs
258+
}
259+
260+
cf
261+
} ## patchClusterFunctionsSlurm2()
262+
263+
264+
#' @importFrom batchtools makeClusterFunctionsSlurm
265+
266+
makeClusterFunctionsSlurm2 <- function(template = "slurm", array.jobs = TRUE, nodename = "localhost", scheduler.latency = 1, fs.latency = 65, ...) {
267+
cf <- makeClusterFunctionsSlurm(template = template, array.jobs = array.jobs, nodename = nodename, scheduler.latency = scheduler.latency, fs.latency =fs.latency, ...)
268+
cf <- patchClusterFunctionsSlurm2(cf)
269+
cf
270+
}

0 commit comments

Comments
 (0)