@@ -48,10 +48,10 @@ print.BatchtoolsSlurmFutureBackend <- function(x, ...) {
4848# '
4949# ' 1. Slurm 21.08.4, Rocky Linux 8, NFS global filesystem (August 2025)
5050# ' 2. Slurm 22.05.11, Rocky Linux 8, NFS global filesystem (August 2025)*
51- # ' 3. Slurm 23.02.6, Ubuntu 24.04 LTS, NFS global filesystem (August 2025)*
51+ # ' 3. Slurm 23.02.6, Ubuntu 24.04 LTS, NFS global filesystem (August 2025)
5252# ' 4. Slurm 24.11.3, AlmaLinux 9, Lustre global filesystem (September 2025)*
5353# '
54- # ' (*) Verified with **future.batchtools** 1 .20.0, which used
54+ # ' (*) Verified with **future.batchtools** 0 .20.0, which used
5555# ' [batchtools::makeClusterFunctionsSlurm()], which the new
5656# ' [makeClusterFunctionsSlurm2()] enhances.
5757# '
@@ -154,139 +154,3 @@ slurm_version <- local({
154154 version
155155 }
156156})
157-
158-
159- # Patch Slurm cluster functions listJobsQueued() and listJobsRunning()
160- # to use `sacct` instead of `squeue`
161- # ' @importFrom batchtools assertRegistry runOSCommand
162- # ' @importFrom utils tail
163- patchClusterFunctionsSlurm <- function (cf ) {
164- OSError <- import_from(" OSError" , package = " batchtools" )
165- stopifnot(inherits(cf , " ClusterFunctions" ))
166-
167- env <- environment(cf [[" listJobsQueued" ]])
168- array.jobs <- env [[" array.jobs" ]]
169- getClusters <- env [[" getClusters" ]]
170- nodename <- env [[" nodename" ]]
171-
172- listJobs <- function (reg , args ) {
173- assertRegistry(reg , writeable = FALSE )
174- args <- c(args , " --user=$USER" , " --noheader" , " --parsable2" , " --allocations" , " --format=JobID" )
175- clusters <- getClusters(reg )
176- if (length(clusters ) > 0 ) {
177- args <- c(args , sprintf(" --clusters=%s" , clusters ))
178- }
179- res <- runOSCommand(" sacct" , args , nodename = nodename )
180- if (res $ exit.code > 0L ) {
181- OSError(" Listing of jobs failed" , res )
182- }
183- if (length(clusters ) > 0 ) {
184- res <- tail(res $ output , - 1L )
185- } else {
186- res <- res $ output
187- }
188- res
189- } # # listJobs()
190-
191- cf $ listJobsQueued <- function (reg ) {
192- # # List PENDING (PD) and REQUEUED (RQ) jobs
193- listJobs(reg , " --state=PD,RQ" )
194- }
195-
196- cf $ listJobsRunning <- function (reg ) {
197- # # List RUNNING (R), SUSPENDED (S), RESIZING (RS) jobs
198- listJobs(reg , " --state=R,S,RS" )
199- }
200-
201- cf
202- } # # patchClusterFunctionsSlurm()
203-
204-
205-
206- # Patch Slurm cluster functions listJobsQueued() and listJobsRunning()
207- # to use `sacct` instead of `squeue`
208- # ' @importFrom batchtools assertRegistry runOSCommand
209- # ' @importFrom utils tail
210- patchClusterFunctionsSlurm2 <- function (cf ) {
211- OSError <- import_from(" OSError" , package = " batchtools" )
212- stopifnot(inherits(cf , " ClusterFunctions" ))
213-
214- env <- environment(cf [[" listJobsQueued" ]])
215- array.jobs <- env [[" array.jobs" ]]
216- getClusters <- env [[" getClusters" ]]
217- nodename <- env [[" nodename" ]]
218- org_listJobsQueued <- env [[" listJobsQueued" ]]
219-
220- # # Allow for a 15-minute offset in time between host and Slurm's sacct server
221- isJobQueued <- function (reg , batch_id , since = NULL , offset = 15 * 60 ) {
222- stopifnot(length(batch_id ) == 1L , ! is.na(batch_id ), nzchar(batch_id ))
223- stopifnot(is.null(since ) || inherits(since , " POSIXct" ))
224-
225- args <- c(" --user=$USER" , " --noheader" , " --parsable2" , " --allocations" , " --format=State" , sprintf(" --jobs=%s" , batch_id ), sprintf(" --starttime=%s" , format(since - offset , format = " %FT%T" )))
226- clusters <- getClusters(reg )
227- if (length(clusters ) > 0 ) {
228- args <- c(args , sprintf(" --clusters=%s" , clusters ))
229- }
230- res <- runOSCommand(" sacct" , args , nodename = nodename )
231- if (res $ exit.code > 0L ) {
232- OSError(" Failed to check if job is pending" , res )
233- }
234- if (length(clusters ) > 0 ) {
235- res <- tail(res $ output , - 1L )
236- } else {
237- res <- res $ output
238- }
239-
240- if (length(res ) == 0 ) return (FALSE )
241-
242- res %in% c(" PENDING" , " REQUEUED" )
243- } # # isJobQueued()
244-
245- cf $ listJobsQueued <- function (reg ) {
246- batch_id <- getOption(" future.batchtools.batch_id" , NULL )
247-
248- # # Queued jobs according to 'squeue'
249- jobs <- org_listJobsQueued(reg )
250- if (is.null(batch_id )) return (jobs )
251-
252- # # Is the job queued?
253- if (length(jobs ) > 0 ) {
254- jobs <- intersect(jobs , as.character(batch_id ))
255- if (length(jobs ) > 0 ) return (jobs )
256- }
257-
258- # # Ask 'sacct' it if is PENDING or REQUEUED
259- submitted_on <- getOption(" future.batchtools.submitted_on" , NULL )
260- if (isJobQueued(reg , batch_id , since = submitted_on )) {
261- jobs <- as.character(batch_id )
262- }
263-
264- jobs
265- }
266-
267- cf
268- } # # patchClusterFunctionsSlurm2()
269-
270-
271-
272-
273- # ' ClusterFunctions for Slurm Systems (robustified)
274- # '
275- # ' This functions enhances [batchtools::makeClusterFunctionsSlurm()] by
276- # ' patching the `listJobsQueued()` cluster function such that it falls
277- # ' back to querying Slurm's account database (`sacct`), if the future was
278- # ' _not_ found in the Slurm job queue (`squeue`), which might be the case
279- # ' when Slurm provisions a job that was just submitted to the scheduler.
280- # '
281- # ' @inheritParams batchtools::makeClusterFunctionsSlurm
282- # '
283- # ' @return
284- # ' A [batchtools::ClusterFunctions] object.
285- # '
286- # ' @importFrom batchtools makeClusterFunctionsSlurm
287- # ' @export
288- makeClusterFunctionsSlurm2 <- function (template = " slurm" , array.jobs = TRUE , nodename = " localhost" , scheduler.latency = 1 , fs.latency = 65 ) {
289- cf <- makeClusterFunctionsSlurm(template = template , array.jobs = array.jobs , nodename = nodename , scheduler.latency = scheduler.latency , fs.latency = fs.latency )
290- cf <- patchClusterFunctionsSlurm2(cf )
291- cf
292- }
0 commit comments