Skip to content

Commit 143020d

Browse files
authored
Merge pull request #1176 from milroy/rv1-update
Implement rv1exec reader update to facilitate Fluxion reload
2 parents 8e413e7 + 5fc9272 commit 143020d

25 files changed

+1084
-172
lines changed

qmanager/modules/qmanager_callbacks.cpp

Lines changed: 58 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,30 @@ int qmanager_cb_t::post_sched_loop (flux_t *h, schedutil_t *schedutil,
113113
return rc;
114114
}
115115

116+
/* The RFC 27 hello handshake occurs during scheduler initialization. Its
117+
* purpose is to inform the scheduler of jobs that already have resources
118+
* allocated. This callback is made once per job. The callback should return
119+
* 0 on success or -1 on failure. On failure, the job manager raises a fatal
120+
* exception on the job.
121+
*
122+
* Jobs that already have resources at hello need to be assigned to the correct
123+
* qmanager queue, but the queue is not provided in the hello metadata.
124+
* Therefore, jobspec is fetched from the KVS so that attributes.system.queue
125+
* can be extracted from it.
126+
*
127+
* Note that fluxion instantiates the "default" queue when no named queues
128+
* are configured. Therefore, when the queue attribute is not defined, we
129+
* put the job in the default queue.
130+
*
131+
* Fail the job if its queue attribute (or lack thereof) no longer matches a
132+
* valid queue. This can occur if queues have been reconfigured since job
133+
* submission.
134+
*/
116135
int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
117136
const char *R, void *arg)
118137

