|
| 1 | +/************************************************************\ |
| 2 | + * Copyright 2023 Lawrence Livermore National Security, LLC |
| 3 | + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) |
| 4 | + * |
| 5 | + * This file is part of the Flux resource manager framework. |
| 6 | + * For details, see https://github.com/flux-framework. |
| 7 | + * |
| 8 | + * SPDX-License-Identifier: LGPL-3.0 |
| 9 | +\************************************************************/ |
| 10 | + |
| 11 | +/* alloc-check.c - plugin to ensure resources are never double booked |
| 12 | + * |
| 13 | + * A fatal exception is raised on jobs that are granted resources already |
| 14 | + * granted to another. |
| 15 | + * |
| 16 | + * In order to be sure that the exception can be raised before a short job |
| 17 | + * becomes inactive, R is looked up in the KVS synchronously, causing the |
| 18 | + * job manager to be briefly unresponsive. Hence, this plugin is primarily |
| 19 | + * suited for debug/test situations. |
| 20 | + * |
| 21 | + * N.B. This plugin does not account for any jobs that might already have |
| 22 | + * allocations when the plugin is loaded. |
| 23 | + */ |
| 24 | + |
| 25 | +#if HAVE_CONFIG_H |
| 26 | +#include "config.h" |
| 27 | +#endif |
| 28 | + |
| 29 | +#include <jansson.h> |
| 30 | +#include <flux/core.h> |
| 31 | +#include <flux/jobtap.h> |
| 32 | + |
| 33 | +#include "ccan/str/str.h" |
| 34 | +#include "src/common/librlist/rlist.h" |
| 35 | +#include "src/common/libjob/idf58.h" |
| 36 | + |
| 37 | +#define PLUGIN_NAME "alloc-check" |
| 38 | +static const char *auxname = PLUGIN_NAME "::resdb"; |
| 39 | + |
| 40 | +/* Start out with empty resource set. Add resources on job.event.alloc |
| 41 | + * (scheduler has allocated resources to job). Subtract resources on |
| 42 | + * job.event.free (job manager has returned resources to the scheduler). |
| 43 | + */ |
| 44 | +struct resdb { |
| 45 | + struct rlist *allocated; |
| 46 | +}; |
| 47 | + |
| 48 | +static void resdb_destroy (struct resdb *resdb) |
| 49 | +{ |
| 50 | + if (resdb) { |
| 51 | + int saved_errno = errno; |
| 52 | + rlist_destroy (resdb->allocated); |
| 53 | + free (resdb); |
| 54 | + errno = saved_errno; |
| 55 | + } |
| 56 | +} |
| 57 | + |
| 58 | +static struct resdb *resdb_create (void) |
| 59 | +{ |
| 60 | + struct resdb *resdb; |
| 61 | + |
| 62 | + if (!(resdb = calloc (1, sizeof (*resdb)))) |
| 63 | + return NULL; |
| 64 | + if (!(resdb->allocated = rlist_create())) { |
| 65 | + free (resdb); |
| 66 | + errno = ENOMEM; |
| 67 | + return NULL; |
| 68 | + } |
| 69 | + return resdb; |
| 70 | +} |
| 71 | + |
| 72 | +/* Generate the kvs path to R for a given job |
| 73 | + */ |
| 74 | +static int res_makekey (flux_jobid_t id, char *buf, size_t size) |
| 75 | +{ |
| 76 | + char dir[128]; |
| 77 | + if (flux_job_id_encode (id, "kvs", dir, sizeof (dir)) < 0) |
| 78 | + return -1; |
| 79 | + if (snprintf (buf, size, "%s.R", dir) >= size) { |
| 80 | + errno = EOVERFLOW; |
| 81 | + return -1; |
| 82 | + } |
| 83 | + return 0; |
| 84 | +} |
| 85 | + |
| 86 | +/* Synchronously look up R for a given job and convert it to an rlist object |
| 87 | + * which the caller must destroy with rlist_destroy(). |
| 88 | + */ |
| 89 | +static struct rlist *res_lookup (flux_t *h, flux_jobid_t id) |
| 90 | +{ |
| 91 | + char key[128]; |
| 92 | + flux_future_t *f = NULL; |
| 93 | + const char *R; |
| 94 | + struct rlist *rlist; |
| 95 | + |
| 96 | + if (res_makekey (id, key, sizeof (key)) < 0 |
| 97 | + || !(f = flux_kvs_lookup (h, NULL, 0, key)) |
| 98 | + || flux_kvs_lookup_get (f, &R) < 0 |
| 99 | + || !(rlist = rlist_from_R (R))) { |
| 100 | + flux_future_destroy (f); |
| 101 | + return NULL; |
| 102 | + } |
| 103 | + flux_future_destroy (f); |
| 104 | + return rlist; |
| 105 | +} |
| 106 | + |
| 107 | +static int jobtap_cb (flux_plugin_t *p, |
| 108 | + const char *topic, |
| 109 | + flux_plugin_arg_t *args, |
| 110 | + void *arg) |
| 111 | +{ |
| 112 | + struct resdb *resdb = flux_plugin_aux_get (p, auxname); |
| 113 | + flux_t *h = flux_jobtap_get_flux (p); |
| 114 | + flux_jobid_t id; |
| 115 | + |
| 116 | + if (flux_plugin_arg_unpack (args, |
| 117 | + FLUX_PLUGIN_ARG_IN, |
| 118 | + "{s:I}", |
| 119 | + "id", &id) < 0) { |
| 120 | + flux_log (h, |
| 121 | + LOG_ERR, |
| 122 | + "%s %s: unpack: %s", |
| 123 | + PLUGIN_NAME, |
| 124 | + topic, |
| 125 | + flux_plugin_arg_strerror (args)); |
| 126 | + return -1; |
| 127 | + } |
| 128 | + /* job.event.* callbacks are not received unless subscribed on a per-job |
| 129 | + * basis, so subscribe to them in the job.new callback. |
| 130 | + */ |
| 131 | + if (streq (topic, "job.new")) { |
| 132 | + if (flux_jobtap_job_subscribe (p, id) < 0) { |
| 133 | + flux_log_error (h, |
| 134 | + "%s(%s) %s: subscribe", |
| 135 | + PLUGIN_NAME, |
| 136 | + idf58 (id), |
| 137 | + topic); |
| 138 | + } |
| 139 | + } |
| 140 | + /* Look up R that was just allocated to the job and attach it to the job |
| 141 | + * aux container so we don't have to look it up again on free. Call |
| 142 | + * rlist_append() to add the resources to resdb->allocated. If that |
| 143 | + * fails, some resources are already allocated so raise a fatal exception |
| 144 | + * on the job. |
| 145 | + */ |
| 146 | + else if (streq (topic, "job.event.alloc")) { |
| 147 | + struct rlist *R; |
| 148 | + if (!(R = res_lookup (h, id)) |
| 149 | + || flux_jobtap_job_aux_set (p, |
| 150 | + id, |
| 151 | + PLUGIN_NAME "::R", |
| 152 | + R, |
| 153 | + (flux_free_f)rlist_destroy) < 0) { |
| 154 | + flux_log_error (h, |
| 155 | + "%s(%s) %s: failed to lookup or cache R", |
| 156 | + PLUGIN_NAME, |
| 157 | + idf58 (id), |
| 158 | + topic); |
| 159 | + rlist_destroy (R); |
| 160 | + return -1; |
| 161 | + } |
| 162 | + if (rlist_append (resdb->allocated, R) < 0) { |
| 163 | + flux_jobtap_raise_exception (p, |
| 164 | + id, |
| 165 | + "alloc-check", |
| 166 | + 0, |
| 167 | + "resources already allocated"); |
| 168 | + } |
| 169 | + } |
| 170 | + /* Get R that was just freed from the job's aux container and remove it |
| 171 | + * from resdb->allocated. Any jobs that had allocations before the module |
| 172 | + * will not have the R aux item, so silently return success in that case. |
| 173 | + */ |
| 174 | + else if (streq (topic, "job.event.free")) { |
| 175 | + struct rlist *R = flux_jobtap_job_aux_get (p, id, PLUGIN_NAME "::R"); |
| 176 | + if (R) { |
| 177 | + struct rlist *diff; |
| 178 | + if (!(diff = rlist_diff (resdb->allocated, R))) { |
| 179 | + flux_log_error (h, |
| 180 | + "%s(%s) %s: rlist_diff", |
| 181 | + PLUGIN_NAME, |
| 182 | + idf58 (id), |
| 183 | + topic); |
| 184 | + return -1; |
| 185 | + } |
| 186 | + rlist_destroy (resdb->allocated); |
| 187 | + resdb->allocated = diff; |
| 188 | + } |
| 189 | + } |
| 190 | + return 0; |
| 191 | +} |
| 192 | + |
| 193 | +static const struct flux_plugin_handler tab[] = { |
| 194 | + { "job.event.alloc", jobtap_cb, NULL }, |
| 195 | + { "job.event.free", jobtap_cb, NULL }, |
| 196 | + { "job.new", jobtap_cb, NULL }, |
| 197 | + { 0 } |
| 198 | +}; |
| 199 | + |
| 200 | +int flux_plugin_init (flux_plugin_t *p) |
| 201 | +{ |
| 202 | + struct resdb *resdb; |
| 203 | + |
| 204 | + if (!(resdb = resdb_create ()) |
| 205 | + || flux_plugin_aux_set (p, |
| 206 | + auxname, |
| 207 | + resdb, |
| 208 | + (flux_free_f)resdb_destroy) < 0) { |
| 209 | + resdb_destroy (resdb); |
| 210 | + return -1; |
| 211 | + } |
| 212 | + return flux_plugin_register (p, "alloc-check", tab); |
| 213 | +} |
| 214 | + |
| 215 | +// vi:ts=4 sw=4 expandtab |
0 commit comments