diff --git a/qmanager/modules/qmanager_callbacks.cpp b/qmanager/modules/qmanager_callbacks.cpp index 8723134e2..8257266d0 100644 --- a/qmanager/modules/qmanager_callbacks.cpp +++ b/qmanager/modules/qmanager_callbacks.cpp @@ -113,13 +113,30 @@ int qmanager_cb_t::post_sched_loop (flux_t *h, schedutil_t *schedutil, return rc; } +/* The RFC 27 hello handshake occurs during scheduler initialization. Its + * purpose is to inform the scheduler of jobs that already have resources + * allocated. This callback is made once per job. The callback should return + * 0 on success or -1 on failure. On failure, the job manager raises a fatal + * exception on the job. + * + * Jobs that already have resources at hello need to be assigned to the correct + * qmanager queue, but the queue is not provided in the hello metadata. + * Therefore, the jobspec is fetched from the KVS so that attributes.system.queue + * can be extracted from it. + * + * Note that fluxion instantiates the "default" queue when no named queues + * are configured. Therefore, when the queue attribute is not defined, we + * put the job in the default queue. + * + * Fail the job if its queue attribute (or lack thereof) no longer matches a + * valid queue. This can occur if queues have been reconfigured since job + * submission. + */ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const char *R, void *arg) { - int rc = 0; - json_t *o = NULL; - json_error_t err; + int rc = -1; std::string R_out; char *qn_attr = NULL; std::string queue_name; @@ -130,53 +147,68 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, unsigned int prio; uint32_t uid; double ts; + json_t *jobspec = NULL; + flux_future_t *f = NULL; + /* Don't expect jobspec to be set here as it is not currently defined + * in RFC 27. However, unpack it anyway in case the hello protocol + * evolves to include it. If it is not set, it must be looked up. + */ if (flux_msg_unpack (msg, - "{s:I s:i s:i s:f}", + "{s:I s:i s:i s:f s?o}", "id", &id, "priority", &prio, "userid", &uid, - "t_submit", &ts) < 0) { + "t_submit", &ts, + "jobspec", &jobspec) < 0) { flux_log_error (h, "%s: flux_msg_unpack", __FUNCTION__); goto out; } - - if ( (o = json_loads (R, 0, &err)) == NULL) { - rc = -1; - errno = EPROTO; - flux_log (h, LOG_ERR, "%s: parsing R for job (id=%jd): %s %s@%d:%d", - __FUNCTION__, static_cast<intmax_t> (id), - err.text, err.source, err.line, err.column); + if (!jobspec) { + char key[64] = { 0 }; + if (flux_job_kvs_key (key, sizeof (key), id, "jobspec") < 0 + || !(f = flux_kvs_lookup (h, NULL, 0, key)) + || flux_kvs_lookup_get_unpack (f, "o", &jobspec) < 0) { + flux_log_error (h, "%s", key); + goto out; + } + } + if (json_unpack (jobspec, + "{s?{s?{s?s}}}", + "attributes", + "system", + "queue", &qn_attr) < 0) { + flux_log_error (h, "error parsing jobspec"); goto out; } - if ( (rc = json_unpack (o, "{ s?:{s?:{s?:{s?:s}}} }", - "attributes", - "system", - "scheduler", - "queue", &qn_attr)) < 0) { - json_decref (o); - errno = EPROTO; - flux_log (h, LOG_ERR, "%s: json_unpack for attributes", __FUNCTION__); + if (qn_attr) + queue_name = qn_attr; + else + queue_name = ctx->opts.get_opt ().get_default_queue_name (); + if (ctx->queues.find (queue_name) == ctx->queues.end ()) { + flux_log (h, + LOG_ERR, + "%s: unknown queue name (id=%jd queue=%s)", + __FUNCTION__, + static_cast<intmax_t> (id), + queue_name.c_str ()); goto out; } - - queue_name = qn_attr?
qn_attr : ctx->opts.get_opt ().get_default_queue_name (); - json_decref (o); queue = ctx->queues.at (queue_name); running_job = std::make_shared<job_t> (job_state_kind_t::RUNNING, id, uid, calc_priority (prio), ts, R); - if ( (rc = queue->reconstruct (static_cast<void *> (h), - running_job, R_out)) < 0) { + if (queue->reconstruct (static_cast<void *> (h), running_job, R_out) < 0) { flux_log_error (h, "%s: reconstruct (id=%jd queue=%s)", __FUNCTION__, static_cast<intmax_t> (id), queue_name.c_str ()); goto out; } flux_log (h, LOG_DEBUG, "requeue success (queue=%s id=%jd)", queue_name.c_str (), static_cast<intmax_t> (id)); - + rc = 0; out: + flux_future_destroy (f); return rc; } diff --git a/resource/modules/resource_match.cpp b/resource/modules/resource_match.cpp index 9142965f4..fb4606239 100644 --- a/resource/modules/resource_match.cpp +++ b/resource/modules/resource_match.cpp @@ -1568,7 +1568,8 @@ static int track_schedule_info (std::shared_ptr<resource_ctx_t> &ctx, } static int parse_R (std::shared_ptr<resource_ctx_t> &ctx, const char *R, - std::string &jgf, int64_t &starttime, uint64_t &duration) + std::string &R_graph_fmt, int64_t &starttime, uint64_t &duration, + std::string &format) { int rc = 0; int version = 0; @@ -1605,20 +1606,22 @@ static int parse_R (std::shared_ptr<resource_ctx_t> &ctx, const char *R, static_cast<intmax_t> (st), static_cast<intmax_t> (et)); goto freemem_out; } - if (graph == NULL) { - rc = -1; - errno = ENOENT; - flux_log (ctx->h, LOG_ERR, "%s: no scheduling key in R", __FUNCTION__); - goto freemem_out; - } - if ( !(jgf_str = json_dumps (graph, JSON_INDENT (0)))) { - rc = -1; - errno = ENOMEM; - flux_log (ctx->h, LOG_ERR, "%s: json_dumps", __FUNCTION__); - goto freemem_out; + if (graph != NULL) { + if ( !(jgf_str = json_dumps (graph, JSON_INDENT (0)))) { + rc = -1; + errno = ENOMEM; + flux_log (ctx->h, LOG_ERR, "%s: json_dumps", __FUNCTION__); + goto freemem_out; + } + R_graph_fmt = jgf_str; + free (jgf_str); + format = "jgf"; + } else { + // Use the rv1exec reader + R_graph_fmt = R; + format = "rv1exec"; } - jgf = jgf_str; - free (jgf_str); + starttime = static_cast<int64_t> (st); duration = et - st; @@ -1710,18 +1713,33 @@ static int run (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid, } static int run (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid, - const std::string &jgf, int64_t at, uint64_t duration) + const std::string &R, int64_t at, uint64_t duration, + std::string &format) { int rc = 0; dfu_traverser_t &tr = *(ctx->traverser); std::shared_ptr<resource_reader_base_t> rd; - if ((rd = create_resource_reader ("jgf")) == nullptr) { + if (format == "jgf") { + if ((rd = create_resource_reader ("jgf")) == nullptr) { + rc = -1; + flux_log (ctx->h, LOG_ERR, "%s: create JGF reader (id=%jd)", + __FUNCTION__, static_cast<intmax_t> (jobid)); + goto out; + } + } else if (format == "rv1exec") { + if ((rd = create_resource_reader ("rv1exec")) == nullptr) { + rc = -1; + flux_log (ctx->h, LOG_ERR, "%s: create rv1exec reader (id=%jd)", + __FUNCTION__, static_cast<intmax_t> (jobid)); + goto out; + } + } else { rc = -1; - flux_log (ctx->h, LOG_ERR, "%s: create_resource_reader (id=%jd)", - __FUNCTION__, static_cast<intmax_t> (jobid)); + flux_log (ctx->h, LOG_ERR, "%s: unknown reader format (id=%jd)", + __FUNCTION__, static_cast<intmax_t> (jobid)); goto out; } - if ((rc = tr.run (jgf, ctx->writers, rd, jobid, at, duration)) < 0) { + if ((rc = tr.run (R, ctx->writers, rd, jobid, at, duration)) < 0) { flux_log (ctx->h, LOG_ERR, "%s: dfu_traverser_t::run (id=%jd): %s", __FUNCTION__, static_cast<intmax_t> (jobid), ctx->traverser->err_message ().c_str ()); @@ -1791,15 +1809,15 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid, uint64_t duration = 0; std::chrono::time_point<std::chrono::system_clock>
start; std::chrono::duration<double> elapsed; - std::string jgf; - std::string R2; + std::string R_graph_fmt; + std::string format; start = std::chrono::system_clock::now (); - if ( (rc = parse_R (ctx, R, jgf, at, duration)) < 0) { + if ( (rc = parse_R (ctx, R, R_graph_fmt, at, duration, format)) < 0) { flux_log_error (ctx->h, "%s: parsing R", __FUNCTION__); goto done; } - if ( (rc = run (ctx, jobid, jgf, at, duration)) < 0) { + if ( (rc = run (ctx, jobid, R_graph_fmt, at, duration, format)) < 0) { flux_log_error (ctx->h, "%s: run", __FUNCTION__); goto done; } diff --git a/resource/readers/resource_reader_rv1exec.cpp b/resource/readers/resource_reader_rv1exec.cpp index 8ff14f3dd..0d466919f 100644 --- a/resource/readers/resource_reader_rv1exec.cpp +++ b/resource/readers/resource_reader_rv1exec.cpp @@ -222,6 +222,301 @@ int resource_reader_rv1exec_t::add_cluster_vertex (resource_graph_t &g, return 0; } +vtx_t resource_reader_rv1exec_t::find_vertex (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t parent, int64_t id, + const std::string &subsys, + const std::string &type, + const std::string &basename, + const std::string &name, + int size, int rank) +{ + bool is_root = false; + std::string path = ""; + std::string vtx_name = ""; + vtx_t null_vtx = boost::graph_traits<resource_graph_t>::null_vertex (); + + // Determine the vertex path from its parent and name + if (parent == boost::graph_traits<resource_graph_t>::null_vertex ()) + is_root = true; + + std::string idstr = (id != -1)? std::to_string (id) : ""; + std::string prefix = is_root ? "" : g[parent].paths[subsys]; + vtx_name = (name != "")? name : basename + idstr; + path = prefix + "/" + vtx_name; + + // Search graph metadata for vertex + const auto &vtx_iter = m.by_path.find (path); + // Not found; return null_vertex + if (vtx_iter == m.by_path.end ()) + return null_vtx; + // Found in by_path + for (vtx_t v : vtx_iter->second) { + if (g[v].rank == rank + && g[v].id == id + && g[v].size == size + && g[v].type == type) { + return v; + } + } + + return null_vtx; +} + +int resource_reader_rv1exec_t::update_vertex (resource_graph_t &g, + vtx_t vtx, + updater_data &update_data) +{ + + int rc = -1; + int64_t span = -1; + int64_t avail = -1; + planner_t *plans = NULL; + + // Check and update plan + if ( (plans = g[vtx].schedule.plans) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": plan for " + g[vtx].name + " is null.\n"; + goto error; + } + if ( (avail = planner_avail_resources_during (plans, + update_data.at, + update_data.duration)) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": planner_avail_resources_during returned -1 for "; + m_err_msg += g[vtx].name + ".\n"; + goto error; + } + if (avail < g[vtx].size) { + // if g[v] has already been allocated/reserved, this is an error + m_err_msg += __FUNCTION__; + m_err_msg += ": " + g[vtx].name + " is unavailable.\n"; + goto error; + } + // Update the vertex plan here (not in traverser code). + // Traverser update () will handle aggregate filters and + // exclusivity checking filter. + // Can't update the rank-level (node) vertex yet; we + // don't know if all its children are allocated. + // Note this is a hard-coded option. Support for more flexible + // types may require extending rv1exec.
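+ // Node vertices are handled below in update_exclusivity (), which adds + // the rank-level span once every child on the rank has been allocated.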
+ if (g[vtx].type == "node") + return 0; + // Any type other than node gets its span added now + if ( (span = planner_add_span (plans, + update_data.at, + update_data.duration, + static_cast<uint64_t> (g[vtx].size))) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": can't add span into " + g[vtx].name + ".\n"; + goto error; + } + + if (update_data.reserved) + g[vtx].schedule.reservations[update_data.jobid] = span; + else + g[vtx].schedule.allocations[update_data.jobid] = span; + + update_data.updated_vertices[g[vtx].rank].push_back (vtx); + + rc = 0; + +error: + return rc; +} + +int resource_reader_rv1exec_t::undo_vertices (resource_graph_t &g, + updater_data &update_data) +{ + + int rc = -1; + int64_t span = -1; + planner_t *plans = NULL; + + for (auto &[rank, vertices] : update_data.updated_vertices) { + for (vtx_t vtx : vertices) { + // Check plan + if ( (plans = g[vtx].schedule.plans) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": plan for " + g[vtx].name + " is null.\n"; + goto error; + } + // Remove job tags + if (update_data.reserved) { + span = g[vtx].schedule.reservations.at (update_data.jobid); + g[vtx].schedule.reservations.erase (update_data.jobid); + } else { + span = g[vtx].schedule.allocations.at (update_data.jobid); + g[vtx].schedule.allocations.erase (update_data.jobid); + } + // Remove the span. + if (planner_rem_span (plans, span) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": can't remove span from " + g[vtx].name + ".\n"; + goto error; + } + } + } + + rc = 0; + +error: + return rc; +} + +int resource_reader_rv1exec_t::update_edges (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t src, vtx_t dst, + updater_data &update_data) +{ + edg_t e; + int rc = -1; + bool found = false; + boost::graph_traits<resource_graph_t>::out_edge_iterator ei, ei_end; + + boost::tie (ei, ei_end) = boost::out_edges (src, g); + for (; ei != ei_end; ++ei) { + if (boost::target (*ei, g) == dst) { + e = *ei; + found = true; + break; + } + } + if (!found) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": rv1exec edge not found in resource graph.\n"; + goto error; + } + g[e].idata.set_for_trav_update (g[dst].size, + true, update_data.token); + + return 0; + +error: + return -1; +} + +int resource_reader_rv1exec_t::update_exclusivity (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t vtx, + updater_data &update_data) +{ + // idata tag and exclusive checker update + int64_t span = -1; + planner_t *plans = NULL; + + const auto &rank_ex = update_data.updated_vertices.find (g[vtx].rank); + if (rank_ex == update_data.updated_vertices.end ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": rank not found in updated_vertices map.\n"; + return -1; + } + // This check enforces a rigid constraint on rank equivalence + // between graph initialization and rank in rv1exec string. + const auto &by_rank = m.by_rank.find (g[vtx].rank); + if (by_rank == m.by_rank.end ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": rank not found in by_rank graph map.\n"; + return -1; + } + + // If all child vertices allocated, allocate this vertex. + // Subtract one since the current node hasn't been added to the + // updated_vertices map. + if (rank_ex->second.size () != (by_rank->second.size () - 1)) + return 0; + // Matching counts indicate the rank's resources are exclusively allocated + if ( (plans = g[vtx].schedule.plans) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": plan for " + g[vtx].name + " is null.\n"; + return -1; + } + // Update the vertex plan here (not in traverser code).
+ // Traverser update () will handle aggregate filters and + // exclusivity checking filter. + if ( (span = planner_add_span (plans, update_data.at, update_data.duration, + static_cast<uint64_t> (g[vtx].size))) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": can't add span into " + g[vtx].name + ".\n"; + return -1; + } + // Add job tags + if (update_data.reserved) + g[vtx].schedule.reservations[update_data.jobid] = span; + else + g[vtx].schedule.allocations[update_data.jobid] = span; + // Add to the updated vertices vector to undo upon error. + update_data.updated_vertices[g[vtx].rank].push_back (vtx); + + return 0; +} + +vtx_t resource_reader_rv1exec_t::add_or_update (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t parent, int64_t id, + const std::string &subsys, + const std::string &type, + const std::string &basename, + const std::string &name, + int size, int rank, + std::map<std::string, std::string> &properties, + updater_data &update_data) +{ + vtx_t vtx; + vtx_t null_vtx = boost::graph_traits<resource_graph_t>::null_vertex (); + + if (!update_data.update) { + vtx = find_vertex (g, m, parent, id, subsys, type, basename, + name, size, rank); + // Shouldn't be found + if (vtx != boost::graph_traits<resource_graph_t>::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": found duplicate vertex in graph for "; + m_err_msg += name + ".\n"; + return null_vtx; + } + // Add resources + vtx = add_vertex (g, m, parent, id, subsys, type, basename, + name, properties, size, rank); + if (vtx == boost::graph_traits<resource_graph_t>::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to add vertex for "; + m_err_msg += name + ".\n"; + return null_vtx; + } + if (add_edges (g, m, parent, vtx, subsys, + "contains", "in") < 0) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to add edges for "; + m_err_msg += name + ".\n"; + return null_vtx; + } + } else { + // Update resources + vtx = find_vertex (g, m, parent, id, subsys, type, basename, + name, size, rank); + // Not found + if (vtx == boost::graph_traits<resource_graph_t>::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": couldn't find vertex in graph for "; + m_err_msg += name + ".\n"; + return null_vtx; + } + if (update_vertex (g, vtx, update_data) == -1) + return null_vtx; + // Must be the containment subsystem + if (update_edges (g, m, parent, vtx, update_data) == -1) + return null_vtx; + } + + return vtx; +} + int resource_reader_rv1exec_t::build_rmap (json_t *rlite, std::map<unsigned, unsigned> &rmap) { @@ -350,7 +645,8 @@ int resource_reader_rv1exec_t::unpack_child (resource_graph_t &g, const char *resource_ids, unsigned rank, std::map<unsigned, - properties_t> &pmap) + properties_t> &pmap, + updater_data &update_data) { int rc = -1; unsigned id; if (!resource_type || !resource_ids) { errno = EINVAL; - goto ret; + goto error; } if ( !(ids = idset_decode (resource_ids))) - goto ret; + goto error; id = idset_first (ids); while (id != IDSET_INVALID_ID) { edg_t e; - vtx_t v; + vtx_t vtx; std::string name = resource_type + std::to_string (id); - std::map<std::string, std::string> p; + std::map<std::string, std::string> properties; if (pmap.find (rank) != pmap.end ()) { if (pmap[rank].exist (resource_type)) { - if (pmap[rank].copy (resource_type, p) < 0) - goto ret; + if (pmap[rank].copy (resource_type, properties) < 0) + goto error; } } - v = add_vertex (g, m, parent, id, - "containment", resource_type, - resource_type, name, p, 1, rank); - if (v == boost::graph_traits<resource_graph_t>::null_vertex ()) - goto ret; - if (add_edges (g, m, parent, v, "containment", "contains", "in") < 0) - goto ret; + //
Returns the added or updated vertex; null_vertex on error. + vtx = add_or_update (g, m, parent, id, "containment", resource_type, + resource_type, name, 1, rank, properties, + update_data); + if (vtx == boost::graph_traits<resource_graph_t>::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed unpacking child for "; + m_err_msg += name + ".\n"; + goto error; + } id = idset_next (ids, id); } rc = 0; -ret: +error: idset_destroy (ids); return rc; } @@ -399,7 +698,8 @@ int resource_reader_rv1exec_t::unpack_children (resource_graph_t &g, json_t *children, unsigned rank, std::map<unsigned, - properties_t> &pmap) + properties_t> &pmap, + updater_data &update_data) { json_t *res_ids = nullptr; const char *res_type = nullptr; @@ -415,7 +715,8 @@ int resource_reader_rv1exec_t::unpack_children (resource_graph_t &g, goto error; } const char *ids_str = json_string_value (res_ids); - if (unpack_child (g, m, parent, res_type, ids_str, rank, pmap) < 0) + if (unpack_child (g, m, parent, res_type, ids_str, rank, pmap, + update_data) < 0) goto error; } return 0; @@ -432,11 +733,12 @@ int resource_reader_rv1exec_t::unpack_rank (resource_graph_t &g, struct hostlist *hlist, std::map<unsigned, unsigned> &rmap, std::map<unsigned, - properties_t> &pmap) + properties_t> &pmap, + updater_data &update_data) { edg_t e; - vtx_t v; - int64_t iden; + vtx_t vtx; + int64_t id; const char *hostname = nullptr; std::string basename; std::map<std::string, std::string> properties; @@ -452,7 +754,7 @@ int resource_reader_rv1exec_t::unpack_rank (resource_graph_t &g, if ( !(hostname = hostlist_nth (hlist, static_cast<int> (rmap[rank])))) goto error; - if (get_hostname_suffix (hostname, iden) < 0 + if (get_hostname_suffix (hostname, id) < 0 || get_host_basename (hostname, basename) < 0) { m_err_msg += __FUNCTION__; m_err_msg += ": error splitting hostname="; @@ -465,18 +767,29 @@ int resource_reader_rv1exec_t::unpack_rank (resource_graph_t &g, goto error; } } - - // Create and add a node vertex and link with cluster vertex - v = add_vertex (g, m, parent, iden, "containment", - "node", basename, hostname, properties, 1, rank); - if (v == boost::graph_traits<resource_graph_t>::null_vertex ()) - goto error; - if (add_edges (g, m, parent, v, "containment", "contains", "in") < 0) + // Returns the added or updated vertex; null_vertex on error.
+ vtx = add_or_update (g, m, parent, id, "containment", "node", basename, + hostname, 1, rank, properties, update_data); + if (vtx == boost::graph_traits<resource_graph_t>::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed unpacking rank for "; + m_err_msg += std::string (hostname) + ".\n"; goto error; + } // Unpack children node-local resources - if (unpack_children (g, m, v, children, rank, pmap) < 0) + if (unpack_children (g, m, vtx, children, rank, pmap, update_data) < 0) goto error; + if (update_data.update) { + // Update the rank's planner if all children allocated + if (update_exclusivity (g, m, vtx, update_data) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": exclusive filter update failed for "; + m_err_msg += std::string (hostname) + ".\n"; + goto error; + } + } + return 0; error: @@ -491,7 +804,8 @@ int resource_reader_rv1exec_t::unpack_rlite_entry (resource_graph_t &g, std::map<unsigned, unsigned> &rmap, std::map<unsigned, - properties_t> &pmap) + properties_t> &pmap, + updater_data &update_data) { int rc = -1; unsigned rank; if (!entry || !hlist) { errno = EINVAL; - goto ret; + goto error; } if (json_unpack (entry, "{s:s s:o}", "rank", &ranks, "children", &children) < 0) { errno = EINVAL; - goto ret; + goto error; } if ( !(r_ids = idset_decode (ranks))) - goto ret; + goto error; rank = idset_first (r_ids); while (rank != IDSET_INVALID_ID) { - if (unpack_rank (g, m, parent, rank, children, hlist, rmap, pmap) < 0) - goto ret; + if (unpack_rank (g, m, parent, rank, children, hlist, rmap, pmap, + update_data) < 0) + goto error; rank = idset_next (r_ids, rank); } rc = 0; -ret: +error: idset_destroy (r_ids); return rc; } @@ -534,7 +849,8 @@ int resource_reader_rv1exec_t::unpack_rlite (resource_graph_t &g, struct hostlist *hlist, std::map<unsigned, unsigned> &rmap, std::map<unsigned, - properties_t> &pmap) + properties_t> &pmap, + updater_data &update_data) { size_t index; vtx_t cluster_vtx; @@ -551,9 +867,12 @@ int resource_reader_rv1exec_t::unpack_rlite (resource_graph_t &g, } cluster_vtx = m.roots["containment"]; + // Set the cluster "needs" and mark the update as shared access at the cluster + m.v_rt_edges["containment"].set_for_trav_update (g[cluster_vtx].size, + false, update_data.token); json_array_foreach (rlite, index, entry) { if (unpack_rlite_entry (g, m, cluster_vtx, - entry, hlist, rmap, pmap) < 0) + entry, hlist, rmap, pmap, update_data) < 0) goto error; } return 0; @@ -564,7 +883,8 @@ int resource_reader_rv1exec_t::unpack_internal (resource_graph_t &g, resource_graph_metadata_t &m, - json_t *rv1) + json_t *rv1, + updater_data &update_data) { int rc = -1; int version; @@ -612,7 +932,7 @@ int resource_reader_rv1exec_t::unpack_internal (resource_graph_t &g, if (hostlist_append (hlist, hlist_str) < 0) goto ret; } - if (unpack_rlite (g, m, rlite, hlist, rmap, pmap) < 0) + if (unpack_rlite (g, m, rlite, hlist, rmap, pmap, update_data) < 0) goto ret; rc = 0; @@ -642,6 +962,9 @@ int resource_reader_rv1exec_t::unpack (resource_graph_t &g, json_error_t error; json_t *rv1 = nullptr; int saved_errno; + updater_data null_data; + // Indicate adding, not updating + null_data.update = false; if (str == "") { errno = EINVAL; @@ -654,7 +977,8 @@ errno = ENOMEM; goto ret; } - rc = unpack_internal (g, m, rv1); + + rc = unpack_internal (g, m, rv1, null_data); ret: saved_errno = errno; @@ -682,12 +1006,42 @@ int resource_reader_rv1exec_t::remove_subgraph
(resource_graph_t &g, int resource_reader_rv1exec_t::update (resource_graph_t &g, resource_graph_metadata_t &m, - const std::string &str, int64_t jobid, + const std::string &R, int64_t jobid, int64_t at, uint64_t dur, bool rsv, uint64_t token) { - errno = ENOTSUP; // RV1Exec reader currently does not support update - return -1; + int rc = -1; + json_error_t error; + json_t *rv1 = nullptr; + int saved_errno; + updater_data update_data; + + if (R == "") { + errno = EINVAL; + goto ret; + } + + if ( !(rv1 = json_loads (R.c_str (), 0, &error))) { + errno = ENOMEM; + goto ret; + } + + update_data.update = true; + update_data.jobid = jobid; + update_data.at = at; + update_data.duration = dur; + update_data.reserved = rsv; + update_data.token = token; + + if ( (rc = unpack_internal (g, m, rv1, update_data)) == -1) { + undo_vertices (g, update_data); + } + +ret: + saved_errno = errno; + json_decref (rv1); + errno = saved_errno; + return rc; } bool resource_reader_rv1exec_t::is_allowlist_supported () diff --git a/resource/readers/resource_reader_rv1exec.hpp b/resource/readers/resource_reader_rv1exec.hpp index 77445918a..2caf9ff4c 100644 --- a/resource/readers/resource_reader_rv1exec.hpp +++ b/resource/readers/resource_reader_rv1exec.hpp @@ -21,6 +21,17 @@ extern "C" { namespace Flux { namespace resource_model { +// Struct to track data for update +struct updater_data { + int64_t jobid = 0; + int64_t at = 0; + uint64_t duration = 0; + bool reserved = false; + uint64_t token = 0; + // track updated vertices to undo upon error + std::map<int64_t, std::vector<vtx_t>> updated_vertices; + bool update = true; // Updating or adding vertex +}; /*! RV1EXEC resource reader class. */ @@ -120,6 +131,44 @@ class resource_reader_rv1exec_t : public resource_reader_base_t { int add_cluster_vertex (resource_graph_t &g, resource_graph_metadata_t &m); + // Update functions + vtx_t find_vertex (resource_graph_t &g, resource_graph_metadata_t &m, + vtx_t parent, int64_t id, + const std::string &subsys, + const std::string &type, + const std::string &basename, + const std::string &name, + int size, int rank); + + int update_vertex (resource_graph_t &g, + vtx_t vtx, + updater_data &update_data); + + int undo_vertices (resource_graph_t &g, updater_data &update_data); + + int update_edges (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t src, vtx_t dst, + updater_data &update_data); + + int update_exclusivity (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t vtx, + updater_data &update_data); + + // Returns the added or updated vertex; null_vertex on error.
+ vtx_t add_or_update (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t parent, int64_t id, + const std::string &subsys, + const std::string &type, + const std::string &basename, + const std::string &name, + int size, int rank, + std::map<std::string, std::string> &properties, + updater_data &update_data); + int build_rmap (json_t *rlite, std::map<unsigned, unsigned> &rmap); int build_pmap (json_t *properties, @@ -129,35 +178,41 @@ class resource_reader_rv1exec_t : public resource_reader_base_t { resource_graph_metadata_t &m, vtx_t parent, const char *resource_type, const char *resource_ids, unsigned rank, - std::map<unsigned, properties_t> &pmap); + std::map<unsigned, properties_t> &pmap, + updater_data &update_data); int unpack_children (resource_graph_t &g, resource_graph_metadata_t &m, vtx_t parent, json_t *children, unsigned rank, - std::map<unsigned, properties_t> &pmap); + std::map<unsigned, properties_t> &pmap, + updater_data &update_data); int unpack_rank (resource_graph_t &g, resource_graph_metadata_t &m, vtx_t parent, unsigned rank, json_t *children, struct hostlist *hlist, std::map<unsigned, unsigned> &rmap, - std::map<unsigned, properties_t> &pmap); + std::map<unsigned, properties_t> &pmap, + updater_data &update_data); int unpack_rlite_entry (resource_graph_t &g, resource_graph_metadata_t &m, vtx_t parent, json_t *entry, struct hostlist *hlist, std::map<unsigned, unsigned> &rmap, - std::map<unsigned, properties_t> &pmap); + std::map<unsigned, properties_t> &pmap, + updater_data &update_data); int unpack_rlite (resource_graph_t &g, resource_graph_metadata_t &m, json_t *rlite, struct hostlist *hlist, std::map<unsigned, unsigned> &rmap, - std::map<unsigned, properties_t> &pmap); + std::map<unsigned, properties_t> &pmap, + updater_data &update_data); int unpack_internal (resource_graph_t &g, - resource_graph_metadata_t &m, json_t *rv1); + resource_graph_metadata_t &m, json_t *rv1, + updater_data &update_data); }; } // namespace resource_model diff --git a/resource/utilities/command.cpp b/resource/utilities/command.cpp index 45848c58e..f87236a06 100644 --- a/resource/utilities/command.cpp +++ b/resource/utilities/command.cpp @@ -39,8 +39,8 @@ command_t commands[] = { "allocate_orelse_reserve): " "resource-query> multi-match allocate jobspec1 jobspec2 ..."}, { "update", "u", cmd_update, "Update resources with a JGF subgraph (subcmd: " -"allocate | reserve): " -"resource-query> update allocate jgf_file jobid starttime duration" }, +"allocate | reserve), (reader: jgf | rv1exec): " +"resource-query> update allocate jgf jgf_file jobid starttime duration" }, { "attach", "j", cmd_attach, "Attach a JGF subgraph to the " "resource graph: resource-query> attach jgf_file" }, { "remove", "j", cmd_remove, "Experimental: remove a subgraph to the " @@ -318,7 +318,8 @@ int cmd_match_multi (std::shared_ptr<resource_context_t> &ctx, static int update_run (std::shared_ptr<resource_context_t> &ctx, const std::string &fn, const std::string &str, - int64_t id, int64_t at, uint64_t d) + int64_t id, int64_t at, uint64_t d, + const std::string &reader) { int rc = -1; double elapse = 0.0f; struct timeval st, et; std::shared_ptr<resource_reader_base_t> rd; - if ( (rd = create_resource_reader ("jgf")) == nullptr) { - std::cerr << "ERROR: can't create JGF reader " << std::endl; - return -1; + if (reader == "jgf") { + if ( (rd = create_resource_reader ("jgf")) == nullptr) { + std::cerr << "ERROR: can't create JGF reader " << std::endl; + return -1; + } + } else { + if ( (rd = create_resource_reader ("rv1exec")) == nullptr) { + std::cerr << "ERROR: can't create rv1exec reader " << std::endl; + return -1; + } } gettimeofday (&st, NULL); @@ -360,26 +368,31 @@ static int update (std::shared_ptr<resource_context_t> &ctx, int64_t at = 0; int64_t jobid = 0; std::string subcmd = args[1]; + std::string reader = args[2]; std::stringstream buffer{}; if (!(subcmd ==
"allocate" || subcmd == "reserve")) { std::cerr << "ERROR: unknown subcmd " << args[1] << std::endl; return -1; } - std::ifstream jgf_file (args[2]); + if (!(reader == "jgf" || reader == "rv1exec")) { + std::cerr << "ERROR: unsupported reader " << args[2] << std::endl; + return -1; + } + std::ifstream jgf_file (args[3]); if (!jgf_file) { - std::cerr << "ERROR: can't open " << args[2] << std::endl; + std::cerr << "ERROR: can't open " << args[3] << std::endl; return -1; } - jobid = static_cast (std::strtoll (args[3].c_str (), NULL, 10)); + jobid = static_cast (std::strtoll (args[4].c_str (), NULL, 10)); if (ctx->allocations.find (jobid) != ctx->allocations.end () || ctx->reservations.find (jobid) != ctx->reservations.end ()) { std::cerr << "ERROR: existing Jobid " << std::endl; return -1; } - at = static_cast (std::strtoll (args[4].c_str (), NULL, 10)); - d = static_cast (std::strtoll (args[5].c_str (), NULL, 10)); + at = static_cast (std::strtoll (args[5].c_str (), NULL, 10)); + d = static_cast (std::strtoll (args[6].c_str (), NULL, 10)); if (at < 0 || d == 0) { std::cerr << "ERROR: invalid time (" << at << ", " << d << ")" << std::endl; @@ -389,14 +402,14 @@ static int update (std::shared_ptr &ctx, buffer << jgf_file.rdbuf (); jgf_file.close (); - return update_run (ctx, args[2], buffer.str (), jobid, at, d); + return update_run (ctx, args[3], buffer.str (), jobid, at, d, reader); } int cmd_update (std::shared_ptr &ctx, std::vector &args) { try { - if (args.size () != 6) { + if (args.size () != 7) { std::cerr << "ERROR: malformed command" << std::endl; return 0; } diff --git a/t/CMakeLists.txt b/t/CMakeLists.txt index 994130237..4fb5f9bb1 100644 --- a/t/CMakeLists.txt +++ b/t/CMakeLists.txt @@ -27,6 +27,7 @@ set(ALL_TESTS t1022-property-constraints.t t1023-multiqueue-constraints.t t1024-alloc-check.t + t1025-rv1-reload.t t3000-jobspec.t t3001-resource-basic.t t3002-resource-prefix.t diff --git a/t/Makefile.am b/t/Makefile.am index af1c3c88e..4714372ac 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -45,6 +45,7 @@ TESTS = \ t1022-property-constraints.t \ t1023-multiqueue-constraints.t \ t1024-alloc-check.t \ + t1025-rv1-reload.t \ t3000-jobspec.t \ t3001-resource-basic.t \ t3002-resource-prefix.t \ diff --git a/t/data/resource/expected/update/014.R.out b/t/data/resource/expected/update/014.R.out new file mode 100644 index 000000000..c3b352436 --- /dev/null +++ b/t/data/resource/expected/update/014.R.out @@ -0,0 +1,58 @@ + ---------core0[1:x] + ---------core1[1:x] + ---------core2[1:x] + ---------core3[1:x] + ---------core4[1:x] + ---------core5[1:x] + ---------core6[1:x] + ---------core7[1:x] + ---------core8[1:x] + ---------core9[1:x] + ---------core10[1:x] + ---------core11[1:x] + ---------core12[1:x] + ---------core13[1:x] + ---------core14[1:x] + ---------core15[1:x] + ---------gpu0[1:x] + ---------gpu1[1:x] + ---------gpu2[1:x] + ---------gpu3[1:x] + ------node[1:x] + ---cluster0[1:s] +INFO: ============================= +INFO: JOBID=1 +INFO: RESOURCES=ALLOCATED +INFO: SCHEDULED AT=Now +INFO: ============================= + ---------core0[1:x] + ---------core1[1:x] + ---------core2[1:x] + ---------core3[1:x] + ---------core4[1:x] + ---------core5[1:x] + ---------core6[1:x] + ---------core7[1:x] + ---------core8[1:x] + ---------core9[1:x] + ---------core10[1:x] + ---------core11[1:x] + ---------core12[1:x] + ---------core13[1:x] + ---------core14[1:x] + ---------core15[1:x] + ---------gpu0[1:x] + ---------gpu1[1:x] + ---------gpu2[1:x] + ---------gpu3[1:x] + ------node[1:x] + 
---cluster0[1:s] +INFO: ============================= +INFO: JOBID=2 +INFO: RESOURCES=ALLOCATED +INFO: SCHEDULED AT=Now +INFO: ============================= + ---cluster0[1:x] +INFO: ============================= +INFO: EXPRESSION="sched-now=free" +INFO: ============================= diff --git a/t/data/resource/expected/update/015.R.out b/t/data/resource/expected/update/015.R.out new file mode 100644 index 000000000..aaf7b2405 --- /dev/null +++ b/t/data/resource/expected/update/015.R.out @@ -0,0 +1,32 @@ + ---------core0[1:x] + ---------core1[1:x] + ---------core2[1:x] + ---------core3[1:x] + ---------core4[1:x] + ---------core5[1:x] + ---------core6[1:x] + ---------core7[1:x] + ---------gpu0[1:x] + ---------gpu1[1:x] + ------node[1:x] + ---------core8[1:x] + ---------core9[1:x] + ---------core10[1:x] + ---------core11[1:x] + ---------core12[1:x] + ---------core13[1:x] + ---------core14[1:x] + ---------core15[1:x] + ---------gpu2[1:x] + ---------gpu3[1:x] + ------node[1:x] + ---cluster0[1:s] +INFO: ============================= +INFO: JOBID=1 +INFO: RESOURCES=ALLOCATED +INFO: SCHEDULED AT=Now +INFO: ============================= + ---cluster0[1:x] +INFO: ============================= +INFO: EXPRESSION="sched-now=free" +INFO: ============================= diff --git a/t/data/resource/expected/update/016.R.out b/t/data/resource/expected/update/016.R.out new file mode 100644 index 000000000..508bc77c3 --- /dev/null +++ b/t/data/resource/expected/update/016.R.out @@ -0,0 +1,62 @@ +INFO: ============================= +INFO: No matching resources found +INFO: JOBID=1 +INFO: ============================= + ---------core0[1:x] + ---------core1[1:x] + ---------core2[1:x] + ---------core3[1:x] + ---------core4[1:x] + ---------core5[1:x] + ---------core6[1:x] + ---------core7[1:x] + ---------core8[1:x] + ---------core9[1:x] + ---------core10[1:x] + ---------core11[1:x] + ---------core12[1:x] + ---------core13[1:x] + ---------core14[1:x] + ---------core15[1:x] + ---------gpu0[1:x] + ---------gpu1[1:x] + ---------gpu2[1:x] + ---------gpu3[1:x] + ------node0[1:x] + ---cluster0[1:s] +INFO: ============================= +INFO: JOBID=2 +INFO: RESOURCES=ALLOCATED +INFO: SCHEDULED AT=Now +INFO: ============================= + ---------core0[1:x] + ---------core1[1:x] + ---------core2[1:x] + ---------core3[1:x] + ---------core4[1:x] + ---------core5[1:x] + ---------core6[1:x] + ---------core7[1:x] + ---------core8[1:x] + ---------core9[1:x] + ---------core10[1:x] + ---------core11[1:x] + ---------core12[1:x] + ---------core13[1:x] + ---------core14[1:x] + ---------core15[1:x] + ---------gpu0[1:x] + ---------gpu1[1:x] + ---------gpu2[1:x] + ---------gpu3[1:x] + ------node1[1:x] + ---cluster0[1:s] +INFO: ============================= +INFO: JOBID=3 +INFO: RESOURCES=ALLOCATED +INFO: SCHEDULED AT=Now +INFO: ============================= + ---cluster0[1:x] +INFO: ============================= +INFO: EXPRESSION="sched-now=free" +INFO: ============================= diff --git a/t/data/resource/jobspecs/update/test007.yaml b/t/data/resource/jobspecs/update/test007.yaml new file mode 100644 index 000000000..98e21a452 --- /dev/null +++ b/t/data/resource/jobspecs/update/test007.yaml @@ -0,0 +1,21 @@ +version: 9999 +resources: + - type: node + count: 1 + with: + - type: slot + count: 1 + label: default + with: + - type: core + count: 1 +# a comment +attributes: + system: + duration: 3600 +tasks: + - command: [ "app" ] + slot: default + count: + per_slot: 1 + diff --git a/t/data/resource/jobspecs/update/test008.yaml 
b/t/data/resource/jobspecs/update/test008.yaml new file mode 100644 index 000000000..f4504515b --- /dev/null +++ b/t/data/resource/jobspecs/update/test008.yaml @@ -0,0 +1,23 @@ +version: 9999 +resources: + - type: node + count: 1 + with: + - type: slot + count: 1 + label: default + with: + - type: core + count: 1 + - type: gpu + count: 1 +# a comment +attributes: + system: + duration: 3600 +tasks: + - command: [ "app" ] + slot: default + count: + per_slot: 1 + diff --git a/t/data/resource/jobspecs/update/test009.json b/t/data/resource/jobspecs/update/test009.json new file mode 100644 index 000000000..44b047755 --- /dev/null +++ b/t/data/resource/jobspecs/update/test009.json @@ -0,0 +1 @@ +{"version": 1, "execution": {"R_lite": [{"rank": "0", "children": {"core": "0-15", "gpu": "0-3"}}], "nodelist": ["node"], "starttime": 0, "expiration": 1000000}} diff --git a/t/data/resource/jobspecs/update/test010.json b/t/data/resource/jobspecs/update/test010.json new file mode 100644 index 000000000..901802c0a --- /dev/null +++ b/t/data/resource/jobspecs/update/test010.json @@ -0,0 +1 @@ +{"version": 1, "execution": {"R_lite": [{"rank": "1", "children": {"core": "0-15", "gpu": "0-3"}}], "nodelist": ["node"], "starttime": 0, "expiration": 1000000}} diff --git a/t/data/resource/jobspecs/update/test011.json b/t/data/resource/jobspecs/update/test011.json new file mode 100644 index 000000000..eabb67a92 --- /dev/null +++ b/t/data/resource/jobspecs/update/test011.json @@ -0,0 +1 @@ +{"version": 1, "execution": {"R_lite": [{"rank": "0", "children": {"core": "0-7", "gpu": "0-1"}}, {"rank": "1", "children": {"core": "8-15", "gpu": "2-3"}}], "nodelist": ["node", "node"], "starttime": 0, "expiration": 1000000}} diff --git a/t/data/resource/jobspecs/update/test012.json b/t/data/resource/jobspecs/update/test012.json new file mode 100644 index 000000000..4e349976e --- /dev/null +++ b/t/data/resource/jobspecs/update/test012.json @@ -0,0 +1 @@ +{"version": 1, "execution": {"R_lite": [{"rank": "0", "children": {"core": "0-15", "gpu": "0-3", "asic": "0"}}], "nodelist": ["node0"], "starttime": 0, "expiration": 1000000}} diff --git a/t/data/resource/jobspecs/update/test013.json b/t/data/resource/jobspecs/update/test013.json new file mode 100644 index 000000000..355b89846 --- /dev/null +++ b/t/data/resource/jobspecs/update/test013.json @@ -0,0 +1 @@ +{"version": 1, "execution": {"R_lite": [{"rank": "0", "children": {"core": "0-15", "gpu": "0-3"}}], "nodelist": ["node0"], "starttime": 0, "expiration": 1000000}} diff --git a/t/data/resource/jobspecs/update/test014.json b/t/data/resource/jobspecs/update/test014.json new file mode 100644 index 000000000..5e5db17ea --- /dev/null +++ b/t/data/resource/jobspecs/update/test014.json @@ -0,0 +1 @@ +{"version": 1, "execution": {"R_lite": [{"rank": "1", "children": {"core": "0-15", "gpu": "0-3"}}], "nodelist": ["node1"], "starttime": 0, "expiration": 1000000}} diff --git a/t/data/resource/rv1exec/tiny_rv1exec.json b/t/data/resource/rv1exec/tiny_rv1exec.json new file mode 100644 index 000000000..508170303 --- /dev/null +++ b/t/data/resource/rv1exec/tiny_rv1exec.json @@ -0,0 +1 @@ +{"version": 1, "execution": {"R_lite": [{"rank": "0-1", "children": {"core": "0-15", "gpu": "0-3"}}], "nodelist": ["node[0-1]"], "starttime": 0, "expiration": 1000000}} diff --git a/t/data/resource/rv1exec/tiny_rv1exec_2-1.json b/t/data/resource/rv1exec/tiny_rv1exec_2-1.json new file mode 100644 index 000000000..8e2e35896 --- /dev/null +++ b/t/data/resource/rv1exec/tiny_rv1exec_2-1.json @@ -0,0 +1 @@ 
+{"version": 1, "execution": {"R_lite": [{"rank": "0-1", "children": {"core": "0-15", "gpu": "0-3"}}], "nodelist": ["node", "node"], "starttime": 0, "expiration": 1000000}} diff --git a/t/data/resource/rv1exec/tiny_rv1exec_2-1_split.json b/t/data/resource/rv1exec/tiny_rv1exec_2-1_split.json new file mode 100644 index 000000000..eabb67a92 --- /dev/null +++ b/t/data/resource/rv1exec/tiny_rv1exec_2-1_split.json @@ -0,0 +1 @@ +{"version": 1, "execution": {"R_lite": [{"rank": "0", "children": {"core": "0-7", "gpu": "0-1"}}, {"rank": "1", "children": {"core": "8-15", "gpu": "2-3"}}], "nodelist": ["node", "node"], "starttime": 0, "expiration": 1000000}} diff --git a/t/t1007-recovery-full.t b/t/t1007-recovery-full.t index 68e64bd28..280fbd933 100755 --- a/t/t1007-recovery-full.t +++ b/t/t1007-recovery-full.t @@ -146,7 +146,7 @@ test_expect_success 'recovery: a cancel leads to a job schedule (rv1_nosched)' ' ' # flux-framework/flux-sched#991 -test_expect_failure 'recovery: both modules restart (rv1_nosched->rv1_nosched)' ' +test_expect_success 'recovery: both modules restart (rv1_nosched->rv1_nosched)' ' reload_resource match-format=rv1_nosched policy=high && reload_qmanager && flux module stats sched-fluxion-qmanager && diff --git a/t/t1025-rv1-reload.t b/t/t1025-rv1-reload.t new file mode 100755 index 000000000..49045e636 --- /dev/null +++ b/t/t1025-rv1-reload.t @@ -0,0 +1,139 @@ +#!/bin/sh +# + +test_description='Check that fluxion (rv1_nosched) does not kill jobs on reload' + +. `dirname $0`/sharness.sh + +export FLUX_SCHED_MODULE=none +test_under_flux 1 + +test_expect_success 'configure fluxion with rv1_nosched' ' + cat >config.toml <<-EOT && + [sched-fluxion-resource] + match-format = "rv1_nosched" + EOT + flux config load config.toml +' +test_expect_success 'add testqueue property to rank 0 (for later)' ' + flux resource R | flux R set-property testqueue:0 >R && + flux resource reload R +' +# N.B. double booked jobs get a fatal exception on alloc +test_expect_success 'prepare to detect double booked resources' ' + flux jobtap load alloc-check.so +' +test_expect_success 'load fluxion modules' ' + load_resource && + load_qmanager_sync +' +# +# Ensure jobs keep running across scheduler reload (no queues) +# +test_expect_success 'submit a sleep inf job and wait for alloc' ' + flux submit -n1 --flags=debug --wait-event=alloc sleep inf >job.id +' +test_expect_success 'reload fluxion modules' ' + remove_qmanager && + reload_resource && + load_qmanager +' +test_expect_success 'the job is still running' ' + state=$(flux jobs -n -o {state} $(cat config2.toml <<-EOT && + [sched-fluxion-resource] + match-format = "rv1_nosched" + [queues.testqueue] + requires = ["testqueue"] + EOT + flux config load config2.toml +' +test_expect_success 'reload fluxion modules to get new queue config' ' + remove_qmanager && + reload_resource && + load_qmanager +' +test_expect_success 'start testqueue' ' + flux queue start -q testqueue +' +test_expect_success 'submit a sleep inf job to testqueue and wait for alloc' ' + flux submit -vv --wait-event=alloc -n1 -q testqueue sleep inf >job2.id +' +test_expect_success 'reload fluxion modules' ' + remove_qmanager && + reload_resource && + load_qmanager +' +test_expect_success 'the job is still running' ' + state=$(flux jobs -n -o {state} $(cat job3.id +' +# +# A running job that was submitted to the anon queue should get a fatal +# exception when scheduler is reloaded with testqueue configured. 
+# +test_expect_success 'configure with testqueue' ' + flux config load config2.toml +' +test_expect_success 'reload fluxion modules' ' + remove_qmanager && + reload_resource && + load_qmanager +' +test_expect_success 'running job received a fatal exception' ' + flux job wait-event -v -t 10s $(cat upd_cmds001 <<-EOF && - update allocate cmds001_aa.json 1 0 3600 - update allocate cmds001_ab.json 2 0 3600 - update allocate cmds001_ac.json 3 0 3600 - update allocate cmds001_ad.json 4 0 3600 + update allocate jgf cmds001_aa.json 1 0 3600 + update allocate jgf cmds001_ab.json 2 0 3600 + update allocate jgf cmds001_ac.json 3 0 3600 + update allocate jgf cmds001_ad.json 4 0 3600 quit EOF ${query} -L ${grugs} -S CA -P high -F jgf -t 001.R.out2 < upd_cmds001 && @@ -47,9 +59,9 @@ EOF cat 002.R.out | grep -v INFO | \ split -l 1 --additional-suffix=".json" - cmds002_ && cat >upd_cmds002 <<-EOF && - update allocate cmds002_aa.json 1 0 3600 + update allocate jgf cmds002_aa.json 1 0 3600 match allocate ${unit_job} - update allocate cmds002_ac.json 3 0 3600 + update allocate jgf cmds002_ac.json 3 0 3600 match allocate ${unit_job} quit EOF @@ -76,16 +88,16 @@ EOF cat 003.R.out | grep -v INFO | \ split -l 1 --additional-suffix=".json" - cmds003_ && cat >upd_cmds003 <<-EOF && - update allocate cmds003_aa.json 1 0 3600 - update allocate cmds003_ab.json 2 0 3600 - update allocate cmds003_ac.json 3 0 3600 - update allocate cmds003_ad.json 4 0 3600 - update reserve cmds003_ae.json 5 3600 3600 - update reserve cmds003_af.json 6 3600 3600 - update reserve cmds003_ag.json 7 3600 3600 - update reserve cmds003_ah.json 8 3600 3600 - update reserve cmds003_ai.json 9 7200 3600 - update reserve cmds003_aj.json 10 7200 3600 + update allocate jgf cmds003_aa.json 1 0 3600 + update allocate jgf cmds003_ab.json 2 0 3600 + update allocate jgf cmds003_ac.json 3 0 3600 + update allocate jgf cmds003_ad.json 4 0 3600 + update reserve jgf cmds003_ae.json 5 3600 3600 + update reserve jgf cmds003_af.json 6 3600 3600 + update reserve jgf cmds003_ag.json 7 3600 3600 + update reserve jgf cmds003_ah.json 8 3600 3600 + update reserve jgf cmds003_ai.json 9 7200 3600 + update reserve jgf cmds003_aj.json 10 7200 3600 quit EOF ${query} -L ${grugs} -S CA -P high -F jgf -t 003.R.out2 < upd_cmds003 && @@ -111,16 +123,16 @@ EOF cat 004.R.out | grep -v INFO | \ split -l 1 --additional-suffix=".json" - cmds004_ && cat >upd_cmds004 <<-EOF && - update allocate cmds004_aa.json 1 0 3600 - update allocate cmds004_ab.json 2 0 3600 - update allocate cmds004_ac.json 3 0 3600 + update allocate jgf cmds004_aa.json 1 0 3600 + update allocate jgf cmds004_ab.json 2 0 3600 + update allocate jgf cmds004_ac.json 3 0 3600 match allocate_orelse_reserve ${unit_job} - update reserve cmds004_ae.json 5 3600 3600 - update reserve cmds004_af.json 6 3600 3600 + update reserve jgf cmds004_ae.json 5 3600 3600 + update reserve jgf cmds004_af.json 6 3600 3600 match allocate_orelse_reserve ${unit_job} - update reserve cmds004_ah.json 8 3600 3600 + update reserve jgf cmds004_ah.json 8 3600 3600 match allocate_orelse_reserve ${unit_job} - update reserve cmds004_aj.json 10 7200 3600 + update reserve jgf cmds004_aj.json 10 7200 3600 quit EOF ${query} -L ${grugs} -S CA -P high -F jgf -t 004.R.out2 < upd_cmds004 && @@ -148,18 +160,18 @@ EOF cat 005.R.out | grep -v INFO | \ split -l 1 --additional-suffix=".json" - cmds005_ && cat >upd_cmds005 <<-EOF && - update allocate cmds005_aa.json 1 0 3600 - update allocate cmds005_ab.json 2 0 3600 - update allocate cmds005_ac.json 3 0 3600 - 
update allocate cmds005_ad.json 4 0 3600 - update reserve cmds005_ae.json 5 3600 3600 + update allocate jgf cmds005_aa.json 1 0 3600 + update allocate jgf cmds005_ab.json 2 0 3600 + update allocate jgf cmds005_ac.json 3 0 3600 + update allocate jgf cmds005_ad.json 4 0 3600 + update reserve jgf cmds005_ae.json 5 3600 3600 cancel 4 - update reserve cmds005_af.json 6 0 3600 - update reserve cmds005_ag.json 7 3600 3600 - update reserve cmds005_ah.json 8 3600 3600 + update reserve jgf cmds005_af.json 6 0 3600 + update reserve jgf cmds005_ag.json 7 3600 3600 + update reserve jgf cmds005_ah.json 8 3600 3600 cancel 6 - update reserve cmds005_ai.json 9 0 3600 - update reserve cmds005_aj.json 10 3600 3600 + update reserve jgf cmds005_ai.json 9 0 3600 + update reserve jgf cmds005_aj.json 10 3600 3600 quit EOF ${query} -L ${grugs} -S CA -P high -F jgf -t 005.R.out2 < upd_cmds005 && @@ -187,10 +199,10 @@ split -l 1 --additional-suffix=".json" - cmds006_ && match allocate ${unit_job} match allocate ${unit_job} match allocate ${unit_job} - update allocate cmds006_aa.json 1 0 3600 - update allocate cmds006_ab.json 2 0 3600 - update allocate cmds006_ac.json 3 0 3600 - update allocate cmds006_ad.json 4 0 3600 + update allocate jgf cmds006_aa.json 1 0 3600 + update allocate jgf cmds006_ab.json 2 0 3600 + update allocate jgf cmds006_ac.json 3 0 3600 + update allocate jgf cmds006_ad.json 4 0 3600 cancel 1 cancel 3 match allocate ${unit_job} @@ -218,10 +230,10 @@ split -l 1 --additional-suffix=".json" - cmds007_ && match allocate ${unit_job} match allocate ${unit_job} match allocate ${unit_job} - update allocate cmds007_aa.json 5 0 3600 - update allocate cmds007_ab.json 6 0 3600 - update allocate cmds007_ac.json 7 0 3600 - update allocate cmds007_ad.json 8 0 3600 + update allocate jgf cmds007_aa.json 5 0 3600 + update allocate jgf cmds007_ab.json 6 0 3600 + update allocate jgf cmds007_ac.json 7 0 3600 + update allocate jgf cmds007_ad.json 8 0 3600 quit EOF ${query} -L ${grugs} -S CA -P high -F jgf -t 007.R.out2 < upd_cmds007 && @@ -251,7 +263,7 @@ EOF split -l 1 --additional-suffix=".json" - cmds008_ && cat >upd_cmds008 <<-EOF && match allocate ${unit_job} - update allocate cmds008_aa.json 2 0 3600 + update allocate jgf cmds008_aa.json 2 0 3600 match allocate ${unit_job} match allocate ${unit_job} match allocate ${unit_job} @@ -262,7 +274,7 @@ EOF cat 008.R.out2 | grep -v INFO > 008.R.out2.filtered && test_cmp 008.R.out2.filtered 008.R.out.filtered && cat >upd_cmds008.2 <<-EOF && - update allocate cmds008_aa.json 1 0 3600 + update allocate jgf cmds008_aa.json 1 0 3600 match allocate ${unit_job} match allocate ${unit_job} quit @@ -296,10 +308,10 @@ EOF ${query} -L ${grugs} -S CA -P high -F jgf -t 009.2.R.out < cmds009.2 && cat >upd_cmds009 <<-EOF && match allocate ${job3} - update allocate cmds009_aa.json 2 0 3600 - update allocate cmds009_ab.json 3 0 3600 - update allocate cmds009_ac.json 4 0 3600 - update allocate cmds009_ad.json 5 0 3600 + update allocate jgf cmds009_aa.json 2 0 3600 + update allocate jgf cmds009_ab.json 3 0 3600 + update allocate jgf cmds009_ac.json 4 0 3600 + update allocate jgf cmds009_ad.json 5 0 3600 quit EOF ${query} -L ${grugs} -S CA -P high -F jgf -t 009.R.out2 < upd_cmds009 && @@ -319,8 +331,8 @@ EOF cat 010.R.out | grep -v INFO | \ split -l 1 --additional-suffix=".json" - cmds010_ && cat >upd_cmds010 <<-EOF && - update allocate cmds010_aa.json 1 0 3600 - update allocate cmds010_ab.json 2 0 3600 + update allocate jgf cmds010_aa.json 1 0 3600 + update allocate jgf cmds010_ab.json 2 0 
3600 quit EOF ${query} -L ${grugs} -S CA -P high -F jgf -t 010.R.out2 < upd_cmds010 && @@ -340,15 +352,97 @@ EOF cat 011.R.out | grep -v INFO | \ split -l 1 --additional-suffix=".json" - cmds011_ && cat >upd_cmds011 <<-EOF && - update allocate cmds001_aa.json 1 0 3600 - update allocate cmds001_ab.json 2 0 3600 - update allocate cmds001_ac.json 3 0 3600 - update allocate cmds001_ad.json 4 0 3600 + update allocate jgf cmds001_aa.json 1 0 3600 + update allocate jgf cmds001_ab.json 2 0 3600 + update allocate jgf cmds001_ac.json 3 0 3600 + update allocate jgf cmds001_ad.json 4 0 3600 quit EOF ${query} -L ${grugs} -S CA -P high -F rv1 -t 011.R.out2 < upd_cmds011 && test_cmp 011.R.out2 011.R.out ' +test012_desc="match-allocate/update-allocate result in same output for rv1exec" +test_expect_success "${test012_desc}" ' + cat >cmds012 <<-EOF && + match allocate ${job5} + match allocate ${job5} + match allocate ${job5} + match allocate ${job5} + quit +EOF + ${query} -L ${rv1exec_graph} -f rv1exec -S CA -P high -F rv1_nosched -t 012.R.out < cmds012 && + cat 012.R.out | grep -v INFO | \ +split -l 1 --additional-suffix=".json" - cmds012_ && + cat >upd_cmds012 <<-EOF && + update allocate rv1exec cmds012_aa.json 1 0 3600 + update allocate rv1exec cmds012_ab.json 2 0 3600 + update allocate rv1exec cmds012_ac.json 3 0 3600 + update allocate rv1exec cmds012_ad.json 4 0 3600 + quit +EOF + ${query} -L ${rv1exec_graph} -f rv1exec -S CA -P high -F rv1_nosched -t 012.R.out2 < upd_cmds012 && + test_cmp 012.R.out2 012.R.out +' + +test013_desc="match-allocate/update-allocate result in same output for rv1exec with gpus" +test_expect_success "${test013_desc}" ' + cat >cmds013 <<-EOF && + match allocate ${job6} + match allocate ${job6} + match allocate ${job6} + match allocate ${job6} + quit +EOF + ${query} -L ${rv1exec_graph} -f rv1exec -S CA -P high -F rv1_nosched -t 013.R.out < cmds013 && + cat 013.R.out | grep -v INFO | \ +split -l 1 --additional-suffix=".json" - cmds013_ && + cat >upd_cmds013 <<-EOF && + update allocate rv1exec cmds013_aa.json 1 0 3600 + update allocate rv1exec cmds013_ab.json 2 0 3600 + update allocate rv1exec cmds013_ac.json 3 0 3600 + update allocate rv1exec cmds013_ad.json 4 0 3600 + quit +EOF + ${query} -L ${rv1exec_graph} -f rv1exec -S CA -P high -F rv1_nosched -t 013.R.out2 < upd_cmds013 && + test_cmp 013.R.out2 013.R.out +' + +test014_desc="update-allocate with rv1exec correctly allocates exclusively with 2 ranks/host" +test_expect_success "${test014_desc}" ' + cat >cmds014 <<-EOF && + update allocate rv1exec ${job7} 1 0 3600 + update allocate rv1exec ${job8} 2 0 3600 + find sched-now=free + quit +EOF + ${query} -L ${rv1exec_graph_2_1} -f rv1exec -S CA -P high -t 014.R.out < cmds014 && + test_cmp 014.R.out ${exp_dir}/014.R.out +' + +test015_desc="update-allocate with rv1exec correctly allocates exclusively with 2 ranks/host and split resources" +test_expect_success "${test015_desc}" ' + cat >cmds015 <<-EOF && + update allocate rv1exec ${job9} 1 0 3600 + find sched-now=free + quit +EOF + ${query} -L ${rv1exec_graph_2_1_split} -f rv1exec -S CA -P high -t 015.R.out < cmds015 && + test_cmp 015.R.out ${exp_dir}/015.R.out +' + +test016_desc="failed update-allocate exhibits correct rollback" +test_expect_success "${test016_desc}" ' + cat >cmds016 <<-EOF && + update allocate rv1exec ${job10} 1 0 3600 + update allocate rv1exec ${job11} 2 0 3600 + update allocate rv1exec ${job12} 3 0 3600 + find sched-now=free + quit +EOF + ${query} -L ${rv1exec_graph} -f rv1exec -S CA -P high -t 016.R.out < cmds016
&& + test_cmp 016.R.out ${exp_dir}/016.R.out +' + test_done diff --git a/t/t3023-resource-update2.t b/t/t3023-resource-update2.t index 7193dfa29..40a0a25fb 100755 --- a/t/t3023-resource-update2.t +++ b/t/t3023-resource-update2.t @@ -22,8 +22,8 @@ EOF cat 001.R.out | grep -v INFO | \ split -l 1 --additional-suffix=".json" - cmds001_ && cat >upd_cmds001 <<-EOF && - update allocate cmds001_aa.json 1 0 3600 - update allocate cmds001_ab.json 2 0 3600 + update allocate jgf cmds001_aa.json 1 0 3600 + update allocate jgf cmds001_ab.json 2 0 3600 quit EOF ${query} -L ${grugs} -S CA -P high -F jgf -t 001.R.out2 < upd_cmds001 && @@ -41,8 +41,8 @@ EOF cat 002.R.out | grep -v INFO | \ split -l 1 --additional-suffix=".json" - cmds002_ && cat >upd_cmds002 <<-EOF && - update allocate cmds002_aa.json 1 0 3600 - update allocate cmds002_ab.json 2 0 3600 + update allocate jgf cmds002_aa.json 1 0 3600 + update allocate jgf cmds002_ab.json 2 0 3600 quit EOF ${query} -L ${grugs} -S PA -P high -F jgf -t 002.R.out2 < upd_cmds002 && @@ -60,7 +60,7 @@ EOF cat 003.R.out | grep -v INFO | \ split -l 1 --additional-suffix=".json" - cmds003_ && cat >upd_cmds003 <<-EOF && - update allocate cmds003_aa.json 1 0 3600 + update allocate jgf cmds003_aa.json 1 0 3600 match allocate ${job5} match allocate ${job5} quit @@ -80,7 +80,7 @@ EOF cat 004.R.out | grep -v INFO | \ split -l 1 --additional-suffix=".json" - cmds004_ && cat >upd_cmds004 <<-EOF && - update allocate cmds004_aa.json 1 0 3600 + update allocate jgf cmds004_aa.json 1 0 3600 match allocate ${job5} match allocate ${job5} cancel 1
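Usage sketch (hedged; the R file name R.json is hypothetical, everything else mirrors the cmds014 commands above): after this series, resource-query's update subcommand takes the reader name ("jgf" or "rv1exec") before the input file, so an rv1exec R document can be used to reconstruct an allocation directly:

    $ resource-query -L t/data/resource/rv1exec/tiny_rv1exec_2-1.json -f rv1exec -S CA -P high
    resource-query> update allocate rv1exec R.json 1 0 3600
    resource-query> find sched-now=free
    resource-query> quit

The same subcommand still accepts JGF subgraphs with the jgf reader, e.g. "update allocate jgf cmds001_aa.json 1 0 3600" as in the updated t3022 commands.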