@@ -45,18 +45,19 @@ static void free_wrapper (void **item)
4545}
4646
4747static struct job_stats * queue_stats_lookup (struct job_stats_ctx * statsctx ,
48- struct job * job )
48+ const char * name ,
49+ bool create_if_missing )
4950{
5051 struct job_stats * stats = NULL ;
5152
52- if (!job -> queue )
53+ if (!name )
5354 return NULL ;
5455
55- stats = zhashx_lookup (statsctx -> queue_stats , job -> queue );
56- if (!stats ) {
56+ stats = zhashx_lookup (statsctx -> queue_stats , name );
57+ if (!stats && create_if_missing ) {
5758 if (!(stats = calloc (1 , sizeof (* stats ))))
5859 return NULL ;
59- (void )zhashx_insert (statsctx -> queue_stats , job -> queue , stats );
60+ (void )zhashx_insert (statsctx -> queue_stats , name , stats );
6061 }
6162 return stats ;
6263}
@@ -127,7 +128,7 @@ void job_stats_update (struct job_stats_ctx *statsctx,
127128
128129 stats_update (& statsctx -> all , job , newstate );
129130
130- if ((stats = queue_stats_lookup (statsctx , job )))
131+ if ((stats = queue_stats_lookup (statsctx , job -> queue , true )))
131132 stats_update (stats , job , newstate );
132133
133134 arm_timer (statsctx );
@@ -138,7 +139,7 @@ void job_stats_add_queue (struct job_stats_ctx *statsctx,
138139{
139140 struct job_stats * stats ;
140141
141- if ((stats = queue_stats_lookup (statsctx , job )))
142+ if ((stats = queue_stats_lookup (statsctx , job -> queue , true )))
142143 stats_add (stats , job , job -> state );
143144
144145 arm_timer (statsctx );
@@ -174,9 +175,16 @@ void job_stats_remove_queue (struct job_stats_ctx *statsctx,
174175{
175176 struct job_stats * stats ;
176177
177- if ((stats = queue_stats_lookup (statsctx , job )))
178- stats_remove (stats , job );
178+ if (!(stats = queue_stats_lookup (statsctx , job -> queue , false))) {
179+ if (job -> queue )
180+ flux_log (statsctx -> h ,
181+ LOG_DEBUG ,
182+ "no queue stats for %s" ,
183+ job -> queue );
184+ return ;
185+ }
179186
187+ stats_remove (stats , job );
180188 arm_timer (statsctx );
181189}
182190
@@ -211,9 +219,16 @@ void job_stats_purge (struct job_stats_ctx *statsctx, struct job *job)
211219
212220 stats_purge (& statsctx -> all , job );
213221
214- if ((stats = queue_stats_lookup (statsctx , job )))
215- stats_purge (stats , job );
222+ if (!(stats = queue_stats_lookup (statsctx , job -> queue , false))) {
223+ if (job -> queue )
224+ flux_log (statsctx -> h ,
225+ LOG_DEBUG ,
226+ "no queue stats for %s" ,
227+ job -> queue );
228+ return ;
229+ }
216230
231+ stats_purge (stats , job );
217232 arm_timer (statsctx );
218233}
219234
@@ -408,6 +423,36 @@ static void job_stats_cb (flux_t *h,
408423 flux_log_error (h , "error responding to job-stats request" );
409424}
410425
426+ static int config_parse_queues (struct job_stats_ctx * statsctx ,
427+ const flux_conf_t * conf ,
428+ flux_error_t * errp )
429+ {
430+ json_t * queues ;
431+
432+ if (flux_conf_unpack (conf , NULL , "{s:o}" , "queues" , & queues ) == 0
433+ && json_object_size (queues ) > 0 ) {
434+ const char * name ;
435+ json_t * value ;
436+ json_object_foreach (queues , name , value ) {
437+ /* setup initial queue stats, so that user gets initial
438+ * stats before first job is submitted to the queue */
439+ if (!queue_stats_lookup (statsctx , name , true)) {
440+ flux_log_error (statsctx -> h , "queue_stats_lookup" );
441+ return -1 ;
442+ }
443+ }
444+ }
445+
446+ return 0 ;
447+ }
448+
449+ int job_stats_config_reload (struct job_stats_ctx * statsctx ,
450+ const flux_conf_t * conf ,
451+ flux_error_t * errp )
452+ {
453+ return config_parse_queues (statsctx , conf , errp );
454+ }
455+
411456static const struct flux_msg_handler_spec htab [] = {
412457 { .typemask = FLUX_MSGTYPE_REQUEST ,
413458 .topic_glob = "job-list.job-stats" ,
@@ -420,6 +465,7 @@ static const struct flux_msg_handler_spec htab[] = {
420465struct job_stats_ctx * job_stats_ctx_create (flux_t * h )
421466{
422467 struct job_stats_ctx * statsctx = NULL ;
468+ flux_error_t error ;
423469
424470 if (!(statsctx = calloc (1 , sizeof (* statsctx ))))
425471 return NULL ;
@@ -441,6 +487,13 @@ struct job_stats_ctx *job_stats_ctx_create (flux_t *h)
441487 statsctx )))
442488 goto error ;
443489
490+ if (config_parse_queues (statsctx ,
491+ flux_get_conf (statsctx -> h ),
492+ & error ) < 0 ) {
493+ flux_log (statsctx -> h , LOG_ERR , "%s" , error .text );
494+ goto error ;
495+ }
496+
444497 return statsctx ;
445498
446499error :
0 commit comments