Skip to content

Commit 2f8ada9

Browse files
garlickmilroy
authored andcommitted
qmanager: look up job queue name in KVS
Problem: when the fluxion modules are reloaded with running jobs, and the match format is "rv1_nosched", and queues are enabled, running jobs are killed with a fatal scheduler-restart exception. During the hello handshake defined by RFC 27, the scheduler is informed during its initialization of jobs that are holding resources. The hello callback in qmanager retrieves the job's queue name from the R key "attributes.system.scheduler.queue". The queue name is used to locate the proper queue for the job to be inserted into. This attribute is not being set in R when the match format is "rv1_nosched" so the default queue is assumed. Since the default queue is not instantiated when named queues are defined, a fatal job exception is raised when the queue lookup fails. There has been a proposal to deprecate the R attribute (#1108), so rather than ensure that it is set in this case, determine the queue instead by fetching the jobspec from the KVS. Fixes #1108
1 parent 9f5bd6a commit 2f8ada9

File tree

1 file changed

+47
-23
lines changed

1 file changed

+47
-23
lines changed

qmanager/modules/qmanager_callbacks.cpp

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,30 @@ int qmanager_cb_t::post_sched_loop (flux_t *h, schedutil_t *schedutil,
113113
return rc;
114114
}
115115

116+
/* The RFC 27 hello handshake occurs during scheduler initialization. Its
117+
* purpose is to inform the scheduler of jobs that already have resources
118+
* allocated. This callback is made once per job. The callback should return
119+
* 0 on success or -1 on failure. On failure, the job manager raises a fatal
120+
* exception on the job.
121+
*
122+
* Jobs that already have resources at hello need to be assigned to the correct
123+
* qmanager queue, but the queue is not provided in the hello metadata.
124+
* Therefore, jobspec is fetched from the KVS so that attributes.system.queue
125+
* can be extracted from it.
126+
*
127+
* Note that fluxion instantiates the "default" queue when no named queues
128+
* are configured. Therefore, when the queue attribute is not defined, we
129+
* put the job in the default queue.
130+
*
131+
* Fail the job if its queue attribute (or lack thereof) no longer matches a
132+
* valid queue. This can occur if queues have been reconfigured since job
133+
* submission.
134+
*/
116135
int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
117136
const char *R, void *arg)
118137

119138
{
120139
int rc = 0;
121-
json_t *o = NULL;
122-
json_error_t err;
123140
std::string R_out;
124141
char *qn_attr = NULL;
125142
std::string queue_name;
@@ -130,38 +147,44 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
130147
unsigned int prio;
131148
uint32_t uid;
132149
double ts;
150+
json_t *jobspec = NULL;
151+
flux_future_t *f = NULL;
133152

153+
/* Don't expect jobspec to be set here as it is not currently defined
154+
* in RFC 27. However, add it anyway in case the hello protocol
155+
* evolves to include it. If it is not set, it must be looked up.
156+
*/
134157
if (flux_msg_unpack (msg,
135-
"{s:I s:i s:i s:f}",
158+
"{s:I s:i s:i s:f s?o}",
136159
"id", &id,
137160
"priority", &prio,
138161
"userid", &uid,
139-
"t_submit", &ts) < 0) {
162+
"t_submit", &ts,
163+
"jobspec", &jobspec) < 0) {
140164
flux_log_error (h, "%s: flux_msg_unpack", __FUNCTION__);
141165
goto out;
142166
}
143-
144-
if ( (o = json_loads (R, 0, &err)) == NULL) {
145-
rc = -1;
146-
errno = EPROTO;
147-
flux_log (h, LOG_ERR, "%s: parsing R for job (id=%jd): %s %s@%d:%d",
148-
__FUNCTION__, static_cast<intmax_t> (id),
149-
err.text, err.source, err.line, err.column);
150-
goto out;
167+
if (!jobspec) {
168+
char key[64] = { 0 };
169+
if (flux_job_kvs_key (key, sizeof (key), id, "jobspec") < 0
170+
|| !(f = flux_kvs_lookup (h, NULL, 0, key))
171+
|| flux_kvs_lookup_get_unpack (f, "o", &jobspec) < 0) {
172+
flux_log_error (h, "%s", key);
173+
goto out;
174+
}
151175
}
152-
if ( (rc = json_unpack (o, "{ s?:{s?:{s?:{s?:s}}} }",
153-
"attributes",
154-
"system",
155-
"scheduler",
156-
"queue", &qn_attr)) < 0) {
157-
json_decref (o);
158-
errno = EPROTO;
159-
flux_log (h, LOG_ERR, "%s: json_unpack for attributes", __FUNCTION__);
176+
if (json_unpack (jobspec,
177+
"{s?{s?{s?s}}}",
178+
"attributes",
179+
"system",
180+
"queue", &qn_attr) < 0) {
181+
flux_log_error (h, "error parsing jobspec");
160182
goto out;
161183
}
162-
163-
queue_name = qn_attr? qn_attr : ctx->opts.get_opt ().get_default_queue_name ();
164-
json_decref (o);
184+
if (qn_attr)
185+
queue_name = qn_attr;
186+
else
187+
queue_name = ctx->opts.get_opt ().get_default_queue_name ();
165188
queue = ctx->queues.at (queue_name);
166189
running_job = std::make_shared<job_t> (job_state_kind_t::RUNNING,
167190
id, uid, calc_priority (prio),
@@ -177,6 +200,7 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
177200
queue_name.c_str (), static_cast<intmax_t> (id));
178201

179202
out:
203+
flux_future_destroy (f);
180204
return rc;
181205
}
182206

0 commit comments

Comments
 (0)