|
20 | 20 | #include "src/common/libczmqcontainers/czmq_containers.h" |
21 | 21 | #include "src/common/libeventlog/eventlog.h" |
22 | 22 | #include "src/common/libjob/idf58.h" |
23 | | -#include "src/common/libutil/jpath.h" |
24 | 23 | #include "ccan/str/str.h" |
25 | 24 |
|
26 | 25 | #include "job-info.h" |
27 | 26 | #include "lookup.h" |
| 27 | +#include "update.h" |
28 | 28 | #include "allow.h" |
| 29 | +#include "util.h" |
29 | 30 |
|
30 | 31 | struct lookup_ctx { |
31 | 32 | struct info_ctx *ctx; |
@@ -154,29 +155,6 @@ static int lookup_keys (struct lookup_ctx *l) |
154 | 155 | return -1; |
155 | 156 | } |
156 | 157 |
|
157 | | -static void apply_updates_R (struct lookup_ctx *l, |
158 | | - const char *key, |
159 | | - json_t *update_object, |
160 | | - json_t *context) |
161 | | -{ |
162 | | - const char *context_key; |
163 | | - json_t *value; |
164 | | - |
165 | | - json_object_foreach (context, context_key, value) { |
166 | | - /* RFC 21 resource-update event only allows update |
167 | | - * to: |
168 | | - * - expiration |
169 | | - */ |
170 | | - if (streq (context_key, "expiration")) |
171 | | - if (jpath_set (update_object, |
172 | | - "execution.expiration", |
173 | | - value) < 0) |
174 | | - flux_log (l->ctx->h, LOG_INFO, |
175 | | - "%s: failed to update job %s %s", |
176 | | - __FUNCTION__, idf58 (l->id), key); |
177 | | - } |
178 | | -} |
179 | | - |
180 | 158 | static int lookup_current (struct lookup_ctx *l, |
181 | 159 | flux_future_t *fall, |
182 | 160 | const char *key, |
@@ -226,7 +204,7 @@ static int lookup_current (struct lookup_ctx *l, |
226 | 204 | goto error; |
227 | 205 | if (streq (name, update_event_name)) { |
228 | 206 | if (streq (key, "R")) |
229 | | - apply_updates_R (l, key, value_object, context); |
| 207 | + apply_updates_R (l->ctx->h, l->id, key, value_object, context); |
230 | 208 | } |
231 | 209 | } |
232 | 210 |
|
@@ -413,11 +391,148 @@ static int check_to_lookup_eventlog (struct lookup_ctx *l) |
413 | 391 | return 0; |
414 | 392 | } |
415 | 393 |
|
| 394 | +static json_t *get_json_string (json_t *o) |
| 395 | +{ |
| 396 | + char *s = json_dumps (o, JSON_ENCODE_ANY); |
| 397 | + json_t *tmp = NULL; |
| 398 | + /* We assume json is internally valid, thus this is an ENOMEM error */ |
| 399 | + if (!s) { |
| 400 | + errno = ENOMEM; |
| 401 | + goto cleanup; |
| 402 | + } |
| 403 | + if (!(tmp = json_string (s))) { |
| 404 | + errno = ENOMEM; |
| 405 | + goto cleanup; |
| 406 | + } |
| 407 | +cleanup: |
| 408 | + free (s); |
| 409 | + return tmp; |
| 410 | +} |
| 411 | + |
| 412 | +/* returns -1 on error, 1 on cached response returned, 0 on no cache */ |
| 413 | +static int lookup_cached (struct lookup_ctx *l) |
| 414 | +{ |
| 415 | + json_t *current_object = NULL; |
| 416 | + json_t *key; |
| 417 | + const char *key_str; |
| 418 | + int ret, rv = -1; |
| 419 | + |
| 420 | + /* Special optimization, looking for a single updated value that |
| 421 | + * could be cached via an update-watch |
| 422 | + * |
| 423 | + * - Caller must want current / updated value |
| 424 | + * - This lookup is already allowed (i.e. if we have to do a |
| 425 | + * "allow" KVS lookup, there is little benefit to returning the |
| 426 | + * cached value). |
| 427 | + * - The caller only wants one key (i.e. if we have to do lookup |
| 428 | + * on another value anyways, there is little benefit to |
| 429 | + * returning the cached value). |
| 430 | + */ |
| 431 | + |
| 432 | + if (!(l->flags & FLUX_JOB_LOOKUP_CURRENT) |
| 433 | + || !l->allow |
| 434 | + || json_array_size (l->keys) != 1) |
| 435 | + return 0; |
| 436 | + |
| 437 | + key = json_array_get (l->keys, 0); |
| 438 | + if (!key) { |
| 439 | + errno = EINVAL; |
| 440 | + goto cleanup; |
| 441 | + } |
| 442 | + |
| 443 | + key_str = json_string_value (key); |
| 444 | + |
| 445 | + if (!streq (key_str, "R")) |
| 446 | + return 0; |
| 447 | + |
| 448 | + if ((ret = update_watch_get_cached (l->ctx, |
| 449 | + l->id, |
| 450 | + key_str, |
| 451 | + ¤t_object)) < 0) |
| 452 | + goto cleanup; |
| 453 | + |
| 454 | + if (ret) { |
| 455 | + if (l->flags & FLUX_JOB_LOOKUP_JSON_DECODE) { |
| 456 | + if (flux_respond_pack (l->ctx->h, l->msg, "{s:I s:O}", |
| 457 | + "id", l->id, |
| 458 | + key_str, current_object) < 0) { |
| 459 | + flux_log_error (l->ctx->h, "%s: flux_respond", __FUNCTION__); |
| 460 | + goto cleanup; |
| 461 | + } |
| 462 | + rv = 1; |
| 463 | + goto cleanup; |
| 464 | + } |
| 465 | + else { |
| 466 | + json_t *o = get_json_string (current_object); |
| 467 | + if (!o) { |
| 468 | + errno = ENOMEM; |
| 469 | + goto cleanup; |
| 470 | + } |
| 471 | + if (flux_respond_pack (l->ctx->h, l->msg, "{s:I s:O}", |
| 472 | + "id", l->id, |
| 473 | + key_str, o) < 0) { |
| 474 | + json_decref (o); |
| 475 | + flux_log_error (l->ctx->h, "%s: flux_respond", __FUNCTION__); |
| 476 | + goto cleanup; |
| 477 | + } |
| 478 | + rv = 1; |
| 479 | + json_decref (o); |
| 480 | + goto cleanup; |
| 481 | + } |
| 482 | + } |
| 483 | + |
| 484 | + rv = 0; |
| 485 | +cleanup: |
| 486 | + json_decref (current_object); |
| 487 | + return rv; |
| 488 | +} |
| 489 | + |
| 490 | +static int lookup (flux_t *h, |
| 491 | + const flux_msg_t *msg, |
| 492 | + struct info_ctx *ctx, |
| 493 | + flux_jobid_t id, |
| 494 | + json_t *keys, |
| 495 | + int flags) |
| 496 | +{ |
| 497 | + struct lookup_ctx *l = NULL; |
| 498 | + int ret; |
| 499 | + |
| 500 | + if (!(l = lookup_ctx_create (ctx, msg, id, keys, flags))) |
| 501 | + goto error; |
| 502 | + |
| 503 | + if (check_allow (l) < 0) |
| 504 | + goto error; |
| 505 | + |
| 506 | + if ((ret = lookup_cached (l)) < 0) |
| 507 | + goto error; |
| 508 | + |
| 509 | + if (ret) { |
| 510 | + lookup_ctx_destroy (l); |
| 511 | + return 0; |
| 512 | + } |
| 513 | + |
| 514 | + if (check_to_lookup_eventlog (l) < 0) |
| 515 | + goto error; |
| 516 | + |
| 517 | + if (lookup_keys (l) < 0) |
| 518 | + goto error; |
| 519 | + |
| 520 | + if (zlist_append (ctx->lookups, l) < 0) { |
| 521 | + flux_log_error (h, "%s: zlist_append", __FUNCTION__); |
| 522 | + goto error; |
| 523 | + } |
| 524 | + zlist_freefn (ctx->lookups, l, lookup_ctx_destroy, true); |
| 525 | + return 0; |
| 526 | + |
| 527 | +error: |
| 528 | + lookup_ctx_destroy (l); |
| 529 | + return -1; |
| 530 | +} |
| 531 | + |
416 | 532 | void lookup_cb (flux_t *h, flux_msg_handler_t *mh, |
417 | 533 | const flux_msg_t *msg, void *arg) |
418 | 534 | { |
419 | 535 | struct info_ctx *ctx = arg; |
420 | | - struct lookup_ctx *l = NULL; |
421 | 536 | size_t index; |
422 | 537 | json_t *key; |
423 | 538 | json_t *keys; |
@@ -453,30 +568,65 @@ void lookup_cb (flux_t *h, flux_msg_handler_t *mh, |
453 | 568 | } |
454 | 569 | } |
455 | 570 |
|
456 | | - if (!(l = lookup_ctx_create (ctx, msg, id, keys, flags))) |
| 571 | + if (lookup (h, msg, ctx, id, keys, flags) < 0) |
457 | 572 | goto error; |
458 | 573 |
|
459 | | - if (check_allow (l) < 0) |
460 | | - goto error; |
| 574 | + return; |
461 | 575 |
|
462 | | - if (check_to_lookup_eventlog (l) < 0) |
463 | | - goto error; |
| 576 | +error: |
| 577 | + if (flux_respond_error (h, msg, errno, errmsg) < 0) |
| 578 | + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); |
| 579 | +} |
464 | 580 |
|
465 | | - if (lookup_keys (l) < 0) |
| 581 | +/* legacy rpc target */ |
| 582 | +void update_lookup_cb (flux_t *h, flux_msg_handler_t *mh, |
| 583 | + const flux_msg_t *msg, void *arg) |
| 584 | +{ |
| 585 | + struct info_ctx *ctx = arg; |
| 586 | + flux_jobid_t id; |
| 587 | + const char *key = NULL; |
| 588 | + json_t *keys = NULL; |
| 589 | + int flags; |
| 590 | + int valid_flags = 0; |
| 591 | + const char *errmsg = NULL; |
| 592 | + |
| 593 | + if (flux_request_unpack (msg, NULL, "{s:I s:s s:i}", |
| 594 | + "id", &id, |
| 595 | + "key", &key, |
| 596 | + "flags", &flags) < 0) { |
| 597 | + flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__); |
466 | 598 | goto error; |
| 599 | + } |
| 600 | + if ((flags & ~valid_flags)) { |
| 601 | + errno = EPROTO; |
| 602 | + errmsg = "update-lookup request rejected with invalid flag"; |
| 603 | + goto error; |
| 604 | + } |
| 605 | + if (!streq (key, "R")) { |
| 606 | + errno = EINVAL; |
| 607 | + errmsg = "update-lookup unsupported key specified"; |
| 608 | + goto error; |
| 609 | + } |
467 | 610 |
|
468 | | - if (zlist_append (ctx->lookups, l) < 0) { |
469 | | - flux_log_error (h, "%s: zlist_append", __FUNCTION__); |
| 611 | + if (!(keys = json_pack ("[s]", key))) { |
| 612 | + errno = ENOMEM; |
470 | 613 | goto error; |
471 | 614 | } |
472 | | - zlist_freefn (ctx->lookups, l, lookup_ctx_destroy, true); |
| 615 | + |
| 616 | + if (lookup (h, |
| 617 | + msg, |
| 618 | + ctx, |
| 619 | + id, |
| 620 | + keys, |
| 621 | + FLUX_JOB_LOOKUP_JSON_DECODE | FLUX_JOB_LOOKUP_CURRENT) < 0) |
| 622 | + goto error; |
473 | 623 |
|
474 | 624 | return; |
475 | 625 |
|
476 | 626 | error: |
477 | 627 | if (flux_respond_error (h, msg, errno, errmsg) < 0) |
478 | 628 | flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); |
479 | | - lookup_ctx_destroy (l); |
| 629 | + json_decref (keys); |
480 | 630 | } |
481 | 631 |
|
482 | 632 | /* |
|
0 commit comments