1616import sys
1717
1818import flux
19- from flux .util import UtilConfig , parse_fsd
19+ from flux .util import AltField , FilterActionSetUpdate , UtilConfig , parse_fsd
2020
2121
2222def print_enable_status (name , status ):
@@ -110,8 +110,14 @@ class FluxQueueConfig(UtilConfig):
110110 "default" : {
111111 "description" : "Default flux-queue list format string" ,
112112 "format" : (
113- "?:{queuem:<8.8} {defaults.timelimit!F:>11} {limits.timelimit!F:>10} {limits.range.nnodes:>10} "
114- "{limits.range.ncores:>10} {limits.range.ngpus:>10}"
113+ "?:{queuem:<8.8} "
114+ "{color_enabled}{enabled:>2}{color_off} "
115+ "{color_started}{started:>2}{color_off} "
116+ "{defaults.timelimit!F:>8} "
117+ "{limits.timelimit!F:>8} "
118+ "{limits.range.nnodes:>10} "
119+ "{limits.range.ncores:>10} "
120+ "{limits.range.ngpus:>10}"
115121 ),
116122 },
117123 }
@@ -234,11 +240,15 @@ def timelimit(self):
234240
235241
236242class QueueInfo :
237- def __init__ (self , name , config ):
243+ def __init__ (self , name , config , enabled , started ):
238244 self .name = name
239245 self .config = config
240246 self .limits = QueueLimitsInfo (name , config )
241247 self .defaults = QueueDefaultsInfo (name , config )
248+ self .scheduling = "started" if started else "stopped"
249+ self .submission = "enabled" if enabled else "disabled"
250+ self ._enabled = enabled
251+ self ._started = started
242252
243253 def __getattr__ (self , attr ):
244254 try :
@@ -259,13 +269,53 @@ def queuem(self):
259269 q = self .queue + ("*" if defaultq and self .queue == defaultq else "" )
260270 return q
261271
272+ @property
273+ def color_enabled (self ):
274+ return "\033 [01;32m" if self ._enabled else "\033 [01;31m"
275+
276+ @property
277+ def color_off (self ):
278+ return "\033 [0;0m"
279+
280+ @property
281+ def enabled (self ):
282+ return AltField ("✔" , "y" ) if self ._enabled else AltField ("✗" , "n" )
283+
284+ @property
285+ def color_started (self ):
286+ return "\033 [01;32m" if self ._started else "\033 [01;31m"
287+
288+ @property
289+ def started (self ):
290+ return AltField ("✔" , "y" ) if self ._started else AltField ("✗" , "n" )
291+
292+
293+ def fetch_all_queue_status (handle , queues = None ):
294+ if handle is None :
295+ # Return fake payload if handle is not open (e.g. during testing)
296+ return {"enable" : True , "start" : True }
297+ topic = "job-manager.queue-status"
298+ if queues is None :
299+ return handle .rpc (topic , {}).get ()
300+ rpcs = {x : handle .rpc (topic , {"name" : x }) for x in queues }
301+ return {x : rpcs [x ].get () for x in rpcs }
302+
262303
263304def list (args ):
264305 headings = {
265306 "queue" : "QUEUE" ,
266307 "queuem" : "QUEUE" ,
267- "defaults.timelimit" : "DEFAULTTIME" ,
268- "limits.timelimit" : "TIMELIMIT" ,
308+ "submission" : "SUBMIT" ,
309+ "scheduling" : "SCHED" ,
310+ "enabled" : "EN" ,
311+ "started" : "ST" ,
312+ "enabled.ascii" : "EN" ,
313+ "started.ascii" : "ST" ,
314+ "color_enabled" : "" ,
315+ "color_started" : "" ,
316+ "color_off" : "" ,
317+ "defaults.timelimit" : "TDEFAULT" ,
318+ "limits.timelimit" : "TLIMIT" ,
269319 "limits.range.nnodes" : "NNODES" ,
270320 "limits.range.ncores" : "NCORES" ,
271321 "limits.range.ngpus" : "NGPUS" ,
@@ -277,6 +327,7 @@ def list(args):
277327 "limits.max.ngpus" : "MAXGPUS" ,
278328 }
279329 config = None
330+ handle = None
280331
281332 if args .from_stdin :
282333 config = json .loads (sys .stdin .read ())
@@ -291,13 +342,29 @@ def list(args):
291342 fmt = FluxQueueConfig ("list" ).load ().get_format_string (args .format )
292343 formatter = flux .util .OutputFormat (fmt , headings = headings )
293344
345+ # Build queue_config from args.queue, or config["queue"] if --queue
346+ # was unused:
347+ queue_config = {}
348+ if args .queue :
349+ for queue in args .queue :
350+ try :
351+ queue_config [queue ] = config ["queues" ][queue ]
352+ except KeyError :
353+ raise ValueError (f"No such queue: { queue } " )
354+ elif config and "queues" in config :
355+ queue_config = config ["queues" ]
356+
294357 queues = []
295358 if config and "queues" in config :
296- for key , value in config ["queues" ].items ():
297- queues .append (QueueInfo (key , config ))
359+ status = fetch_all_queue_status (handle , queue_config .keys ())
360+ for key , value in queue_config .items ():
361+ queues .append (
362+ QueueInfo (key , config , status [key ]["enable" ], status [key ]["start" ])
363+ )
298364 else :
299365 # single anonymous queue
300- queues .append (QueueInfo (None , config ))
366+ status = fetch_all_queue_status (handle )
367+ queues .append (QueueInfo (None , config , status ["enable" ], status ["start" ]))
301368
302369 formatter .print_items (queues , no_header = args .no_header )
303370
@@ -425,6 +492,10 @@ def __call__(self, parser, namespace, values, option_string=None):
425492
426493@flux .util .CLIMain (LOGGER )
427494def main ():
495+ sys .stdout = open (
496+ sys .stdout .fileno (), "w" , encoding = "utf8" , errors = "surrogateescape"
497+ )
498+
428499 parser = argparse .ArgumentParser (prog = "flux-queue" )
429500 subparsers = parser .add_subparsers (
430501 title = "subcommands" , description = "" , dest = "subcommand"
@@ -452,6 +523,14 @@ def main():
452523 list_parser = subparsers .add_parser (
453524 "list" , formatter_class = flux .util .help_formatter ()
454525 )
526+ list_parser .add_argument (
527+ "-q" ,
528+ "--queue" ,
529+ action = FilterActionSetUpdate ,
530+ default = set (),
531+ metavar = "QUEUE,..." ,
532+ help = "Include only specified queues in output" ,
533+ )
455534 list_parser .add_argument (
456535 "-o" ,
457536 "--format" ,
0 commit comments