Commit 48b809a

garlick authored and milroy committed
qmanager: look up job queue name in KVS
Problem: when the fluxion modules are reloaded with running jobs, and the match format is "rv1_nosched", and queues are enabled, running jobs are killed with a fatal scheduler-restart exception.

During the hello handshake defined by RFC 27, the scheduler is informed during its initialization of jobs that are holding resources. The hello callback in qmanager retrieves the job's queue name from the R key "attributes.system.scheduler.queue". The queue name is used to locate the proper queue for the job to be inserted into. This attribute is not being set in R when the match format is "rv1_nosched" so the default queue is assumed. Since the default queue is not instantiated when named queues are defined, a fatal job exception is raised when the queue lookup fails.

There has been a proposal to deprecate the R attribute (#1108), so rather than ensure that it is set in this case, determine the queue instead by fetching the jobspec from the KVS.

Fixes #1108
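The failure mode described in the commit message can be sketched with the standard library alone. This is a minimal illustration, not qmanager code: `queue_map_t`, `pick_queue_name`, and `place_job` are hypothetical names, and the queue map is reduced to a plain `std::map`. It shows why falling back to the default queue name fails when only named queues are instantiated.

```cpp
#include <map>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the set of instantiated qmanager queues.
using queue_map_t = std::map<std::string, int>;

// Use the queue attribute when it is present; otherwise assume the
// default queue name, mirroring the fallback described above.
std::string pick_queue_name (const std::optional<std::string> &attr,
                             const std::string &default_name)
{
    return attr ? *attr : default_name;
}

// Return 0 if the job's queue exists, -1 if the queue lookup fails.
int place_job (const queue_map_t &queues,
               const std::optional<std::string> &attr,
               const std::string &default_name)
{
    try {
        // std::map::at throws std::out_of_range for a missing key.
        (void) queues.at (pick_queue_name (attr, default_name));
    } catch (const std::out_of_range &) {
        return -1;
    }
    return 0;
}
```

With only named queues such as "batch" and "debug" in the map, a job whose queue attribute is missing resolves to the default name and the lookup fails. The same job succeeds when the map holds only the default queue.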
1 parent 416c59a commit 48b809a

1 file changed: +47 -23 lines changed

qmanager/modules/qmanager_callbacks.cpp

Lines changed: 47 additions & 23 deletions
@@ -119,13 +119,30 @@ int qmanager_cb_t::post_sched_loop (flux_t *h, schedutil_t *schedutil,
     return rc;
 }
 
+/* The RFC 27 hello handshake occurs during scheduler initialization. Its
+ * purpose is to inform the scheduler of jobs that already have resources
+ * allocated. This callback is made once per job. The callback should return
+ * 0 on success or -1 on failure. On failure, the job manager raises a fatal
+ * exception on the job.
+ *
+ * Jobs that already have resources at hello need to be assigned to the correct
+ * qmanager queue, but the queue is not provided in the hello metadata.
+ * Therefore, jobspec is fetched from the KVS so that attributes.system.queue
+ * can be extracted from it.
+ *
+ * Note that fluxion instantiates the "default" queue when no named queues
+ * are configured. Therefore, when the queue attribute is not defined, we
+ * put the job in the default queue.
+ *
+ * Fail the job if its queue attribute (or lack thereof) no longer matches a
+ * valid queue. This can occur if queues have been reconfigured since job
+ * submission.
+ */
 int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
                                         const char *R, void *arg)
 
 {
     int rc = 0;
-    json_t *o = NULL;
-    json_error_t err;
     std::string R_out;
     char *qn_attr = NULL;
     std::string queue_name;
@@ -136,38 +153,44 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
     unsigned int prio;
     uint32_t uid;
     double ts;
+    json_t *jobspec = NULL;
+    flux_future_t *f = NULL;
 
+    /* Don't expect jobspec to be set here as it is not currently defined
+     * in RFC 27. However, add it anyway in case the hello protocol
+     * evolves to include it. If it is not set, it must be looked up.
+     */
     if (flux_msg_unpack (msg,
-                         "{s:I s:i s:i s:f}",
+                         "{s:I s:i s:i s:f s?o}",
                          "id", &id,
                          "priority", &prio,
                          "userid", &uid,
-                         "t_submit", &ts) < 0) {
+                         "t_submit", &ts,
+                         "jobspec", &jobspec) < 0) {
         flux_log_error (h, "%s: flux_msg_unpack", __FUNCTION__);
         goto out;
     }
-
-    if ( (o = json_loads (R, 0, &err)) == NULL) {
-        rc = -1;
-        errno = EPROTO;
-        flux_log (h, LOG_ERR, "%s: parsing R for job (id=%jd): %s %s@%d:%d",
-                  __FUNCTION__, static_cast<intmax_t> (id),
-                  err.text, err.source, err.line, err.column);
-        goto out;
+    if (!jobspec) {
+        char key[64] = { 0 };
+        if (flux_job_kvs_key (key, sizeof (key), id, "jobspec") < 0
+            || !(f = flux_kvs_lookup (h, NULL, 0, key))
+            || flux_kvs_lookup_get_unpack (f, "o", &jobspec) < 0) {
+            flux_log_error (h, "%s", key);
+            goto out;
+        }
     }
-    if ( (rc = json_unpack (o, "{ s?:{s?:{s?:{s?:s}}} }",
-                            "attributes",
-                            "system",
-                            "scheduler",
-                            "queue", &qn_attr)) < 0) {
-        json_decref (o);
-        errno = EPROTO;
-        flux_log (h, LOG_ERR, "%s: json_unpack for attributes", __FUNCTION__);
+    if (json_unpack (jobspec,
+                     "{s?{s?{s?s}}}",
+                     "attributes",
+                     "system",
+                     "queue", &qn_attr) < 0) {
+        flux_log_error (h, "error parsing jobspec");
         goto out;
     }
-
-    queue_name = qn_attr? qn_attr : ctx->opts.get_opt ().get_default_queue_name ();
-    json_decref (o);
+    if (qn_attr)
+        queue_name = qn_attr;
+    else
+        queue_name = ctx->opts.get_opt ().get_default_queue_name ();
     queue = ctx->queues.at (queue_name);
     running_job = std::make_shared<job_t> (job_state_kind_t::RUNNING,
                                            id, uid, calc_priority (prio),
@@ -183,6 +206,7 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
                   queue_name.c_str (), static_cast<intmax_t> (id));
 
 out:
+    flux_future_destroy (f);
     return rc;
 }
 
