Skip to content

Commit 5993420

Browse files
authored
Merge pull request #6197 from garlick/free_final_flag
job-manager: add final flag to sched.free
2 parents 823417f + b1f14fe commit 5993420

File tree

5 files changed

+108
-59
lines changed

5 files changed

+108
-59
lines changed

src/modules/job-manager/alloc.c

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,16 +98,20 @@ static void interface_teardown (struct alloc *alloc, char *s, int errnum)
9898

9999
/* Send sched.free request.
100100
*/
101-
int free_request (struct alloc *alloc, flux_jobid_t id, json_t *R)
101+
int free_request (struct alloc *alloc,
102+
flux_jobid_t id,
103+
json_t *R,
104+
bool final)
102105
{
103106
flux_msg_t *msg;
104107

105108
if (!(msg = flux_request_encode ("sched.free", NULL)))
106109
return -1;
107110
if (flux_msg_pack (msg,
108-
"{s:I s:O}",
111+
"{s:I s:O s:b}",
109112
"id", id,
110-
"R", R) < 0)
113+
"R", R,
114+
"final", final) < 0)
111115
goto error;
112116
if (flux_send (alloc->ctx->h, msg, 0) < 0)
113117
goto error;
@@ -180,7 +184,7 @@ static void alloc_response_cb (flux_t *h,
180184
(void)json_object_del (R, "scheduling");
181185

182186
if (!job) {
183-
(void)free_request (alloc, id, R);
187+
(void)free_request (alloc, id, R, true);
184188
break;
185189
}
186190
if (job_priority_queue_delete (alloc->sent, job) < 0)
@@ -486,10 +490,13 @@ static void check_cb (flux_reactor_t *r,
486490
NULL);
487491
}
488492

489-
int alloc_send_free_request (struct alloc *alloc, json_t *R, flux_jobid_t id)
493+
int alloc_send_free_request (struct alloc *alloc,
494+
json_t *R,
495+
flux_jobid_t id,
496+
bool final)
490497
{
491498
if (alloc->scheduler_is_online) {
492-
if (free_request (alloc, id, R) < 0)
499+
if (free_request (alloc, id, R, final) < 0)
493500
return -1;
494501
}
495502
return 0;

src/modules/job-manager/alloc.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ int alloc_pending_count (struct alloc *alloc);
5050

5151
/* Release resources back to the scheduler.
5252
*/
53-
int alloc_send_free_request (struct alloc *alloc, json_t *R, flux_jobid_t id);
53+
int alloc_send_free_request (struct alloc *alloc,
54+
json_t *R,
55+
flux_jobid_t id,
56+
bool final);
5457

5558
/* List pending jobs
5659
*/

src/modules/job-manager/housekeeping.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,16 +227,19 @@ static void allocation_release (struct allocation *a)
227227
struct idset *ranks = NULL;
228228
struct rlist *rl = NULL;
229229
json_t *R = NULL;
230+
bool final = false;
230231

231232
if ((ranks = get_housekept_ranks (a)) && idset_count (ranks) == 0) {
232233
idset_destroy (ranks);
233234
return; // nothing to do
234235
}
236+
if (idset_empty (a->pending))
237+
final = true;
235238

236239
if (!ranks
237240
|| !(rl = rlist_copy_ranks (a->rl, ranks))
238241
|| !(R = rlist_to_R (rl))
239-
|| alloc_send_free_request (ctx->alloc, R, a->id) < 0
242+
|| alloc_send_free_request (ctx->alloc, R, a->id, final) < 0
240243
|| rlist_remove_ranks (a->rl, ranks) < 0) {
241244
char *s = idset_encode (ranks, IDSET_FLAG_RANGE);
242245
flux_log (ctx->h,
@@ -457,7 +460,7 @@ int housekeeping_start (struct housekeeping *hk,
457460
}
458461
return 0;
459462
skip:
460-
return alloc_send_free_request (hk->ctx->alloc, R, id);
463+
return alloc_send_free_request (hk->ctx->alloc, R, id, true);
461464
}
462465

463466
static int housekeeping_hello_respond_one (struct housekeeping *hk,

src/modules/sched-simple/sched.c

Lines changed: 83 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ jobreq_create (const flux_msg_t *msg)
113113
if (job == NULL)
114114
return NULL;
115115

116-
if (flux_msg_unpack (msg, "{s:I s:i s:i s:f s:o}",
116+
if (flux_msg_unpack (msg,
117+
"{s:I s:i s:i s:f s:o}",
117118
"id", &job->id,
118119
"priority", &job->priority,
119120
"userid", &job->uid,
@@ -133,8 +134,8 @@ jobreq_create (const flux_msg_t *msg)
133134
if (json_unpack (jobspec,
134135
"{s:{s?{s?O}}}",
135136
"attributes",
136-
"system",
137-
"constraints", &job->constraints) < 0) {
137+
"system",
138+
"constraints", &job->constraints) < 0) {
138139
job->errnum = errno;
139140
goto err;
140141
}
@@ -253,15 +254,14 @@ static int try_alloc (flux_t *h, struct simple_sched *ss)
253254
flux_log_error (h, "try_alloc: rlist_free");
254255
rlist_destroy (alloc);
255256
alloc = NULL;
256-
} else if (errno == ENOSPC)
257+
}
258+
else if (errno == ENOSPC)
257259
return rc;
258260
else if (errno == EOVERFLOW)
259261
note = "unsatisfiable request";
260262
else if (fail_alloc)
261263
note = "DEBUG_FAIL_ALLOC";
262-
if (schedutil_alloc_respond_deny (ss->util_ctx,
263-
job->msg,
264-
note) < 0)
264+
if (schedutil_alloc_respond_deny (ss->util_ctx, job->msg, note) < 0)
265265
flux_log_error (h, "schedutil_alloc_respond_deny");
266266
goto out;
267267
}
@@ -272,9 +272,9 @@ static int try_alloc (flux_t *h, struct simple_sched *ss)
272272
R,
273273
"{ s:{s:s s:n s:n} }",
274274
"sched",
275-
"resource_summary", s,
276-
"reason_pending",
277-
"jobs_ahead") < 0)
275+
"resource_summary", s,
276+
"reason_pending",
277+
"jobs_ahead") < 0)
278278
flux_log_error (h, "schedutil_alloc_respond_success_pack");
279279

