Skip to content

Commit 7f532e9

Browse files
garlickmilroy
authored andcommitted
qmanager: look up job queue name in KVS
Problem: when the fluxion modules are reloaded with running jobs, and the match format is "rv1_nosched", and queues are enabled, running jobs are killed with a fatal scheduler-restart exception. During the hello handshake defined by RFC 27, the scheduler is informed during its initialization of jobs that are holding resources. The hello callback in qmanager retrieves the job's queue name from the R key "attributes.system.scheduler.queue". The queue name is used to locate the proper queue for the job to be inserted into. This attribute is not being set in R when the match format is "rv1_nosched" so the default queue is assumed. Since the default queue is not instantiated when named queues are defined, a fatal job exception is raised when the queue lookup fails. There has been a proposal to deprecate the R attribute (#1108), so rather than ensure that it is set in this case, determine the queue instead by fetching the jobspec from the KVS. Fixes #1108
1 parent ec48748 commit 7f532e9

File tree

1 file changed

+47
-23
lines changed

1 file changed

+47
-23
lines changed

qmanager/modules/qmanager_callbacks.cpp

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,30 @@ int qmanager_cb_t::post_sched_loop (flux_t *h, schedutil_t *schedutil,
120120
return rc;
121121
}
122122

123+
/* The RFC 27 hello handshake occurs during scheduler initialization. Its
124+
* purpose is to inform the scheduler of jobs that already have resources
125+
* allocated. This callback is made once per job. The callback should return
126+
* 0 on success or -1 on failure. On failure, the job manager raises a fatal
127+
* exception on the job.
128+
*
129+
* Jobs that already have resources at hello need to be assigned to the correct
130+
* qmanager queue, but the queue is not provided in the hello metadata.
131+
* Therefore, jobspec is fetched from the KVS so that attributes.system.queue
132+
* can be extracted from it.
133+
*
134+
* Note that fluxion instantiates the "default" queue when no named queues
135+
* are configured. Therefore, when the queue attribute is not defined, we
136+
* put the job in the default queue.
137+
*
138+
* Fail the job if its queue attribute (or lack thereof) no longer matches a
139+
* valid queue. This can occur if queues have been reconfigured since job
140+
* submission.
141+
*/
123142
int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
124143
const char *R, void *arg)
125144

126145
{
127146
int rc = 0;
128-
json_t *o = NULL;
129-
json_error_t err;
130147
std::string R_out;
131148
char *qn_attr = NULL;
132149
std::string queue_name;
@@ -137,38 +154,44 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
137154
unsigned int prio;
138155
uint32_t uid;
139156
double ts;
157+
json_t *jobspec = NULL;
158+
flux_future_t *f = NULL;
140159

160+
/* Don't expect jobspec to be set here as it is not currently defined
161+
* in RFC 27. However, add it anyway in case the hello protocol
162+
* evolves to include it. If it is not set, it must be looked up.
163+
*/
141164
if (flux_msg_unpack (msg,
142-
"{s:I s:i s:i s:f}",
165+
"{s:I s:i s:i s:f s?o}",
143166
"id", &id,
144167
"priority", &prio,
145168
"userid", &uid,
146-
"t_submit", &ts) < 0) {
169+
"t_submit", &ts,
170+
"jobspec", &jobspec) < 0) {
147171
flux_log_error (h, "%s: flux_msg_unpack", __FUNCTION__);
148172
goto out;
149173
}
150-
151-
if ( (o = json_loads (R, 0, &err)) == NULL) {
152-
rc = -1;
153-
errno = EPROTO;
154-
flux_log (h, LOG_ERR, "%s: parsing R for job (id=%jd): %s %s@%d:%d",
155-
__FUNCTION__, static_cast<intmax_t> (id),
156-
err.text, err.source, err.line, err.column);
157-
goto out;
174+
if (!jobspec) {
175+
char key[64] = { 0 };
176+
if (flux_job_kvs_key (key, sizeof (key), id, "jobspec") < 0
177+
|| !(f = flux_kvs_lookup (h, NULL, 0, key))
178+
|| flux_kvs_lookup_get_unpack (f, "o", &jobspec) < 0) {
179+
flux_log_error (h, "%s", key);
180+
goto out;
181+
}
158182
}
159-
if ( (rc = json_unpack (o, "{ s?:{s?:{s?:{s?:s}}} }",
160-
"attributes",
161-
"system",
162-
"scheduler",
163-
"queue", &qn_attr)) < 0) {
164-
json_decref (o);
165-
errno = EPROTO;
166-
flux_log (h, LOG_ERR, "%s: json_unpack for attributes", __FUNCTION__);
183+
if (json_unpack (jobspec,
184+
"{s?{s?{s?s}}}",
185+
"attributes",
186+
"system",
187+
"queue", &qn_attr) < 0) {
188+
flux_log_error (h, "error parsing jobspec");
167189
goto out;
168190
}
169-
170-
queue_name = qn_attr? qn_attr : ctx->opts.get_opt ().get_default_queue_name ();
171-
json_decref (o);
191+
if (qn_attr)
192+
queue_name = qn_attr;
193+
else
194+
queue_name = ctx->opts.get_opt ().get_default_queue_name ();
172195
queue = ctx->queues.at (queue_name);
173196
running_job = std::make_shared<job_t> (job_state_kind_t::RUNNING,
174197
id, uid, calc_priority (prio),
@@ -184,6 +207,7 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
184207
queue_name.c_str (), static_cast<intmax_t> (id));
185208

186209
out:
210+
flux_future_destroy (f);
187211
return rc;
188212
}
189213

0 commit comments

Comments
 (0)