Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 58 additions & 26 deletions qmanager/modules/qmanager_callbacks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,30 @@ int qmanager_cb_t::post_sched_loop (flux_t *h, schedutil_t *schedutil,
return rc;
}

/* The RFC 27 hello handshake occurs during scheduler initialization. Its
* purpose is to inform the scheduler of jobs that already have resources
* allocated. This callback is made once per job. The callback should return
* 0 on success or -1 on failure. On failure, the job manager raises a fatal
* exception on the job.
*
* Jobs that already have resources at hello need to be assigned to the correct
* qmanager queue, but the queue is not provided in the hello metadata.
* Therefore, jobspec is fetched from the KVS so that attributes.system.queue
* can be extracted from it.
*
* Note that fluxion instantiates the "default" queue when no named queues
* are configured. Therefore, when the queue attribute is not defined, we
* put the job in the default queue.
*
* Fail the job if its queue attribute (or lack thereof) no longer matches a
* valid queue. This can occur if queues have been reconfigured since job
* submission.
*/
int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
const char *R, void *arg)

{
int rc = 0;
json_t *o = NULL;
json_error_t err;
int rc = -1;
std::string R_out;
char *qn_attr = NULL;
std::string queue_name;
Expand All @@ -130,53 +147,68 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
unsigned int prio;
uint32_t uid;
double ts;
json_t *jobspec = NULL;
flux_future_t *f = NULL;

/* Don't expect jobspec to be set here as it is not currently defined
* in RFC 27. However, add it anyway in case the hello protocol
* evolves to include it. If it is not set, it must be looked up.
*/
if (flux_msg_unpack (msg,
"{s:I s:i s:i s:f}",
"{s:I s:i s:i s:f s?o}",
"id", &id,
"priority", &prio,
"userid", &uid,
"t_submit", &ts) < 0) {
"t_submit", &ts,
"jobspec", &jobspec) < 0) {
flux_log_error (h, "%s: flux_msg_unpack", __FUNCTION__);
goto out;
}