280280
flux_log (h, LOG_DEBUG, "alloc: %s: %s", idf58 (job->id), s);
@@ -310,8 +310,10 @@ static void annotate_reason_pending (struct simple_sched *ss)
310310
}
311311
}
312312

313-
static void prep_cb (flux_reactor_t *r, flux_watcher_t *w,
314-
int revents, void *arg)
313+
static void prep_cb (flux_reactor_t *r,
314+
flux_watcher_t *w,
315+
int revents,
316+
void *arg)
315317
{
316318
struct simple_sched *ss = arg;
317319
/* if there is at least one job to schedule, start check and idle */
@@ -322,8 +324,10 @@ static void prep_cb (flux_reactor_t *r, flux_watcher_t *w,
322324
}
323325
}
324326

325-
static void check_cb (flux_reactor_t *r, flux_watcher_t *w,
326-
int revents, void *arg)
327+
static void check_cb (flux_reactor_t *r,
328+
flux_watcher_t *w,
329+
int revents,
330+
void *arg)
327331
{
328332
struct simple_sched *ss = arg;
329333
flux_watcher_stop (ss->idle);
@@ -339,7 +343,11 @@ static void check_cb (flux_reactor_t *r, flux_watcher_t *w,
339343
}
340344
}
341345

342-
static int try_free (flux_t *h, struct simple_sched *ss, json_t *R)
346+
static int try_free (flux_t *h,
347+
struct simple_sched *ss,
348+
flux_jobid_t id,
349+
json_t *R,
350+
bool final)
343351
{
344352
int rc = -1;
345353
char *r = NULL;
@@ -354,8 +362,14 @@ static int try_free (flux_t *h, struct simple_sched *ss, json_t *R)
354362
r = rlist_dumps (alloc);
355363
if ((rc = rlist_free (ss->rlist, alloc)) < 0)
356364
flux_log_error (h, "free: %s", r);
357-
else
358-
flux_log (h, LOG_DEBUG, "free: %s", r);
365+
else {
366+
flux_log (h,
367+
LOG_DEBUG,
368+
"free: %s %s%s",
369+
r,
370+
idf58 (id),
371+
final ? " (final)" : "");
372+
}
359373
free (r);
360374
rlist_destroy (alloc);
361375
return rc;
@@ -365,13 +379,20 @@ void free_cb (flux_t *h, const flux_msg_t *msg, const char *R_str, void *arg)
365379
{
366380
struct simple_sched *ss = arg;
367381
json_t *R;
382+
flux_jobid_t id;
383+
int final = 0;
368384

369-
if (flux_request_unpack (msg, NULL, "{s:o}", "R", &R) < 0) {
385+
if (flux_request_unpack (msg,
386+
NULL,
387+
"{s:I s:o s?b}",
388+
"id", &id,
389+
"R", &R,
390+
"final", &final) < 0) {
370391
flux_log (h, LOG_ERR, "free: error unpacking sched.free request");
371392
return;
372393
}
373394

374-
if (try_free (h, ss, R) < 0) {
395+
if (try_free (h, ss, id, R, final) < 0) {
375396
flux_log_error (h, "free: could not free R");
376397
return;
377398
}
@@ -393,7 +414,8 @@ static void alloc_cb (flux_t *h, const flux_msg_t *msg, void *arg)
393414

394415
if (ss->alloc_limit
395416
&& zlistx_size (ss->queue) >= ss->alloc_limit) {
396-
flux_log (h, LOG_ERR,
417+
flux_log (h,
418+
LOG_ERR,
397419
"alloc received above max concurrency: %d",
398420
ss->alloc_limit);
399421
errno = EINVAL;
@@ -411,14 +433,18 @@ static void alloc_cb (flux_t *h, const flux_msg_t *msg, void *arg)
411433
jobreq_destroy (job);
412434
return;
413435
}
414-
flux_log (h, LOG_DEBUG, "req: %s: spec={%d,%d,%d} duration=%.1f",
415-
idf58 (job->id), job->jj.nnodes,
416-
job->jj.nslots, job->jj.slot_size,
417-
job->jj.duration);
436+
437+
flux_log (h,
438+
LOG_DEBUG,
439+
"req: %s: spec={%d,%d,%d} duration=%.1f",
440+
idf58 (job->id),
441+
job->jj.nnodes,
442+
job->jj.nslots,
443+
job->jj.slot_size,
444+
job->jj.duration);
445+
418446
search_dir = job->priority > FLUX_JOB_URGENCY_DEFAULT;
419-
job->handle = zlistx_insert (ss->queue,
420-
job,
421-
search_dir);
447+
job->handle = zlistx_insert (ss->queue, job, search_dir);
422448
flux_watcher_start (ss->prep);
423449
return;
424450
err:
@@ -430,9 +456,7 @@ static void alloc_cb (flux_t *h, const flux_msg_t *msg, void *arg)
430456
* If a matching job found in queue, respond to the alloc request
431457
* and "dequeue" it.
432458
*/
433-
static void cancel_cb (flux_t *h,
434-
const flux_msg_t *msg,
435-
void *arg)
459+
static void cancel_cb (flux_t *h, const flux_msg_t *msg, void *arg)
436460
{
437461
struct simple_sched *ss = arg;
438462
flux_jobid_t id;
@@ -457,9 +481,7 @@ static void cancel_cb (flux_t *h,
457481
* matching job found in queue, update the priority and reorder queue
458482
* as necessary.
459483
*/
460-
static void prioritize_cb (flux_t *h,
461-
const flux_msg_t *msg,
462-
void *arg)
484+
static void prioritize_cb (flux_t *h, const flux_msg_t *msg, void *arg)
463485
{
464486
static int min_sort_size = 4;
465487
struct simple_sched *ss = arg;
@@ -520,7 +542,8 @@ static int hello_cb (flux_t *h,
520542
uint32_t userid;
521543
double t_submit;
522544

523-
if (flux_msg_unpack (msg, "{s:I s:i s:i s:f}",
545+
if (flux_msg_unpack (msg,
546+
"{s:I s:i s:i s:f}",
524547
"id", &id,
525548
"priority", &priority,
526549
"userid", &userid,
@@ -529,7 +552,8 @@ static int hello_cb (flux_t *h,
529552
return -1;
530553
}
531554

532-
flux_log (h, LOG_DEBUG,
555+
flux_log (h,
556+
LOG_DEBUG,
533557
"hello: id=%s priority=%u userid=%u t_submit=%0.1f",
534558
idf58 (id),
535559
priority,
@@ -594,7 +618,9 @@ static void status_cb (flux_t *h, flux_msg_handler_t *mh,
594618
}
595619
rlist_destroy (rl);
596620

597-
if (flux_respond_pack (h, msg, "{s:o s:o s:o}",
621+
if (flux_respond_pack (h,
622+
msg,
623+
"{s:o s:o s:o}",
598624
"all", all,
599625
"allocated", alloc,
600626
"down", down) < 0)
@@ -622,14 +648,16 @@ static void feasibility_cb (flux_t *h,
622648
const char *errmsg = NULL;
623649
flux_error_t error;
624650

625-
if (flux_request_unpack (msg, NULL, "{s:o}",
651+
if (flux_request_unpack (msg,
652+
NULL,
653+
"{s:o}",
626654
"jobspec", &jobspec) < 0)
627655
goto err;
628656
if (json_unpack (jobspec,
629657
"{s:{s?{s?o}}}",
630658
"attributes",
631-
"system",
632-
"constraints", &constraints) < 0)
659+
"system",
660+
"constraints", &constraints) < 0)
633661
goto err;
634662

635663
if (jj_get_counts_json (jobspec, &jj) < 0) {
@@ -701,9 +729,7 @@ static void expiration_cb (flux_t *h,
701729
errno = EINVAL;
702730
goto err;
703731
}
704-
if (flux_module_debug_test (ss->h,
705-
DEBUG_EXPIRATION_UPDATE_DENY,
706-
false)) {
732+
if (flux_module_debug_test (ss->h, DEBUG_EXPIRATION_UPDATE_DENY, false)) {
707733
errmsg = "Rejecting expiration update for testing";
708734
goto err;
709735
}
@@ -722,7 +748,8 @@ static int ss_resource_update (struct simple_sched *ss, flux_future_t *f)
722748
double expiration = -1.;
723749
const char *s;
724750

725-
int rc = flux_rpc_get_unpack (f, "{s?s s?s s?F}",
751+
int rc = flux_rpc_get_unpack (f,
752+
"{s?s s?s s?F}",
726753
"up", &up,
727754
"down", &down,
728755
"expiration", &expiration);
@@ -760,7 +787,8 @@ static void acquire_continuation (flux_future_t *f, void *arg)
760787
{
761788
struct simple_sched *ss = arg;
762789
if (flux_future_get (f, NULL) < 0) {
763-
flux_log (ss->h, LOG_ERR,
790+
flux_log (ss->h,
791+
LOG_ERR,
764792
"exiting due to resource update failure: %s",
765793
future_strerror (f, errno));
766794
flux_reactor_stop (flux_get_reactor (ss->h));
@@ -780,7 +808,8 @@ static int ss_acquire_resources (flux_t *h, struct simple_sched *ss)
780808
json_t *R;
781809
json_error_t e;
782810

783-
if (!(f = flux_rpc (h, "resource.acquire",
811+
if (!(f = flux_rpc (h,
812+
"resource.acquire",
784813
NULL,
785814
FLUX_NODEID_ANY,
786815
FLUX_RPC_STREAMING))) {
@@ -789,7 +818,9 @@ static int ss_acquire_resources (flux_t *h, struct simple_sched *ss)
789818
}
790819
ss->acquire_f = f;
791820
if (flux_rpc_get_unpack (f, "{s:o}", "resources", &R) < 0) {
792-
flux_log (h, LOG_ERR, "resource.acquire failed: %s",
821+
flux_log (h,
822+
LOG_ERR,
823+
"resource.acquire failed: %s",
793824
future_strerror (f, errno));
794825
goto out;
795826
}
@@ -846,8 +877,12 @@ static int simple_sched_init (flux_t *h, struct simple_sched *ss)
846877
goto out;
847878
}
848879
s = rlist_dumps (ss->rlist);
849-
flux_log (h, LOG_DEBUG, "ready: %d of %d cores: %s",
850-
ss->rlist->avail, ss->rlist->total, s);
880+
flux_log (h,
881+
LOG_DEBUG,
882+
"ready: %d of %d cores: %s",
883+
ss->rlist->avail,
884+
ss->rlist->total,
885+
s);
851886
free (s);
852887
rc = 0;
853888
out:

0 commit comments

Comments
 (0)