Skip to content

Commit ea0749c

Browse files
committed
module: add caching for status cb
Problem: Fluxion `find` search times are long for large resource graphs. To serve many users effectively, the resource status RPC response needs to return quickly. We need caches for R allocations, R down, and R all resources. Add caches and logic to update the caches when resources or their states are updated, or after the configured update interval has been exceeded.
1 parent c43667d commit ea0749c

File tree

1 file changed

+56
-7
lines changed

1 file changed

+56
-7
lines changed

resource/modules/resource_match.cpp

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,15 @@ struct resource_ctx_t : public resource_interface_t {
105105
std::map<uint64_t, uint64_t> allocations; /* Allocation table */
106106
std::map<uint64_t, uint64_t> reservations; /* Reservation table */
107107
std::map<std::string, std::shared_ptr<msg_wrap_t>> notify_msgs;
108+
bool m_resources_updated = true; /* resources have been updated */
109+
bool m_resources_down_updated = true; /* down resources have been updated */
110+
/* last time allocated resources search updated */
111+
std::chrono::time_point<
112+
std::chrono::system_clock> m_resources_alloc_updated;
113+
/* R caches */
114+
json_t *m_r_all;
115+
json_t *m_r_down;
116+
json_t *m_r_alloc;
108117
};
109118

110119
msg_wrap_t::msg_wrap_t (const msg_wrap_t &o)
@@ -317,6 +326,7 @@ static void set_default_args (std::shared_ptr<resource_ctx_t> &ctx)
317326
ct_opts.set_match_policy ("first");
318327
ct_opts.set_prune_filters ("ALL:core");
319328
ct_opts.set_match_format ("rv1_nosched");
329+
ct_opts.set_update_interval (0);
320330
ctx->opts += ct_opts;
321331
}
322332

@@ -348,6 +358,13 @@ static std::shared_ptr<resource_ctx_t> getctx (flux_t *h)
348358
ctx->fgraph = nullptr; /* Cannot be allocated at this point */
349359
ctx->writers = nullptr; /* Cannot be allocated at this point */
350360
ctx->reader = nullptr; /* Cannot be allocated at this point */
361+
ctx->m_r_all = nullptr;
362+
ctx->m_r_down = nullptr;
363+
ctx->m_r_alloc = nullptr;
364+
ctx->m_resources_updated = true;
365+
ctx->m_resources_down_updated = true;
366+
//gettimeofday (&(ctx->m_resources_alloc_updated), NULL);
367+
ctx->m_resources_alloc_updated = std::chrono::system_clock::now ();
351368
}
352369

353370
done:
@@ -1110,6 +1127,7 @@ static int grow_resource_db (std::shared_ptr<resource_ctx_t> &ctx,
11101127
}
11111128
}
11121129
ctx->db->metadata.set_graph_duration (duration);
1130+
ctx->m_resources_updated = true;
11131131

11141132
done:
11151133
idset_destroy (grow_set);
@@ -1202,6 +1220,10 @@ static int mark_now (std::shared_ptr<resource_ctx_t> &ctx,
12021220
flux_log (ctx->h, LOG_DEBUG,
12031221
"resource status changed (rankset=[%s] status=%s)",
12041222
ids, resource_pool_t::status_to_str (status).c_str ());
1223+
1224+
// Updated the ranks
1225+
ctx->m_resources_down_updated = true;
1226+
12051227
done:
12061228
return rc;
12071229
}
@@ -2485,22 +2507,48 @@ static void status_request_cb (flux_t *h, flux_msg_handler_t *w,
24852507
json_t *R_all = nullptr;
24862508
json_t *R_down = nullptr;
24872509
json_t *R_alloc = nullptr;
2510+
std::chrono::time_point<std::chrono::system_clock> now;
2511+
std::chrono::duration<double> elapsed;
24882512
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);
24892513

2490-
if (run_find (ctx, "status=up or status=down", "rv1_nosched", &R_all) < 0)
2491-
goto error;
2492-
if (run_find (ctx, "status=down", "rv1_nosched", &R_down) < 0)
2493-
goto error;
2494-
if (run_find (ctx, "sched-now=allocated", "rv1_nosched", &R_alloc) < 0)
2495-
goto error;
2514+
now = std::chrono::system_clock::now ();
2515+
elapsed = now - ctx->m_resources_alloc_updated;
2516+
// Get R alloc whenever m_resources_alloc_updated or
2517+
// the elapsed time is greater than configured limit
2518+
if ( (elapsed.count () >
2519+
static_cast<double> (ctx->opts.get_opt ().get_update_interval ())) ||
2520+
ctx->m_resources_updated) {
2521+
if (run_find (ctx, "sched-now=allocated", "rv1_nosched", &R_alloc) < 0)
2522+
goto error;
2523+
ctx->m_r_alloc = json_deep_copy (R_alloc);
2524+
ctx->m_resources_alloc_updated = std::chrono::system_clock::now ();
2525+
} else
2526+
R_alloc = json_deep_copy (ctx->m_r_alloc);
2527+
2528+
if (ctx->m_resources_updated) {
2529+
if (run_find (ctx, "status=up or status=down", "rv1_nosched",
2530+
&R_all) < 0)
2531+
goto error;
2532+
ctx->m_r_all = json_deep_copy (R_all);
2533+
ctx->m_resources_updated = false;
2534+
} else
2535+
R_all = json_deep_copy (ctx->m_r_all);
2536+
2537+
if (ctx->m_resources_down_updated) {
2538+
if (run_find (ctx, "status=down", "rv1_nosched", &R_down) < 0)
2539+
goto error;
2540+
ctx->m_r_down = json_deep_copy (R_down);
2541+
ctx->m_resources_down_updated = false;
2542+
} else
2543+
R_down = json_deep_copy (ctx->m_r_down);
2544+
24962545
if (flux_respond_pack (h, msg, "{s:o? s:o? s:o?}",
24972546
"all", R_all,
24982547
"down", R_down,
24992548
"allocated", R_alloc) < 0) {
25002549
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
25012550
goto error;
25022551
}
2503-
25042552
flux_log (h, LOG_DEBUG, "%s: status succeeded", __FUNCTION__);
25052553
return;
25062554

@@ -2679,6 +2727,7 @@ static void set_status_request_cb (flux_t *h, flux_msg_handler_t *w,
26792727
errmsg = "Failed to set status of resource vertex";
26802728
goto error;
26812729
}
2730+
ctx->m_resources_down_updated = true;
26822731
if (flux_respond (h, msg, NULL) < 0) {
26832732
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
26842733
}

0 commit comments

Comments
 (0)