Skip to content

Commit f9dc433

Browse files
authored
Merge pull request #1144 from milroy/issue-1137
Improve sched.resource-status RPC and search performance
2 parents 2c61dcf + 568f27e commit f9dc433

File tree

7 files changed

+177
-75
lines changed

7 files changed

+177
-75
lines changed

resource/modules/resource_match.cpp

Lines changed: 87 additions & 62 deletions
Original file line number | Diff line number | Diff line change
@@ -105,6 +105,15 @@ struct resource_ctx_t : public resource_interface_t {
105105
std::map<uint64_t, uint64_t> allocations; /* Allocation table */
106106
std::map<uint64_t, uint64_t> reservations; /* Reservation table */
107107
std::map<std::string, std::shared_ptr<msg_wrap_t>> notify_msgs;
108+
bool m_resources_updated = true; /* resources have been updated */
109+
bool m_resources_down_updated = true; /* down resources have been updated */
110+
/* last time allocated resources search updated */
111+
std::chrono::time_point<
112+
std::chrono::system_clock> m_resources_alloc_updated;
113+
/* R caches */
114+
json_t *m_r_all;
115+
json_t *m_r_down;
116+
json_t *m_r_alloc;
108117
};
109118

110119
msg_wrap_t::msg_wrap_t (const msg_wrap_t &o)
@@ -296,13 +305,6 @@ static const struct flux_msg_handler_spec htab[] = {
296305
FLUX_MSGHANDLER_TABLE_END
297306
};
298307

299-
static double get_elapse_time (timeval &st, timeval &et)
300-
{
301-
double ts1 = (double)st.tv_sec + (double)st.tv_usec/1000000.0f;
302-
double ts2 = (double)et.tv_sec + (double)et.tv_usec/1000000.0f;
303-
return ts2 - ts1;
304-
}
305-
306308
/******************************************************************************
307309
* *
308310
* Module Initialization Routines *
@@ -317,6 +319,7 @@ static void set_default_args (std::shared_ptr<resource_ctx_t> &ctx)
317319
ct_opts.set_match_policy ("first");
318320
ct_opts.set_prune_filters ("ALL:core");
319321
ct_opts.set_match_format ("rv1_nosched");
322+
ct_opts.set_update_interval (0);
320323
ctx->opts += ct_opts;
321324
}
322325

@@ -348,6 +351,13 @@ static std::shared_ptr<resource_ctx_t> getctx (flux_t *h)
348351
ctx->fgraph = nullptr; /* Cannot be allocated at this point */
349352
ctx->writers = nullptr; /* Cannot be allocated at this point */
350353
ctx->reader = nullptr; /* Cannot be allocated at this point */
354+
ctx->m_r_all = nullptr;
355+
ctx->m_r_down = nullptr;
356+
ctx->m_r_alloc = nullptr;
357+
ctx->m_resources_updated = true;
358+
ctx->m_resources_down_updated = true;
359+
//gettimeofday (&(ctx->m_resources_alloc_updated), NULL);
360+
ctx->m_resources_alloc_updated = std::chrono::system_clock::now ();
351361
}
352362

353363
done:
@@ -1110,6 +1120,7 @@ static int grow_resource_db (std::shared_ptr<resource_ctx_t> &ctx,
11101120
}
11111121
}
11121122
ctx->db->metadata.set_graph_duration (duration);
1123+
ctx->m_resources_updated = true;
11131124

11141125
done:
11151126
idset_destroy (grow_set);
@@ -1202,6 +1213,10 @@ static int mark_now (std::shared_ptr<resource_ctx_t> &ctx,
12021213
flux_log (ctx->h, LOG_DEBUG,
12031214
"resource status changed (rankset=[%s] status=%s)",
12041215
ids, resource_pool_t::status_to_str (status).c_str ());
1216+
1217+
// Updated the ranks
1218+
ctx->m_resources_down_updated = true;
1219+
12051220
done:
12061221
return rc;
12071222
}
@@ -1308,16 +1323,14 @@ static int populate_resource_db_acquire (std::shared_ptr<resource_ctx_t> &ctx)
13081323
static int populate_resource_db (std::shared_ptr<resource_ctx_t> &ctx)
13091324
{
13101325
int rc = -1;
1311-
double elapse;
1312-
struct timeval st, et;
1326+
std::chrono::time_point<std::chrono::system_clock> start;
1327+
std::chrono::duration<double> elapsed;
13131328

13141329
if (ctx->opts.get_opt ().is_reserve_vtx_vec_set ())
13151330
ctx->db->resource_graph.m_vertices.reserve (
13161331
ctx->opts.get_opt ().get_reserve_vtx_vec ());
1317-
if ( (rc = gettimeofday (&st, NULL)) < 0) {
1318-
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
1319-
goto done;
1320-
}
1332+
1333+
start = std::chrono::system_clock::now ();
13211334
if (ctx->opts.get_opt ().is_load_file_set ()) {
13221335
if (populate_resource_db_file (ctx) < 0)
13231336
goto done;
@@ -1335,11 +1348,9 @@ static int populate_resource_db (std::shared_ptr<resource_ctx_t> &ctx)
13351348
"%s: loaded resources from core's resource.acquire",
13361349
__FUNCTION__);
13371350
}
1338-
if ( (rc = gettimeofday (&et, NULL)) < 0) {
1339-
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
1340-
goto done;
1341-
}
1342-
ctx->perf.load = get_elapse_time (st, et);
1351+
1352+
elapsed = std::chrono::system_clock::now () - start;
1353+
ctx->perf.load = elapsed.count ();
13431354
rc = 0;
13441355

