@@ -44,9 +44,15 @@ using namespace Flux::opts_manager;
4444struct match_perf_t {
4545 double load; /* Graph load time */
4646 uint64_t njobs; /* Total match count */
47+ uint64_t njobs_reset; /* Jobs since reset match count */
48+ /* Graph uptime in seconds */
49+ std::chrono::time_point<std::chrono::system_clock> graph_uptime;
50+ /* Time since stats were last cleared */
51+ std::chrono::time_point<std::chrono::system_clock> time_since_reset;
4752 double min; /* Min match time */
4853 double max; /* Max match time */
49- double accum; /* Total match time accumulated */
54+ double mean; /* Mean match time */
55+ double M2; /* Welford's algorithm */
5056};
5157
5258class msg_wrap_t {
@@ -234,6 +240,9 @@ static void info_request_cb (flux_t *h, flux_msg_handler_t *w,
234240static void stat_request_cb (flux_t *h, flux_msg_handler_t *w,
235241 const flux_msg_t *msg, void *arg);
236242
243+ static void stat_clear_cb (flux_t *h, flux_msg_handler_t *w,
244+ const flux_msg_t *msg, void *arg);
245+
237246static void next_jobid_request_cb (flux_t *h, flux_msg_handler_t *w,
238247 const flux_msg_t *msg, void *arg);
239248
@@ -279,7 +288,9 @@ static const struct flux_msg_handler_spec htab[] = {
279288 { FLUX_MSGTYPE_REQUEST,
280289 " sched-fluxion-resource.info" , info_request_cb, 0 },
281290 { FLUX_MSGTYPE_REQUEST,
282- " sched-fluxion-resource.stat" , stat_request_cb, 0 },
291+ " sched-fluxion-resource.stats-get" , stat_request_cb, FLUX_ROLE_USER },
292+ { FLUX_MSGTYPE_REQUEST,
293+ " sched-fluxion-resource.stats-clear" , stat_clear_cb, FLUX_ROLE_USER },
283294 { FLUX_MSGTYPE_REQUEST,
284295 " sched-fluxion-resource.next_jobid" , next_jobid_request_cb, 0 },
285296 { FLUX_MSGTYPE_REQUEST,
@@ -344,9 +355,13 @@ static std::shared_ptr<resource_ctx_t> getctx (flux_t *h)
344355 set_default_args (ctx);
345356 ctx->perf .load = 0 .0f ;
346357 ctx->perf .njobs = 0 ;
347- ctx->perf .min = std::numeric_limits<double >::max ();
358+ ctx->perf .njobs_reset = 0 ;
359+ ctx->perf .min = std::numeric_limits<double >::max ();
348360 ctx->perf .max = 0 .0f ;
349- ctx->perf .accum = 0 .0f ;
361+ ctx->perf .mean = 0 .0f ;
362+ ctx->perf .M2 = 0 .0f ;
363+ ctx->perf .graph_uptime = std::chrono::system_clock::now ();
364+ ctx->perf .time_since_reset = std::chrono::system_clock::now ();
350365 ctx->matcher = nullptr ; /* Cannot be allocated at this point */
351366 ctx->fgraph = nullptr ; /* Cannot be allocated at this point */
352367 ctx->writers = nullptr ; /* Cannot be allocated at this point */
@@ -356,7 +371,6 @@ static std::shared_ptr<resource_ctx_t> getctx (flux_t *h)
356371 ctx->m_r_alloc = nullptr ;
357372 ctx->m_resources_updated = true ;
358373 ctx->m_resources_down_updated = true ;
359- // gettimeofday (&(ctx->m_resources_alloc_updated), NULL);
360374 ctx->m_resources_alloc_updated = std::chrono::system_clock::now ();
361375 }
362376
@@ -1351,6 +1365,7 @@ static int populate_resource_db (std::shared_ptr<resource_ctx_t> &ctx)
13511365
13521366 elapsed = std::chrono::system_clock::now () - start;
13531367 ctx->perf .load = elapsed.count ();
1368+ ctx->perf .graph_uptime = std::chrono::system_clock::now ();
13541369 rc = 0 ;
13551370
13561371done:
@@ -1505,10 +1520,17 @@ static int init_resource_graph (std::shared_ptr<resource_ctx_t> &ctx)
15051520static void update_match_perf (std::shared_ptr<resource_ctx_t > &ctx,
15061521 double elapse)
15071522{
1523+ double delta = 0 .0f ;
1524+ double delta2 = 0 .2f ;
15081525 ctx->perf .njobs ++;
1526+ ctx->perf .njobs_reset ++;
15091527 ctx->perf .min = (ctx->perf .min > elapse)? elapse : ctx->perf .min ;
15101528 ctx->perf .max = (ctx->perf .max < elapse)? elapse : ctx->perf .max ;
1511- ctx->perf .accum += elapse;
1529+ // Welford's online algorithm for variance
1530+ delta = elapse - ctx->perf .mean ;
1531+ ctx->perf .mean += delta / (double )ctx->perf .njobs ;
1532+ delta2 = elapse - ctx->perf .mean ;
1533+ ctx->perf .M2 += delta * delta2;
15121534}
15131535
15141536static inline std::string get_status_string (int64_t now, int64_t at)
@@ -2141,12 +2163,18 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w,
21412163 std::shared_ptr<resource_ctx_t > ctx = getctx ((flux_t *)arg);
21422164 int saved_errno;
21432165 json_t *o = nullptr ;
2144- double avg = 0 .0f ;
2166+ double mean = 0 .0f ;
21452167 double min = 0 .0f ;
2168+ double variance = 0 .0f ;
2169+ int64_t graph_uptime_s = 0 ;
2170+ int64_t time_since_reset_s = 0 ;
2171+ std::chrono::time_point<std::chrono::system_clock> now;
21462172
2147- if (ctx->perf .njobs ) {
2148- avg = ctx->perf .accum / ( double )ctx-> perf . njobs ;
2173+ if (ctx->perf .njobs_reset > 1 ) {
2174+ mean = ctx->perf .mean ;
21492175 min = ctx->perf .min ;
2176+ // Welford's online algorithm
2177+ variance = ctx->perf .M2 / (double )ctx->perf .njobs_reset ;
21502178 }
21512179 if ( !(o = json_object ())) {
21522180 errno = ENOMEM;
@@ -2156,15 +2184,24 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w,
21562184 flux_log_error (h, " %s: get_stat_by_rank" , __FUNCTION__);
21572185 goto error_free;
21582186 }
2159- if (flux_respond_pack (h, msg, " {s:I s:I s:o s:f s:I s:f s:f s:f}" ,
2187+ now = std::chrono::system_clock::now ();
2188+ graph_uptime_s = std::chrono::duration_cast<std::chrono::seconds> (
2189+ now - ctx->perf .graph_uptime ).count ();
2190+ time_since_reset_s = std::chrono::duration_cast<std::chrono::seconds> (
2191+ now - ctx->perf .time_since_reset ).count ();
2192+ if (flux_respond_pack (h, msg, " {s:I s:I s:o s:f s:I s:I s:I s:I s:f s:f s:f s:f}" ,
21602193 " V" , num_vertices (ctx->db ->resource_graph ),
21612194 " E" , num_edges (ctx->db ->resource_graph ),
21622195 " by_rank" , o,
21632196 " load-time" , ctx->perf .load ,
2197+ " graph-uptime" , graph_uptime_s,
2198+ " time-since-reset" , time_since_reset_s,
21642199 " njobs" , ctx->perf .njobs ,
2200+ " njobs-reset" , ctx->perf .njobs_reset ,
21652201 " min-match" , min,
21662202 " max-match" , ctx->perf .max ,
2167- " avg-match" , avg) < 0 ) {
2203+ " avg-match" , mean,
2204+ " match-variance" , variance) < 0 ) {
21682205 flux_log_error (h, " %s: flux_respond_pack" , __FUNCTION__);
21692206 }
21702207
@@ -2179,6 +2216,23 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w,
21792216 flux_log_error (h, " %s: flux_respond_error" , __FUNCTION__);
21802217}
21812218
2219+ static void stat_clear_cb (flux_t *h, flux_msg_handler_t *w,
2220+ const flux_msg_t *msg, void *arg)
2221+ {
2222+ std::shared_ptr<resource_ctx_t > ctx = getctx ((flux_t *)arg);
2223+
2224+ // Clear the jobs-related stats and reset time
2225+ ctx->perf .njobs_reset = 0 ;
2226+ ctx->perf .min = std::numeric_limits<double >::max ();
2227+ ctx->perf .max = 0 .0f ;
2228+ ctx->perf .mean = 0 .0f ;
2229+ ctx->perf .M2 = 0 .0f ;
2230+ ctx->perf .time_since_reset = std::chrono::system_clock::now ();
2231+
2232+ if (flux_respond (h, msg, NULL ) < 0 )
2233+ flux_log_error (h, " %s: flux_respond" , __FUNCTION__);
2234+ }
2235+
21822236static inline int64_t next_jobid (const std::map<uint64_t ,
21832237 std::shared_ptr<job_info_t >> &m)
21842238{
0 commit comments