@@ -55,7 +55,8 @@ const LSF_JOB_NAME_MAX_LENGTH: usize = 4094;
5555
5656#[ derive( Debug ) ]
5757struct LsfApptainerTaskRequest {
58- config : Arc < Config > ,
58+ engine_config : Arc < Config > ,
59+ backend_config : Arc < LsfApptainerBackendConfig > ,
5960 name : String ,
6061 spawn_request : TaskSpawnRequest ,
6162 /// The requested container for the task.
@@ -238,9 +239,16 @@ impl TaskManagerRequest for LsfApptainerTaskRequest {
238239 let lsf_stdout_path = attempt_dir. join ( "lsf.stdout" ) ;
239240 let lsf_stderr_path = attempt_dir. join ( "lsf.stderr" ) ;
240241
241- // TODO ACF 2025-09-11: configurable LSF queue, including handling for the
242- // `short_task` hint
243- let mut bsub_child = Command :: new ( "bsub" )
242+ let mut bsub_command = Command :: new ( "bsub" ) ;
243+
244+ // If an LSF queue has been configured, specify it. Otherwise, the job will end
245+ // up on the cluster's default queue.
246+ if let Some ( queue) = & self . backend_config . queue {
247+ bsub_command. arg ( "-q" ) ;
248+ bsub_command. arg ( queue) ;
249+ }
250+
251+ bsub_command
244252 // Pipe stdout and stderr so we can trace them. This should just be the LSF output like
245253 // `<<Waiting for dispatch ...>>`.
246254 //
@@ -279,8 +287,9 @@ impl TaskManagerRequest for LsfApptainerTaskRequest {
279287 "rusage[mem={memory_kb}KB/job]" ,
280288 memory_kb = self . memory / 1024
281289 ) )
282- . arg ( apptainer_command_path)
283- . spawn ( ) ?;
290+ . arg ( apptainer_command_path) ;
291+
292+ let mut bsub_child = bsub_command. spawn ( ) ?;
284293
285294 // Take the stdio pipes from the child process and consume them for tracing
286295 // purposes.
@@ -345,14 +354,16 @@ impl TaskManagerRequest for LsfApptainerTaskRequest {
345354
346355#[ derive( Debug ) ]
347356pub struct LsfApptainerBackend {
348- config : Arc < Config > ,
357+ engine_config : Arc < Config > ,
358+ backend_config : Arc < LsfApptainerBackendConfig > ,
349359 manager : TaskManager < LsfApptainerTaskRequest > ,
350360}
351361
352362impl LsfApptainerBackend {
353- pub fn new ( config : Arc < Config > ) -> Self {
363+ pub fn new ( engine_config : Arc < Config > , backend_config : Arc < LsfApptainerBackendConfig > ) -> Self {
354364 Self {
355- config,
365+ engine_config,
366+ backend_config,
356367 // TODO ACF 2025-09-11: the `MAX` values here mean that in addition to not limiting the
357368 // overall number of CPU and memory used, we don't limit per-task consumption. There is
358369 // potentially a path to pulling queue limits from LSF for these, but for now we just
@@ -375,7 +386,8 @@ impl TaskExecutionBackend for LsfApptainerBackend {
375386 ) -> anyhow:: Result < super :: TaskExecutionConstraints > {
376387 Ok ( super :: TaskExecutionConstraints {
377388 container : Some (
378- v1:: container ( requirements, self . config . task . container . as_deref ( ) ) . into_owned ( ) ,
389+ v1:: container ( requirements, self . engine_config . task . container . as_deref ( ) )
390+ . into_owned ( ) ,
379391 ) ,
380392 // TODO ACF 2025-09-11: populate more meaningful values for these based on the given LSF
381393 // queue. Unfortunately, it's not straightforward to ask "what's the most CPUs I can ask
@@ -409,7 +421,7 @@ impl TaskExecutionBackend for LsfApptainerBackend {
409421 let hints = request. hints ( ) ;
410422
411423 let container =
412- v1:: container ( requirements, self . config . task . container . as_deref ( ) ) . into_owned ( ) ;
424+ v1:: container ( requirements, self . engine_config . task . container . as_deref ( ) ) . into_owned ( ) ;
413425 let cpu = v1:: cpu ( requirements) ;
414426 let memory = v1:: memory ( requirements) ? as u64 ;
415427 // TODO ACF 2025-09-11: I don't _think_ LSF offers a hard/soft CPU limit
@@ -423,9 +435,9 @@ impl TaskExecutionBackend for LsfApptainerBackend {
423435 let name = request. id ( ) [ 0 ..LSF_JOB_NAME_MAX_LENGTH ] . to_string ( ) ;
424436 self . manager . send (
425437 LsfApptainerTaskRequest {
426- config : self . config . clone ( ) ,
438+ engine_config : self . engine_config . clone ( ) ,
439+ backend_config : self . backend_config . clone ( ) ,
427440 spawn_request : request,
428- // backend: self.inner.clone(),
429441 name,
430442 container,
431443 cpu,
@@ -455,3 +467,18 @@ impl TaskExecutionBackend for LsfApptainerBackend {
455467 None
456468 }
457469}
470+
471+ #[ derive( Debug , Clone , serde:: Deserialize , serde:: Serialize ) ]
472+ pub struct LsfApptainerBackendConfig {
473+ // TODO ACF 2025-09-12: add queue option for short tasks
474+ queue : Option < String > ,
475+ }
476+
477+ impl LsfApptainerBackendConfig {
478+ pub fn validate ( & self ) -> Result < ( ) , anyhow:: Error > {
479+ // TODO ACF 2025-09-12: what meaningful work to be done here? Maybe ensure the
480+ // queue exists, interrogate the queue for limits and match them up
481+ // against prospective future config options here?
482+ Ok ( ( ) )
483+ }
484+ }
0 commit comments