13451356
done:
@@ -1399,9 +1410,12 @@ static std::shared_ptr<f_resource_graph_t> create_filtered_graph (
13991410
edg_infra_map_t emap = get (&resource_relation_t::idata, g);
14001411

14011412
// Set vertex and edge filters based on subsystems to use
1413+
int subsys_size = ctx->db->metadata.roots.size ();
14021414
const multi_subsystemsS &filter = ctx->matcher->subsystemsS ();
1403-
subsystem_selector_t<vtx_t, f_vtx_infra_map_t> vtxsel (vmap, filter);
1404-
subsystem_selector_t<edg_t, f_edg_infra_map_t> edgsel (emap, filter);
1415+
subsystem_selector_t<vtx_t, f_vtx_infra_map_t> vtxsel (vmap, filter,
1416+
subsys_size);
1417+
subsystem_selector_t<edg_t, f_edg_infra_map_t> edgsel (emap, filter,
1418+
subsys_size);
14051419
fg = std::make_shared<f_resource_graph_t> (g, edgsel, vtxsel);
14061420
} catch (std::bad_alloc &e) {
14071421
errno = ENOMEM;
@@ -1706,15 +1720,12 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
17061720
flux_error_t *errp)
17071721
{
17081722
int rc = 0;
1709-
double elapse = 0.0f;
1710-
struct timeval start;
1711-
struct timeval end;
1723+
std::chrono::time_point<std::chrono::system_clock> start;
1724+
std::chrono::duration<double> elapsed;
1725+
std::chrono::duration<int64_t> epoch;
17121726
bool rsv = false;
17131727

1714-
if ( (rc = gettimeofday (&start, NULL)) < 0) {
1715-
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
1716-
goto done;
1717-
}
1728+
start = std::chrono::system_clock::now ();
17181729
if (strcmp ("allocate", cmd) != 0
17191730
&& strcmp ("allocate_orelse_reserve", cmd) != 0
17201731
&& strcmp ("allocate_with_satisfiability", cmd) != 0
@@ -1725,7 +1736,9 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
17251736
goto done;
17261737
}
17271738

1728-
*at = *now = (int64_t)start.tv_sec;
1739+
epoch = std::chrono::duration_cast<std::chrono::seconds>
1740+
(start.time_since_epoch ());
1741+
*at = *now = epoch.count ();
17291742
if ( (rc = run (ctx, jobid, cmd, jstr, at, errp)) < 0) {
17301743
goto done;
17311744
}
@@ -1735,11 +1748,8 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
17351748
}
17361749

17371750
rsv = (*now != *at)? true : false;
1738-
if ( (rc = gettimeofday (&end, NULL)) < 0) {
1739-
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
1740-
goto done;
1741-
}
1742-
*ov = get_elapse_time (start, end);
1751+
elapsed = std::chrono::system_clock::now () - start;
1752+
*ov = elapsed.count ();
17431753
update_match_perf (ctx, *ov);
17441754