if ( (o = json_loads (R, 0, &err)) == NULL) {
rc = -1;
errno = EPROTO;
flux_log (h, LOG_ERR, "%s: parsing R for job (id=%jd): %s %s@%d:%d",
__FUNCTION__, static_cast<intmax_t> (id),
err.text, err.source, err.line, err.column);
if (!jobspec) {
char key[64] = { 0 };
if (flux_job_kvs_key (key, sizeof (key), id, "jobspec") < 0
|| !(f = flux_kvs_lookup (h, NULL, 0, key))
|| flux_kvs_lookup_get_unpack (f, "o", &jobspec) < 0) {
flux_log_error (h, "%s", key);
goto out;
}
}
if (json_unpack (jobspec,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We really need a json path wrapper for things like this. Definitely not in this PR but mentioning it to remind myself.

"{s?{s?{s?s}}}",
"attributes",
"system",
"queue", &qn_attr) < 0) {
flux_log_error (h, "error parsing jobspec");
goto out;
}
if ( (rc = json_unpack (o, "{ s?:{s?:{s?:{s?:s}}} }",
"attributes",
"system",
"scheduler",
"queue", &qn_attr)) < 0) {
json_decref (o);
errno = EPROTO;
flux_log (h, LOG_ERR, "%s: json_unpack for attributes", __FUNCTION__);
if (qn_attr)
queue_name = qn_attr;
else
queue_name = ctx->opts.get_opt ().get_default_queue_name ();
if (ctx->queues.find (queue_name) == ctx->queues.end ()) {
flux_log (h,
LOG_ERR,
"%s: unknown queue name (id=%jd queue=%s)",
__FUNCTION__,
static_cast<intmax_t> (id),
queue_name.c_str ());
goto out;
}

queue_name = qn_attr? qn_attr : ctx->opts.get_opt ().get_default_queue_name ();
json_decref (o);
queue = ctx->queues.at (queue_name);
running_job = std::make_shared<job_t> (job_state_kind_t::RUNNING,
id, uid, calc_priority (prio),
ts, R);

if ( (rc = queue->reconstruct (static_cast<void *> (h),
running_job, R_out)) < 0) {
if (queue->reconstruct (static_cast<void *> (h), running_job, R_out) < 0) {
flux_log_error (h, "%s: reconstruct (id=%jd queue=%s)", __FUNCTION__,
static_cast<intmax_t> (id), queue_name.c_str ());
goto out;
}
flux_log (h, LOG_DEBUG, "requeue success (queue=%s id=%jd)",
queue_name.c_str (), static_cast<intmax_t> (id));

rc = 0;
out:
flux_future_destroy (f);
return rc;
}

Expand Down
64 changes: 41 additions & 23 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,8 @@ static int track_schedule_info (std::shared_ptr<resource_ctx_t> &ctx,
}

static int parse_R (std::shared_ptr<resource_ctx_t> &ctx, const char *R,
std::string &jgf, int64_t &starttime, uint64_t &duration)
std::string &R_graph_fmt, int64_t &starttime, uint64_t &duration,
std::string &format)
{
int rc = 0;
int version = 0;
Expand Down Expand Up @@ -1605,20 +1606,22 @@ static int parse_R (std::shared_ptr<resource_ctx_t> &ctx, const char *R,
static_cast<intmax_t> (st), static_cast<intmax_t> (et));
goto freemem_out;
}
if (graph == NULL) {
rc = -1;
errno = ENOENT;
flux_log (ctx->h, LOG_ERR, "%s: no scheduling key in R", __FUNCTION__);
goto freemem_out;
}
if ( !(jgf_str = json_dumps (graph, JSON_INDENT (0)))) {
rc = -1;
errno = ENOMEM;
flux_log (ctx->h, LOG_ERR, "%s: json_dumps", __FUNCTION__);
goto freemem_out;
if (graph != NULL) {
if ( !(jgf_str = json_dumps (graph, JSON_INDENT (0)))) {
rc = -1;
errno = ENOMEM;
flux_log (ctx->h, LOG_ERR, "%s: json_dumps", __FUNCTION__);
goto freemem_out;
}
R_graph_fmt = jgf_str;
free (jgf_str);
format = "jgf";
} else {
// Use the rv1exec reader
R_graph_fmt = R;
format = "rv1exec";
}
jgf = jgf_str;
free (jgf_str);

starttime = static_cast<int64_t> (st);
duration = et - st;

Expand Down Expand Up @@ -1710,18 +1713,33 @@ static int run (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
}

static int run (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
const std::string &jgf, int64_t at, uint64_t duration)
const std::string &R, int64_t at, uint64_t duration,
std::string &format)
{
int rc = 0;
dfu_traverser_t &tr = *(ctx->traverser);
std::shared_ptr<resource_reader_base_t> rd;
if ((rd = create_resource_reader ("jgf")) == nullptr) {
if (format == "jgf") {
if ((rd = create_resource_reader ("jgf")) == nullptr) {
rc = -1;
flux_log (ctx->h, LOG_ERR, "%s: create JGF reader (id=%jd)",
__FUNCTION__, static_cast<intmax_t> (jobid));
goto out;
}
} else if (format == "rv1exec") {
if ((rd = create_resource_reader ("rv1exec")) == nullptr) {
rc = -1;
flux_log (ctx->h, LOG_ERR, "%s: create rv1exec reader (id=%jd)",
__FUNCTION__, static_cast<intmax_t> (jobid));
goto out;
}
} else {
rc = -1;
flux_log (ctx->h, LOG_ERR, "%s: create_resource_reader (id=%jd)",
__FUNCTION__, static_cast<intmax_t> (jobid));
flux_log (ctx->h, LOG_ERR, "%s: create rv1exec reader (id=%jd)",
__FUNCTION__, static_cast<intmax_t> (jobid));
goto out;
}
if ((rc = tr.run (jgf, ctx->writers, rd, jobid, at, duration)) < 0) {
if ((rc = tr.run (R, ctx->writers, rd, jobid, at, duration)) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: dfu_traverser_t::run (id=%jd): %s",
__FUNCTION__, static_cast<intmax_t> (jobid),
ctx->traverser->err_message ().c_str ());
Expand Down Expand Up @@ -1791,15 +1809,15 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
uint64_t duration = 0;
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::duration<double> elapsed;
std::string jgf;
std::string R2;
std::string R_graph_fmt;
std::string format;

start = std::chrono::system_clock::now ();
if ( (rc = parse_R (ctx, R, jgf, at, duration)) < 0) {
if ( (rc = parse_R (ctx, R, R_graph_fmt, at, duration, format)) < 0) {
flux_log_error (ctx->h, "%s: parsing R", __FUNCTION__);
goto done;
}
if ( (rc = run (ctx, jobid, jgf, at, duration)) < 0) {
if ( (rc = run (ctx, jobid, R_graph_fmt, at, duration, format)) < 0) {
flux_log_error (ctx->h, "%s: run", __FUNCTION__);
goto done;
}
Expand Down
Loading