119138
{
120-
int rc = 0;
121-
json_t *o = NULL;
122-
json_error_t err;
139+
int rc = -1;
123140
std::string R_out;
124141
char *qn_attr = NULL;
125142
std::string queue_name;
@@ -130,53 +147,68 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
130147
unsigned int prio;
131148
uint32_t uid;
132149
double ts;
150+
json_t *jobspec = NULL;
151+
flux_future_t *f = NULL;
133152

153+
/* Don't expect jobspec to be set here as it is not currently defined
154+
* in RFC 27. However, accept it as an optional key in case the hello
155+
* protocol evolves to include it. If it is not set, look it up in the KVS.
156+
*/
134157
if (flux_msg_unpack (msg,
135-
"{s:I s:i s:i s:f}",
158+
"{s:I s:i s:i s:f s?o}",
136159
"id", &id,
137160
"priority", &prio,
138161
"userid", &uid,
139-
"t_submit", &ts) < 0) {
162+
"t_submit", &ts,
163+
"jobspec", &jobspec) < 0) {
140164
flux_log_error (h, "%s: flux_msg_unpack", __FUNCTION__);
141165
goto out;
142166
}
143-
144-
if ( (o = json_loads (R, 0, &err)) == NULL) {
145-
rc = -1;
146-
errno = EPROTO;
147-
flux_log (h, LOG_ERR, "%s: parsing R for job (id=%jd): %s %s@%d:%d",
148-
__FUNCTION__, static_cast<intmax_t> (id),
149-
err.text, err.source, err.line, err.column);
167+
if (!jobspec) {
168+
char key[64] = { 0 };
169+
if (flux_job_kvs_key (key, sizeof (key), id, "jobspec") < 0
170+
|| !(f = flux_kvs_lookup (h, NULL, 0, key))
171+
|| flux_kvs_lookup_get_unpack (f, "o", &jobspec) < 0) {
172+
flux_log_error (h, "%s", key);
173+
goto out;
174+
}
175+
}
176+
if (json_unpack (jobspec,
177+
"{s?{s?{s?s}}}",
178+
"attributes",
179+
"system",
180+
"queue", &qn_attr) < 0) {
181+
flux_log_error (h, "error parsing jobspec");
150182
goto out;
151183
}
152-
if ( (rc = json_unpack (o, "{ s?:{s?:{s?:{s?:s}}} }",
153-
"attributes",
154-
"system",
155-
"scheduler",
156-
"queue", &qn_attr)) < 0) {
157-
json_decref (o);
158-
errno = EPROTO;
159-
flux_log (h, LOG_ERR, "%s: json_unpack for attributes", __FUNCTION__);
184+
if (qn_attr)
185+
queue_name = qn_attr;
186+
else
187+
queue_name = ctx->opts.get_opt ().get_default_queue_name ();
188+
if (ctx->queues.find (queue_name) == ctx->queues.end ()) {
189+
flux_log (h,
190+
LOG_ERR,
191+
"%s: unknown queue name (id=%jd queue=%s)",
192+
__FUNCTION__,
193+
static_cast<intmax_t> (id),
194+
queue_name.c_str ());
160195
goto out;
161196
}
162-
163-
queue_name = qn_attr? qn_attr : ctx->opts.get_opt ().get_default_queue_name ();
164-
json_decref (o);
165197
queue = ctx->queues.at (queue_name);
166198
running_job = std::make_shared<job_t> (job_state_kind_t::RUNNING,
167199
id, uid, calc_priority (prio),
168200
ts, R);
169201

170-
if ( (rc = queue->reconstruct (static_cast<void *> (h),
171-
running_job, R_out)) < 0) {
202+
if (queue->reconstruct (static_cast<void *> (h), running_job, R_out) < 0) {
172203
flux_log_error (h, "%s: reconstruct (id=%jd queue=%s)", __FUNCTION__,
173204
static_cast<intmax_t> (id), queue_name.c_str ());
174205
goto out;
175206
}
176207
flux_log (h, LOG_DEBUG, "requeue success (queue=%s id=%jd)",
177208
queue_name.c_str (), static_cast<intmax_t> (id));
178-
209+
rc = 0;
179210
out:
211+
flux_future_destroy (f);
180212
return rc;
181213
}
182214

resource/modules/resource_match.cpp

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1568,7 +1568,8 @@ static int track_schedule_info (std::shared_ptr<resource_ctx_t> &ctx,
15681568
}
15691569

15701570
static int parse_R (std::shared_ptr<resource_ctx_t> &ctx, const char *R,
1571-
std::string &jgf, int64_t &starttime, uint64_t &duration)
1571+
std::string &R_graph_fmt, int64_t &starttime, uint64_t &duration,
1572+
std::string &format)
15721573
{
15731574
int rc = 0;
15741575
int version = 0;
@@ -1605,20 +1606,22 @@ static int parse_R (std::shared_ptr<resource_ctx_t> &ctx, const char *R,
16051606
static_cast<intmax_t> (st), static_cast<intmax_t> (et));
16061607
goto freemem_out;
16071608
}
1608-
if (graph == NULL) {
1609-
rc = -1;
1610-
errno = ENOENT;
1611-
flux_log (ctx->h, LOG_ERR, "%s: no scheduling key in R", __FUNCTION__);
1612-
goto freemem_out;
1613-
}
1614-
if ( !(jgf_str = json_dumps (graph, JSON_INDENT (0)))) {
1615-
rc = -1;
1616-
errno = ENOMEM;
1617-
flux_log (ctx->h, LOG_ERR, "%s: json_dumps", __FUNCTION__);
1618-
goto freemem_out;
1609+
if (graph != NULL) {
1610+
if ( !(jgf_str = json_dumps (graph, JSON_INDENT (0)))) {
1611+
rc = -1;
1612+
errno = ENOMEM;
1613+
flux_log (ctx->h, LOG_ERR, "%s: json_dumps", __FUNCTION__);
1614+
goto freemem_out;
1615+
}
1616+
R_graph_fmt = jgf_str;
1617+
free (jgf_str);
1618+
format = "jgf";
1619+
} else {
1620+
// No scheduling key in R, so pass R through unchanged for the rv1exec reader
1621+
R_graph_fmt = R;
1622+
format = "rv1exec";
16191623
}
1620-
jgf = jgf_str;
1621-
free (jgf_str);
1624+
16221625
starttime = static_cast<int64_t> (st);
16231626
duration = et - st;
16241627

@@ -1710,18 +1713,33 @@ static int run (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
17101713
}
17111714

17121715
static int run (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
1713-
const std::string &jgf, int64_t at, uint64_t duration)
1716+
const std::string &R, int64_t at, uint64_t duration,
1717+
std::string &format)
17141718
{
17151719
int rc = 0;
17161720
dfu_traverser_t &tr = *(ctx->traverser);
17171721
std::shared_ptr<resource_reader_base_t> rd;
1718-
if ((rd = create_resource_reader ("jgf")) == nullptr) {
1722+
if (format == "jgf") {
1723+
if ((rd = create_resource_reader ("jgf")) == nullptr) {
1724+
rc = -1;
1725+
flux_log (ctx->h, LOG_ERR, "%s: create JGF reader (id=%jd)",
1726+
__FUNCTION__, static_cast<intmax_t> (jobid));
1727+
goto out;
1728+
}
1729+
} else if (format == "rv1exec") {
1730+
if ((rd = create_resource_reader ("rv1exec")) == nullptr) {
1731+
rc = -1;
1732+
flux_log (ctx->h, LOG_ERR, "%s: create rv1exec reader (id=%jd)",
1733+
__FUNCTION__, static_cast<intmax_t> (jobid));
1734+
goto out;
1735+
}
1736+
} else {
17191737
rc = -1;
1720-
flux_log (ctx->h, LOG_ERR, "%s: create_resource_reader (id=%jd)",
1721-
__FUNCTION__, static_cast<intmax_t> (jobid));
1738+
flux_log (ctx->h, LOG_ERR, "%s: create rv1exec reader (id=%jd)",
1739+
__FUNCTION__, static_cast<intmax_t> (jobid));
17221740
goto out;
17231741
}
1724-
if ((rc = tr.run (jgf, ctx->writers, rd, jobid, at, duration)) < 0) {
1742+
if ((rc = tr.run (R, ctx->writers, rd, jobid, at, duration)) < 0) {
17251743
flux_log (ctx->h, LOG_ERR, "%s: dfu_traverser_t::run (id=%jd): %s",
17261744
__FUNCTION__, static_cast<intmax_t> (jobid),
17271745
ctx->traverser->err_message ().c_str ());
@@ -1791,15 +1809,15 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
17911809
uint64_t duration = 0;
17921810
std::chrono::time_point<std::chrono::system_clock> start;
17931811
std::chrono::duration<double> elapsed;
1794-
std::string jgf;
1795-
std::string R2;
1812+
std::string R_graph_fmt;
1813+
std::string format;
17961814

17971815
start = std::chrono::system_clock::now ();
1798-
if ( (rc = parse_R (ctx, R, jgf, at, duration)) < 0) {
1816+
if ( (rc = parse_R (ctx, R, R_graph_fmt, at, duration, format)) < 0) {
17991817
flux_log_error (ctx->h, "%s: parsing R", __FUNCTION__);
18001818
goto done;
18011819
}
1802-
if ( (rc = run (ctx, jobid, jgf, at, duration)) < 0) {
1820+
if ( (rc = run (ctx, jobid, R_graph_fmt, at, duration, format)) < 0) {
18031821
flux_log_error (ctx->h, "%s: run", __FUNCTION__);
18041822
goto done;
18051823
}

0 commit comments

Comments
 (0)