17451755
if (cmd != std::string ("satisfiability")) {
@@ -1761,16 +1771,12 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
17611771
{
17621772
int rc = 0;
17631773
uint64_t duration = 0;
1764-
double elapse = 0.0f;
1765-
struct timeval start;
1766-
struct timeval end;
1774+
std::chrono::time_point<std::chrono::system_clock> start;
1775+
std::chrono::duration<double> elapsed;
17671776
std::string jgf;
17681777
std::string R2;
17691778

1770-
if ( (rc = gettimeofday (&start, NULL)) < 0) {
1771-
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
1772-
goto done;
1773-
}
1779+
start = std::chrono::system_clock::now ();
17741780
if ( (rc = parse_R (ctx, R, jgf, at, duration)) < 0) {
17751781
flux_log_error (ctx->h, "%s: parsing R", __FUNCTION__);
17761782
goto done;
@@ -1783,11 +1789,8 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
17831789
flux_log_error (ctx->h, "%s: writers->emit", __FUNCTION__);
17841790
goto done;
17851791
}
1786-
if ( (rc = gettimeofday (&end, NULL)) < 0) {
1787-
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
1788-
goto done;
1789-
}
1790-
ov = get_elapse_time (start, end);
1792+
elapsed = std::chrono::system_clock::now () - start;
1793+
ov = elapsed.count ();
17911794
update_match_perf (ctx, ov);
17921795
if ( (rc = track_schedule_info (ctx, jobid, false, at, "", o, ov)) != 0) {
17931796
flux_log_error (ctx->h, "%s: can't add job info (id=%jd)",
@@ -1809,6 +1812,8 @@ static void update_request_cb (flux_t *h, flux_msg_handler_t *w,
18091812
uint64_t duration = 0;
18101813
std::string status = "";
18111814
std::stringstream o;
1815+
std::chrono::time_point<std::chrono::system_clock> start;
1816+
std::chrono::duration<double> elapsed;
18121817

18131818
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);
18141819
if (flux_request_unpack (msg, NULL, "{s:I s:s}",
@@ -1819,11 +1824,7 @@ static void update_request_cb (flux_t *h, flux_msg_handler_t *w,
18191824
}
18201825
if (is_existent_jobid (ctx, jobid)) {
18211826
int rc = 0;
1822-
struct timeval st, et;
1823-
if ( (rc = gettimeofday (&st, NULL)) < 0) {
1824-
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
1825-
goto error;
1826-
}
1827+
start = std::chrono::system_clock::now ();
18271828
if ( (rc = Rlite_equal (ctx, R, ctx->jobs[jobid]->R.c_str ())) < 0) {
18281829
flux_log_error (ctx->h, "%s: Rlite_equal", __FUNCTION__);
18291830
goto error;
@@ -1834,12 +1835,9 @@ static void update_request_cb (flux_t *h, flux_msg_handler_t *w,
18341835
__FUNCTION__, static_cast<intmax_t> (jobid));
18351836
goto error;
18361837
}
1837-
if ( (rc = gettimeofday (&et, NULL)) < 0) {
1838-
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
1839-
goto error;
1840-
}
1838+
elapsed = std::chrono::system_clock::now () - start;
18411839
// If a jobid with matching R exists, no need to update
1842-
ov = get_elapse_time (st, et);
1840+
ov = elapsed.count ();
18431841
get_jobstate_str (ctx->jobs[jobid]->state, status);
18441842
o << ctx->jobs[jobid]->R;
18451843
at = ctx->jobs[jobid]->scheduled_at;
@@ -2485,22 +2483,48 @@ static void status_request_cb (flux_t *h, flux_msg_handler_t *w,
24852483
json_t *R_all = nullptr;
24862484
json_t *R_down = nullptr;
24872485
json_t *R_alloc = nullptr;
2486+
std::chrono::time_point<std::chrono::system_clock> now;
2487+
std::chrono::duration<double> elapsed;
24882488
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);
24892489

2490-
if (run_find (ctx, "status=up or status=down", "rv1_nosched", &R_all) < 0)
2491-
goto error;
2492-
if (run_find (ctx, "status=down", "rv1_nosched", &R_down) < 0)
2493-
goto error;
2494-
if (run_find (ctx, "sched-now=allocated", "rv1_nosched", &R_alloc) < 0)
2495-
goto error;
2490+
now = std::chrono::system_clock::now ();
2491+
elapsed = now - ctx->m_resources_alloc_updated;
2492+
// Get R alloc whenever m_resources_alloc_updated or
2493+
// the elapsed time is greater than configured limit
2494+
if ( (elapsed.count () >
2495+
static_cast<double> (ctx->opts.get_opt ().get_update_interval ())) ||
2496+
ctx->m_resources_updated) {
2497+
if (run_find (ctx, "sched-now=allocated", "rv1_nosched", &R_alloc) < 0)
2498+
goto error;
2499+
ctx->m_r_alloc = json_deep_copy (R_alloc);
2500+
ctx->m_resources_alloc_updated = std::chrono::system_clock::now ();
2501+
} else
2502+
R_alloc = json_deep_copy (ctx->m_r_alloc);
2503+
2504+
if (ctx->m_resources_updated) {
2505+
if (run_find (ctx, "status=up or status=down", "rv1_nosched",
2506+
&R_all) < 0)
2507+
goto error;
2508+
ctx->m_r_all = json_deep_copy (R_all);
2509+
ctx->m_resources_updated = false;
2510+
} else
2511+
R_all = json_deep_copy (ctx->m_r_all);
2512+
2513+
if (ctx->m_resources_down_updated) {
2514+
if (run_find (ctx, "status=down", "rv1_nosched", &R_down) < 0)
2515+
goto error;
2516+
ctx->m_r_down = json_deep_copy (R_down);
2517+
ctx->m_resources_down_updated = false;
2518+
} else
2519+
R_down = json_deep_copy (ctx->m_r_down);
2520+
24962521
if (flux_respond_pack (h, msg, "{s:o? s:o? s:o?}",
24972522
"all", R_all,
24982523
"down", R_down,
24992524
"allocated", R_alloc) < 0) {
25002525
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
25012526
goto error;
25022527
}
2503-
25042528
flux_log (h, LOG_DEBUG, "%s: status succeeded", __FUNCTION__);
25052529
return;
25062530

@@ -2679,6 +2703,7 @@ static void set_status_request_cb (flux_t *h, flux_msg_handler_t *w,
26792703
errmsg = "Failed to set status of resource vertex";
26802704
goto error;
26812705
}
2706+
ctx->m_resources_down_updated = true;
26822707
if (flux_respond (h, msg, NULL) < 0) {
26832708
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
26842709
}

0 commit comments

Comments
 (0)