Skip to content

Commit bce54d0

Browse files
committed
resource module: add support for update with rv1exec
Problem: the sched-fluxion-resource.update RPC only supports JGF. Add a check for an empty scheduling key and fall back to rv1exec.
1 parent ddd5df4 commit bce54d0

File tree

1 file changed

+41
-23
lines changed

1 file changed

+41
-23
lines changed

resource/modules/resource_match.cpp

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1568,7 +1568,8 @@ static int track_schedule_info (std::shared_ptr<resource_ctx_t> &ctx,
15681568
}
15691569

15701570
static int parse_R (std::shared_ptr<resource_ctx_t> &ctx, const char *R,
1571-
std::string &jgf, int64_t &starttime, uint64_t &duration)
1571+
std::string &R_graph_fmt, int64_t &starttime, uint64_t &duration,
1572+
std::string &format)
15721573
{
15731574
int rc = 0;
15741575
int version = 0;
@@ -1605,20 +1606,22 @@ static int parse_R (std::shared_ptr<resource_ctx_t> &ctx, const char *R,
16051606
static_cast<intmax_t> (st), static_cast<intmax_t> (et));
16061607
goto freemem_out;
16071608
}
1608-
if (graph == NULL) {
1609-
rc = -1;
1610-
errno = ENOENT;
1611-
flux_log (ctx->h, LOG_ERR, "%s: no scheduling key in R", __FUNCTION__);
1612-
goto freemem_out;
1613-
}
1614-
if ( !(jgf_str = json_dumps (graph, JSON_INDENT (0)))) {
1615-
rc = -1;
1616-
errno = ENOMEM;
1617-
flux_log (ctx->h, LOG_ERR, "%s: json_dumps", __FUNCTION__);
1618-
goto freemem_out;
1609+
if (graph != NULL) {
1610+
if ( !(jgf_str = json_dumps (graph, JSON_INDENT (0)))) {
1611+
rc = -1;
1612+
errno = ENOMEM;
1613+
flux_log (ctx->h, LOG_ERR, "%s: json_dumps", __FUNCTION__);
1614+
goto freemem_out;
1615+
}
1616+
R_graph_fmt = jgf_str;
1617+
free (jgf_str);
1618+
format = "jgf";
1619+
} else {
1620+
// Use the rv1exec reader
1621+
R_graph_fmt = R;
1622+
format = "rv1exec";
16191623
}
1620-
jgf = jgf_str;
1621-
free (jgf_str);
1624+
16221625
starttime = static_cast<int64_t> (st);
16231626
duration = et - st;
16241627

@@ -1710,18 +1713,33 @@ static int run (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
17101713
}
17111714

17121715
static int run (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
1713-
const std::string &jgf, int64_t at, uint64_t duration)
1716+
const std::string &R, int64_t at, uint64_t duration,
1717+
std::string &format)
17141718
{
17151719
int rc = 0;
17161720
dfu_traverser_t &tr = *(ctx->traverser);
17171721
std::shared_ptr<resource_reader_base_t> rd;
1718-
if ((rd = create_resource_reader ("jgf")) == nullptr) {
1722+
if (format == "jgf") {
1723+
if ((rd = create_resource_reader ("jgf")) == nullptr) {
1724+
rc = -1;
1725+
flux_log (ctx->h, LOG_ERR, "%s: create JGF reader (id=%jd)",
1726+
__FUNCTION__, static_cast<intmax_t> (jobid));
1727+
goto out;
1728+
}
1729+
} else if (format == "rv1exec") {
1730+
if ((rd = create_resource_reader ("rv1exec")) == nullptr) {
1731+
rc = -1;
1732+
flux_log (ctx->h, LOG_ERR, "%s: create rv1exec reader (id=%jd)",
1733+
__FUNCTION__, static_cast<intmax_t> (jobid));
1734+
goto out;
1735+
}
1736+
} else {
17191737
rc = -1;
1720-
flux_log (ctx->h, LOG_ERR, "%s: create_resource_reader (id=%jd)",
1721-
__FUNCTION__, static_cast<intmax_t> (jobid));
1738+
flux_log (ctx->h, LOG_ERR, "%s: create rv1exec reader (id=%jd)",
1739+
__FUNCTION__, static_cast<intmax_t> (jobid));
17221740
goto out;
17231741
}
1724-
if ((rc = tr.run (jgf, ctx->writers, rd, jobid, at, duration)) < 0) {
1742+
if ((rc = tr.run (R, ctx->writers, rd, jobid, at, duration)) < 0) {
17251743
flux_log (ctx->h, LOG_ERR, "%s: dfu_traverser_t::run (id=%jd): %s",
17261744
__FUNCTION__, static_cast<intmax_t> (jobid),
17271745
ctx->traverser->err_message ().c_str ());
@@ -1791,15 +1809,15 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
17911809
uint64_t duration = 0;
17921810
std::chrono::time_point<std::chrono::system_clock> start;
17931811
std::chrono::duration<double> elapsed;
1794-
std::string jgf;
1795-
std::string R2;
1812+
std::string R_graph_fmt;
1813+
std::string format;
17961814

17971815
start = std::chrono::system_clock::now ();
1798-
if ( (rc = parse_R (ctx, R, jgf, at, duration)) < 0) {
1816+
if ( (rc = parse_R (ctx, R, R_graph_fmt, at, duration, format)) < 0) {
17991817
flux_log_error (ctx->h, "%s: parsing R", __FUNCTION__);
18001818
goto done;
18011819
}
1802-
if ( (rc = run (ctx, jobid, jgf, at, duration)) < 0) {
1820+
if ( (rc = run (ctx, jobid, R_graph_fmt, at, duration, format)) < 0) {
18031821
flux_log_error (ctx->h, "%s: run", __FUNCTION__);
18041822
goto done;
18051823
}

0 commit comments

Comments
 (0)