From f3537d1d86bf29cc9f69e88a71736300860f2cd9 Mon Sep 17 00:00:00 2001
From: Albert Chu
Date: Wed, 5 Jun 2024 11:21:12 -0700
Subject: [PATCH 1/9] job-list: remove double semicolon

Problem: There was an errant double semicolon.

Solution: Remove it!
---
 src/modules/job-list/match.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/modules/job-list/match.h b/src/modules/job-list/match.h
index 5589a671fdbc..1d22bf63aaba 100644
--- a/src/modules/job-list/match.h
+++ b/src/modules/job-list/match.h
@@ -52,6 +52,6 @@ int job_match (const struct job *job,
 int job_match_config_reload (struct match_ctx *mctx,
                              const flux_conf_t *conf,
-                             flux_error_t *errp);;
+                             flux_error_t *errp);

 #endif /* !HAVE_JOB_LIST_MATCH_H */

From 0015f2ba2f69ddf91ee19fd219ec9df4a876d37c Mon Sep 17 00:00:00 2001
From: Albert Chu
Date: Mon, 10 Jun 2024 10:34:46 -0700
Subject: [PATCH 2/9] job-list: remove unnecessary variable args

Problem: Several helper functions take variable arguments, which are
unnecessary.

Solution: Remove the variable args from these helper functions.
---
 src/modules/job-list/test/match.c       | 11 ++---------
 src/modules/job-list/test/state_match.c | 11 ++---------
 2 files changed, 4 insertions(+), 18 deletions(-)

diff --git a/src/modules/job-list/test/match.c b/src/modules/job-list/test/match.c
index 6c2f0307fecd..a4515805ba6e 100644
--- a/src/modules/job-list/test/match.c
+++ b/src/modules/job-list/test/match.c
@@ -27,26 +27,19 @@ struct match_ctx mctx = { .h = NULL, .max_comparisons = 0 };

 static void list_constraint_create_corner_case (const char *str,
-                                                const char *fmt,
-                                                ...)
+                                                const char *msg)
 {
     struct list_constraint *c;
-    char buf[1024];
     flux_error_t error;
     json_error_t jerror;
     json_t *jc;
-    va_list ap;

     if (!(jc = json_loads (str, 0, &jerror)))
         BAIL_OUT ("json constraint invalid: %s", jerror.text);

-    va_start (ap, fmt);
-    vsnprintf(buf, sizeof (buf), fmt, ap);
-    va_end (ap);
-
     c = list_constraint_create (&mctx, jc, &error);
-    ok (c == NULL, "list_constraint_create fails on %s", buf);
+    ok (c == NULL, "list_constraint_create fails on %s", msg);
     diag ("error: %s", error.text);
     json_decref (jc);
 }
diff --git a/src/modules/job-list/test/state_match.c b/src/modules/job-list/test/state_match.c
index 0265634a6de6..2a47ed18d97b 100644
--- a/src/modules/job-list/test/state_match.c
+++ b/src/modules/job-list/test/state_match.c
@@ -20,25 +20,18 @@
 #include "ccan/str/str.h"

 static void state_constraint_create_corner_case (const char *str,
-                                                 const char *fmt,
-                                                 ...)
+                                                 const char *msg)
 {
     struct state_constraint *c;
-    char buf[1024];
     flux_error_t error;
     json_error_t jerror;
     json_t *jc;
-    va_list ap;

     if (!(jc = json_loads (str, 0, &jerror)))
         BAIL_OUT ("json constraint invalid: %s", jerror.text);

-    va_start (ap, fmt);
-    vsnprintf(buf, sizeof (buf), fmt, ap);
-    va_end (ap);
-
     c = state_constraint_create (jc, &error);
-    ok (c == NULL, "state_constraint_create fails on %s", buf);
+    ok (c == NULL, "state_constraint_create fails on %s", msg);
     diag ("error: %s", error.text);
     json_decref (jc);
 }

From a0d28ce55ce0739edb3659a900916169a31593e1 Mon Sep 17 00:00:00 2001
From: Albert Chu
Date: Wed, 5 Jun 2024 11:22:40 -0700
Subject: [PATCH 3/9] job-list: move convenience function to util file

Problem: In the near future the array_to_results_bitmask() function
will be needed in multiple files.

Solution: Move it from match.c to match_util.c.
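As a reminder of the behavior being moved: array_to_results_bitmask()
converts an array of job result strings or integers into a bitmask.
For example, the array ["completed", "failed"] is converted to the
bitmask (FLUX_JOB_RESULT_COMPLETED | FLUX_JOB_RESULT_FAILED).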
--- src/modules/job-list/match.c | 39 ------------------------------- src/modules/job-list/match_util.c | 39 +++++++++++++++++++++++++++++++ src/modules/job-list/match_util.h | 2 ++ 3 files changed, 41 insertions(+), 39 deletions(-) diff --git a/src/modules/job-list/match.c b/src/modules/job-list/match.c index f60764bc6923..56ec3002ab11 100644 --- a/src/modules/job-list/match.c +++ b/src/modules/job-list/match.c @@ -379,45 +379,6 @@ static int match_results (struct list_constraint *c, return ((*results) & job->result) ? 1 : 0; } -static int array_to_results_bitmask (json_t *values, flux_error_t *errp) -{ - int results = 0; - json_t *entry; - size_t index; - int valid_results = (FLUX_JOB_RESULT_COMPLETED - | FLUX_JOB_RESULT_FAILED - | FLUX_JOB_RESULT_CANCELED - | FLUX_JOB_RESULT_TIMEOUT); - - json_array_foreach (values, index, entry) { - flux_job_result_t result; - if (json_is_string (entry)) { - const char *resultstr = json_string_value (entry); - if (flux_job_strtoresult (resultstr, &result) < 0) { - errprintf (errp, - "invalid results value '%s' specified", - resultstr); - return -1; - } - } - else if (json_is_integer (entry)) { - result = json_integer_value (entry); - if (result & ~valid_results) { - errprintf (errp, - "invalid results value '%Xh' specified", - result); - return -1; - } - } - else { - errprintf (errp, "results value invalid type"); - return -1; - } - results |= result; - } - return results; -} - static struct list_constraint *create_results_constraint (struct match_ctx *mctx, json_t *values, flux_error_t *errp) diff --git a/src/modules/job-list/match_util.c b/src/modules/job-list/match_util.c index e7a0a6f36b2d..09703f8ef8e2 100644 --- a/src/modules/job-list/match_util.c +++ b/src/modules/job-list/match_util.c @@ -60,5 +60,44 @@ int array_to_states_bitmask (json_t *values, flux_error_t *errp) return states; } +int array_to_results_bitmask (json_t *values, flux_error_t *errp) +{ + int results = 0; + json_t *entry; + size_t index; + int valid_results = (FLUX_JOB_RESULT_COMPLETED + | FLUX_JOB_RESULT_FAILED + | FLUX_JOB_RESULT_CANCELED + | FLUX_JOB_RESULT_TIMEOUT); + + json_array_foreach (values, index, entry) { + flux_job_result_t result; + if (json_is_string (entry)) { + const char *resultstr = json_string_value (entry); + if (flux_job_strtoresult (resultstr, &result) < 0) { + errprintf (errp, + "invalid results value '%s' specified", + resultstr); + return -1; + } + } + else if (json_is_integer (entry)) { + result = json_integer_value (entry); + if (result & ~valid_results) { + errprintf (errp, + "invalid results value '%Xh' specified", + result); + return -1; + } + } + else { + errprintf (errp, "results value invalid type"); + return -1; + } + results |= result; + } + return results; +} + /* vi: ts=4 sw=4 expandtab */ diff --git a/src/modules/job-list/match_util.h b/src/modules/job-list/match_util.h index 59dbcd9c500c..0402955bd779 100644 --- a/src/modules/job-list/match_util.h +++ b/src/modules/job-list/match_util.h @@ -26,4 +26,6 @@ typedef int (*array_to_bitmask_f) (json_t *, flux_error_t *); int array_to_states_bitmask (json_t *values, flux_error_t *errp); +int array_to_results_bitmask (json_t *values, flux_error_t *errp); + #endif /* !HAVE_JOB_LIST_MATCH_UTIL_H */ From e2e7ee82046fc30a2f1d8296c0f0bbcabef9f251 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Thu, 15 Sep 2022 16:29:46 -0700 Subject: [PATCH 4/9] job-list: rebuild and store eventlog Problem: In the near future we will need access to the job's eventlog when a job goes inactive. 
Solution: Rebuild the job eventlog from the events journal and store
it internally in struct job.
---
 src/modules/job-list/job_data.c  |  1 +
 src/modules/job-list/job_data.h  |  2 ++
 src/modules/job-list/job_state.c | 48 ++++++++++++++++++++++++++++++++
 3 files changed, 51 insertions(+)

diff --git a/src/modules/job-list/job_data.c b/src/modules/job-list/job_data.c
index a48f7bdca52a..66ab79a9ba6c 100644
--- a/src/modules/job-list/job_data.c
+++ b/src/modules/job-list/job_data.c
@@ -40,6 +40,7 @@ void job_destroy (void *data)
         grudgeset_destroy (job->dependencies);
         json_decref (job->jobspec);
         json_decref (job->R);
+        free (job->eventlog);
         json_decref (job->exception_context);
         free (job);
         errno = save_errno;
diff --git a/src/modules/job-list/job_data.h b/src/modules/job-list/job_data.h
index 512f796ff1e4..b473d0f0993d 100644
--- a/src/modules/job-list/job_data.h
+++ b/src/modules/job-list/job_data.h
@@ -70,6 +70,8 @@ struct job {
     /* cache of job information */
     json_t *jobspec;
     json_t *R;
+    char *eventlog;
+    size_t eventlog_len;
     json_t *exception_context;

     /* Track which states we have seen and have completed transition
diff --git a/src/modules/job-list/job_state.c b/src/modules/job-list/job_state.c
index 2ce0519ef03d..889c4c81636d 100644
--- a/src/modules/job-list/job_state.c
+++ b/src/modules/job-list/job_state.c
@@ -409,6 +409,43 @@ void job_state_unpause_cb (flux_t *h, flux_msg_handler_t *mh,
         flux_log_error (h, "error responding to unpause request");
 }

+static int store_eventlog_entry (struct job_state_ctx *jsctx,
+                                 struct job *job,
+                                 json_t *entry)
+{
+    char *s = json_dumps (entry, 0);
+    int rv = -1;
+
+    /* entry should have been verified via eventlog_entry_parse()
+     * earlier */
+    assert (s);
+
+    if (!job->eventlog) {
+        job->eventlog_len = strlen (s) + 2;  /* +2 for \n and \0 */
+        if (!(job->eventlog = calloc (1, job->eventlog_len))) {
+            flux_log_error (jsctx->h, "calloc");
+            goto error;
+        }
+        strcpy (job->eventlog, s);
+        strcat (job->eventlog, "\n");
+    }
+    else {
+        char *tmp;
+        job->eventlog_len += strlen (s) + 1;  /* +1 for \n */
+        if (!(tmp = realloc (job->eventlog, job->eventlog_len))) {
+            flux_log_error (jsctx->h, "realloc");
+            goto error;
+        }
+        job->eventlog = tmp;
+        strcat (job->eventlog, s);
+        strcat (job->eventlog, "\n");
+    }
+    rv = 0;
+error:
+    free (s);
+    return rv;
+}
+
 static int job_transition_state (struct job_state_ctx *jsctx,
                                  struct job *job,
                                  flux_job_state_t newstate,
@@ -487,12 +524,17 @@ static int journal_submit_event (struct job_state_ctx *jsctx,
                                  struct job *job,
                                  flux_jobid_t id,
                                  double timestamp,
+                                 json_t *entry,
                                  json_t *context,
                                  json_t *jobspec)
 {
     if (!job) {
         if (!(job = job_create (jsctx->h, id)))
             return -1;
+        if (store_eventlog_entry (jsctx, job, entry) < 0) {
+            job_destroy (job);
+            return -1;
+        }
         if (jobspec)
             job->jobspec = json_incref (jobspec);
         if (zhashx_insert (jsctx->index, &job->id, job) < 0) {
@@ -895,11 +937,17 @@ static int journal_process_event (struct job_state_ctx *jsctx,
         return 0;
     }

+    if (job && job->eventlog) {
+        if (store_eventlog_entry (jsctx, job, event) < 0)
+            return -1;
+    }
+
     if (streq (name, "submit")) {
         if (journal_submit_event (jsctx,
                                   job,
                                   id,
                                   timestamp,
+                                  event,
                                   context,
                                   jobspec) < 0)
             return -1;

From 190099463ed0c0481710be34e2135e0372be9ab2 Mon Sep 17 00:00:00 2001
From: Albert Chu
Date: Mon, 16 May 2022 13:17:51 -0700
Subject: [PATCH 5/9] job-list: add job db

Problem: We would like to store inactive jobs on disk for retrieval
at a later time.

Solution: Add an sqlite job db to the job-list module. If the
job-list module is configured with a database path or if the flux
broker is configured with a statedir, store all inactive job data to
the db.
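For example, inactive job data can be directed to a specific sqlite
file with configuration along these lines (the path below is only
illustrative):

  [job-list]
  dbpath = "/var/lib/flux/job-db.sqlite"
  busytimeout = "5s"

busytimeout is a flux standard duration (FSD) string used to set the
sqlite busy_timeout pragma; it defaults to 50 milliseconds. If
neither dbpath nor the broker statedir attribute is set, the module
runs without a db as before.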
---
 src/modules/Makefile.am          |   6 +-
 src/modules/job-list/Makefile.am |   6 +-
 src/modules/job-list/config.c    | 105 +++++++
 src/modules/job-list/job-list.c  |  12 +-
 src/modules/job-list/job-list.h  |   2 +
 src/modules/job-list/job_db.c    | 525 +++++++++++++++++++++++++++++++
 src/modules/job-list/job_db.h    |  44 +++
 src/modules/job-list/job_state.c |  18 +-
 src/modules/job-list/job_state.h |   1 +
 src/modules/job-list/job_util.c  |  32 ++
 src/modules/job-list/job_util.h  |   4 +
 src/modules/job-list/util.c      |  51 +++
 src/modules/job-list/util.h      |  26 ++
 13 files changed, 826 insertions(+), 6 deletions(-)
 create mode 100644 src/modules/job-list/config.c
 create mode 100644 src/modules/job-list/job_db.c
 create mode 100644 src/modules/job-list/job_db.h
 create mode 100644 src/modules/job-list/util.c
 create mode 100644 src/modules/job-list/util.h

diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am
index 3dd5a3628ceb..26a94994b2f4 100644
--- a/src/modules/Makefile.am
+++ b/src/modules/Makefile.am
@@ -190,7 +190,8 @@ job_list_la_SOURCES =
 job_list_la_CPPFLAGS = \
 	$(AM_CPPFLAGS) \
 	$(FLUX_SECURITY_CFLAGS) \
-	$(HWLOC_CFLAGS)
+	$(HWLOC_CFLAGS) \
+	$(SQLITE_CFLAGS)
 job_list_la_LIBADD = \
 	$(builddir)/job-list/libjob-list.la \
 	$(top_builddir)/src/common/libjob/libjob.la \
@@ -199,7 +200,8 @@ job_list_la_LIBADD = \
 	$(top_builddir)/src/common/libflux-optparse.la \
 	$(top_builddir)/src/common/librlist/librlist.la \
 	$(JANSSON_LIBS) \
-	$(HWLOC_LIBS)
+	$(HWLOC_LIBS) \
+	$(SQLITE_LIBS)
 job_list_la_LDFLAGS = $(fluxmod_ldflags) -module

 job_ingest_la_SOURCES =
diff --git a/src/modules/job-list/Makefile.am b/src/modules/job-list/Makefile.am
index 7278d7b9f812..234f1a9f0260 100644
--- a/src/modules/job-list/Makefile.am
+++ b/src/modules/job-list/Makefile.am
@@ -22,6 +22,8 @@ libjob_list_la_SOURCES = \
 	job_state.c \
 	job_data.h \
 	job_data.c \
+	job_db.h \
+	job_db.c \
 	list.h \
 	list.c \
 	job_util.h \
@@ -35,7 +37,9 @@ libjob_list_la_SOURCES = \
 	state_match.h \
 	state_match.c \
 	match_util.h \
-	match_util.c
+	match_util.c \
+	util.h \
+	util.c

 TESTS = \
 	test_job_data.t \
diff --git a/src/modules/job-list/config.c b/src/modules/job-list/config.c
new file mode 100644
index 000000000000..d421cfd9d7ba
--- /dev/null
+++ b/src/modules/job-list/config.c
static void config_reload_cb (flux_t *h,
                              flux_msg_handler_t *mh,
                              const flux_msg_t *msg,
                              void *arg)
{
    struct conf *conf = arg;
    const flux_conf_t *instance_conf;
    struct conf_callback *ccb;
    flux_error_t error;
    const char *errstr = NULL;

    if (flux_conf_reload_decode (msg, &instance_conf) < 0)
        goto error;
    if (policy_validate (instance_conf, &error) < 0) {
        errstr = error.text;
        goto error;
    }
    ccb = zlistx_first (conf->callbacks);
    while (ccb) {
        if (ccb->cb (instance_conf, &error, ccb->arg) < 0) {
            errstr = error.text;
            errno = EINVAL;
            goto error;
        }
        ccb = zlistx_next (conf->callbacks);
    }
    if (flux_set_conf (h, flux_conf_incref (instance_conf)) < 0) {
        errstr = "error updating cached configuration";
        flux_conf_decref (instance_conf);
        goto error;
    }
    if (flux_respond (h, msg, NULL) < 0)
        flux_log_error (h, "error responding to config-reload request");
    return;
error:
    if (flux_respond_error (h, msg, errno, errstr) < 0)
        flux_log_error (h, "error responding to config-reload request");
}

static const struct flux_msg_handler_spec htab[] = {
    { FLUX_MSGTYPE_REQUEST,
"job-manager.config-reload", config_reload_cb, 0 }, + FLUX_MSGHANDLER_TABLE_END, +}; + + + + + +static int process_config (struct kvs_ctx *ctx) +{ + flux_error_t error; + if (kvs_checkpoint_config_parse (ctx->kcp, + flux_get_conf (ctx->h), + &error) < 0) { + flux_log (ctx->h, LOG_ERR, "%s", error.text); + return -1; + } + return 0; +} + + + +static int checkpoint_period_parse (const flux_conf_t *conf, + flux_error_t *errp, + double *checkpoint_period) +{ + flux_error_t error; + const char *str = NULL; + + if (flux_conf_unpack (conf, + &error, + "{s?{s?s}}", + "kvs", + "checkpoint-period", &str) < 0) { + errprintf (errp, + "error reading config for kvs: %s", + error.text); + return -1; + } + + if (str) { + if (fsd_parse_duration (str, checkpoint_period) < 0) { + errprintf (errp, + "invalid checkpoint-period config: %s", + str); + return -1; + } + } + + return 0; +} + +int kvs_checkpoint_config_parse (kvs_checkpoint_t *kcp, + const flux_conf_t *conf, + flux_error_t *errp) +{ + if (kcp) { + double checkpoint_period = kcp->checkpoint_period; + if (checkpoint_period_parse (conf, errp, &checkpoint_period) < 0) + return -1; + kcp->checkpoint_period = checkpoint_period; + } + return 0; +} + diff --git a/src/modules/job-list/job-list.c b/src/modules/job-list/job-list.c index 476553b9e4f3..5525ed8d6fa6 100644 --- a/src/modules/job-list/job-list.c +++ b/src/modules/job-list/job-list.c @@ -210,6 +210,8 @@ static void list_ctx_destroy (struct list_ctx *ctx) flux_msglist_destroy (ctx->deferred_requests); if (ctx->jsctx) job_state_destroy (ctx->jsctx); + if (ctx->dbctx) + job_db_ctx_destroy (ctx->dbctx); if (ctx->isctx) idsync_ctx_destroy (ctx->isctx); if (ctx->mctx) @@ -219,7 +221,7 @@ static void list_ctx_destroy (struct list_ctx *ctx) } } -static struct list_ctx *list_ctx_create (flux_t *h) +static struct list_ctx *list_ctx_create (flux_t *h, int argc, char **argv) { struct list_ctx *ctx = calloc (1, sizeof (*ctx)); if (!ctx) @@ -231,6 +233,12 @@ static struct list_ctx *list_ctx_create (flux_t *h) goto error; if (!(ctx->isctx = idsync_ctx_create (ctx->h))) goto error; + /* job_db_setup() performs a job_db_ctx_create() and some + * initialization */ + if (!(ctx->dbctx = job_db_setup (h, argc, argv))) { + if (errno != ENOTBLK) + goto error; + } if (!(ctx->jsctx = job_state_create (ctx))) goto error; if (!(ctx->deferred_requests = flux_msglist_create ())) @@ -248,7 +256,7 @@ int mod_main (flux_t *h, int argc, char **argv) struct list_ctx *ctx; int rc = -1; - if (!(ctx = list_ctx_create (h))) { + if (!(ctx = list_ctx_create (h, argc, argv))) { flux_log_error (h, "initialization error"); goto done; } diff --git a/src/modules/job-list/job-list.h b/src/modules/job-list/job-list.h index 6a239275171d..a0680b3de828 100644 --- a/src/modules/job-list/job-list.h +++ b/src/modules/job-list/job-list.h @@ -16,6 +16,7 @@ #include "src/common/libczmqcontainers/czmq_containers.h" #include "job_state.h" +#include "job_db.h" #include "idsync.h" #include "match.h" @@ -23,6 +24,7 @@ struct list_ctx { flux_t *h; flux_msg_handler_t **handlers; struct job_state_ctx *jsctx; + struct job_db_ctx *dbctx; struct idsync_ctx *isctx; struct flux_msglist *deferred_requests; struct match_ctx *mctx; diff --git a/src/modules/job-list/job_db.c b/src/modules/job-list/job_db.c new file mode 100644 index 000000000000..a38f9dddee56 --- /dev/null +++ b/src/modules/job-list/job_db.c @@ -0,0 +1,525 @@ +/************************************************************\ + * Copyright 2016 Lawrence Livermore National Security, LLC + * (c.f. 
 *
 * This file is part of the Flux resource manager framework.
 * For details, see https://github.com/flux-framework.
 *
 * SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* job_db: support storing inactive jobs to db */

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <stdbool.h>
#include <time.h>
#include <sqlite3.h>
#include <jansson.h>
#include <flux/core.h>

#include "src/common/libutil/log.h"
#include "src/common/libutil/fsd.h"
#include "src/common/libutil/tstat.h"
#include "src/common/libutil/monotime.h"

#include "job_db.h"
#include "job_util.h"
#include "util.h"

#define BUSY_TIMEOUT_DEFAULT 50
#define BUFSIZE 1024

/* N.B. "state" is always INACTIVE, but added in case of future changes */

const char *sql_create_table = "CREATE TABLE if not exists jobs("
                               "  id CHAR(16) PRIMARY KEY,"
                               "  userid INT,"
                               "  name TEXT,"
                               "  queue TEXT,"
                               "  state INT,"
                               "  result INT,"
                               "  nodelist TEXT,"
                               "  t_submit REAL,"
                               "  t_depend REAL,"
                               "  t_run REAL,"
                               "  t_cleanup REAL,"
                               "  t_inactive REAL,"
                               "  jobdata JSON,"
                               "  eventlog TEXT,"
                               "  jobspec JSON,"
                               "  R JSON"
                               ");";

const char *sql_store = \
    "INSERT INTO jobs" \
    "(" \
    "  id," \
    "  userid," \
    "  name," \
    "  queue," \
    "  state," \
    "  result," \
    "  nodelist," \
    "  t_submit," \
    "  t_depend," \
    "  t_run," \
    "  t_cleanup," \
    "  t_inactive," \
    "  jobdata," \
    "  eventlog," \
    "  jobspec," \
    "  R" \
    ") values (" \
    "  ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16 " \
    ")";

void job_db_ctx_destroy (struct job_db_ctx *ctx)
{
    if (ctx) {
        free (ctx->dbpath);
        if (ctx->store_stmt) {
            if (sqlite3_finalize (ctx->store_stmt) != SQLITE_OK)
                log_sqlite_error (ctx, "sqlite_finalize store_stmt");
        }
        if (ctx->db) {
            if (sqlite3_close (ctx->db) != SQLITE_OK)
                log_sqlite_error (ctx, "sqlite3_close");
        }
        if (ctx->handlers)
            flux_msg_handler_delvec (ctx->handlers);
        free (ctx);
    }
}

static struct job_db_ctx * job_db_ctx_create (flux_t *h)
{
    struct job_db_ctx *ctx = calloc (1, sizeof (*ctx));

    if (!ctx) {
        flux_log_error (h, "job_db_ctx_create");
        goto error;
    }

    ctx->h = h;
    ctx->busy_timeout = BUSY_TIMEOUT_DEFAULT;

    return ctx;
 error:
    job_db_ctx_destroy (ctx);
    return (NULL);
}

static unsigned long long get_file_size (const char *path)
{
    struct stat sb;

    if (stat (path, &sb) < 0)
        return 0;
    return sb.st_size;
}

static void db_stats_cb (flux_t *h,
                         flux_msg_handler_t *mh,
                         const flux_msg_t *msg,
                         void *arg)
{
    struct job_db_ctx *ctx = arg;

    if (flux_respond_pack (h,
                           msg,
                           "{s:I s:{s:i s:f s:f s:f s:f}}",
                           "dbfile_size", get_file_size (ctx->dbpath),
                           "store",
                           "count", tstat_count (&ctx->sqlstore),
                           "min", tstat_min (&ctx->sqlstore),
                           "max", tstat_max (&ctx->sqlstore),
                           "mean", tstat_mean (&ctx->sqlstore),
                           "stddev", tstat_stddev (&ctx->sqlstore)) < 0)
        flux_log_error (h, "error responding to db-stats request");
    return;
}

static const struct flux_msg_handler_spec htab[] = {
    { FLUX_MSGTYPE_REQUEST, "job-list.db-stats", db_stats_cb, 0 },
    FLUX_MSGHANDLER_TABLE_END,
};

static int get_max_inactive (struct job_db_ctx *ctx)
{
    char *inactive_max_query = "SELECT MAX(t_inactive) FROM jobs";
    sqlite3_stmt *res = NULL;
    int save_errno, rv = -1;

    if (sqlite3_prepare_v2 (ctx->db,
                            inactive_max_query,
                            -1,
                            &res,
                            0) != SQLITE_OK) {
        log_sqlite_error (ctx, "sqlite3_prepare_v2");
goto error; + } + + while (sqlite3_step (res) == SQLITE_ROW) { + const char *s = (const char *)sqlite3_column_text (res, 0); + if (s) { + char *endptr; + double d; + errno = 0; + d = strtod (s, &endptr); + if (errno || *endptr != '\0') + goto error; + ctx->initial_max_inactive = d; + break; + } + } + + rv = 0; +error: + save_errno = errno; + sqlite3_finalize (res); + errno = save_errno; + return rv; +} + +int job_db_init (struct job_db_ctx *ctx) +{ + int flags = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE; + char buf[1024]; + int rc = -1; + + if (sqlite3_open_v2 (ctx->dbpath, &ctx->db, flags, NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "opening %s", ctx->dbpath); + goto error; + } + + if (sqlite3_exec (ctx->db, + "PRAGMA journal_mode=WAL", + NULL, + NULL, + NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "setting sqlite 'journal_mode' pragma"); + goto error; + } + if (sqlite3_exec (ctx->db, + "PRAGMA synchronous=NORMAL", + NULL, + NULL, + NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "setting sqlite 'synchronous' pragma"); + goto error; + } + snprintf (buf, 1024, "PRAGMA busy_timeout=%u;", ctx->busy_timeout); + if (sqlite3_exec (ctx->db, + buf, + NULL, + NULL, + NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "setting sqlite 'busy_timeout' pragma"); + goto error; + } + if (sqlite3_exec (ctx->db, + sql_create_table, + NULL, + NULL, + NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "creating object table"); + goto error; + } + + if (sqlite3_prepare_v2 (ctx->db, + sql_store, + -1, + &ctx->store_stmt, + NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "preparing store stmt"); + goto error; + } + + if (flux_msg_handler_addvec (ctx->h, htab, ctx, &ctx->handlers) < 0) { + flux_log_error (ctx->h, "flux_msg_handler_addvec"); + goto error; + } + + if (get_max_inactive (ctx) < 0) + goto error; + + rc = 0; +error: + return rc; +} + +int job_db_store (struct job_db_ctx *ctx, struct job *job) +{ + json_t *o = NULL; + flux_error_t err; + char *job_str = NULL; + char *jobspec = NULL; + char *R = NULL; + char idbuf[64]; + struct timespec t0; + int rv = -1; + + /* when job-list is initialized from the journal, we could + * re-store duplicate entries into the db. Do not do this if the + * t_inactive is less than the max we read from the db upon module + * initialization + * + * Note, small chance of floating point rounding errors here, but + * if 1 job is added twice to the DB, we can live with it. + */ + if (job->t_inactive <= ctx->initial_max_inactive) + return 0; + + monotime (&t0); + + snprintf (idbuf, 64, "%llu", (unsigned long long)job->id); + if (sqlite3_bind_text (ctx->store_stmt, + 1, + idbuf, + strlen (idbuf), + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding id"); + goto out; + } + if (sqlite3_bind_int (ctx->store_stmt, + 2, + job->userid) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding userid"); + goto out; + } + /* N.B. name can be NULL. sqlite_bind_text() same as + * sqlite_bind_null() if pointer is NULL */ + if (sqlite3_bind_text (ctx->store_stmt, + 3, + job->name, + job->name ? strlen (job->name) : 0, + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding job name"); + goto out; + } + /* N.B. queue can be NULL. sqlite_bind_text() same as + * sqlite_bind_null() if pointer is NULL */ + if (sqlite3_bind_text (ctx->store_stmt, + 4, + job->queue, + job->queue ? 
strlen (job->queue) : 0, + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding job queue"); + goto out; + } + if (sqlite3_bind_int (ctx->store_stmt, + 5, + job->state) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding state"); + goto out; + } + if (sqlite3_bind_int (ctx->store_stmt, + 6, + job->result) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding result"); + goto out; + } + /* N.B. nodelist can be NULL. sqlite_bind_text() same as + * sqlite_bind_null() if pointer is NULL */ + if (sqlite3_bind_text (ctx->store_stmt, + 7, + job->nodelist, + job->nodelist ? strlen (job->nodelist) : 0, + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding job nodelist"); + goto out; + } + if (sqlite3_bind_double (ctx->store_stmt, + 8, + job->t_submit) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding t_submit"); + goto out; + } + if (sqlite3_bind_double (ctx->store_stmt, + 9, + job->t_depend) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding t_depend"); + goto out; + } + if (sqlite3_bind_double (ctx->store_stmt, + 10, + job->t_run) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding t_run"); + goto out; + } + if (sqlite3_bind_double (ctx->store_stmt, + 11, + job->t_cleanup) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding t_cleanup"); + goto out; + } + if (sqlite3_bind_double (ctx->store_stmt, + 12, + job->t_inactive) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding t_inactive"); + goto out; + } + if (!(o = job_to_json_dbdata (job, &err))) + goto out; + if (!(job_str = json_dumps (o, JSON_COMPACT))) { + errno = ENOMEM; + goto out; + } + if (sqlite3_bind_text (ctx->store_stmt, + 13, + job_str, + strlen (job_str), + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding jobdata"); + goto out; + } + if (sqlite3_bind_text (ctx->store_stmt, + 14, + job->eventlog, + strlen (job->eventlog), + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding eventlog"); + goto out; + } + if (!(jobspec = json_dumps (job->jobspec, 0))) { + flux_log_error (ctx->h, "json_dumps jobspec"); + goto out; + } + if (sqlite3_bind_text (ctx->store_stmt, + 15, + jobspec, + strlen (jobspec), + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding jobspec"); + goto out; + } + if (job->R) { + if (!(R = json_dumps (job->R, 0))) { + flux_log_error (ctx->h, "json_dumps R"); + goto out; + } + } + /* N.B. R can be NULL. sqlite_bind_text() same as + * sqlite_bind_null() if pointer is NULL */ + if (sqlite3_bind_text (ctx->store_stmt, + 16, + R, + R ? strlen (R) : 0, + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding R"); + goto out; + } + while (sqlite3_step (ctx->store_stmt) != SQLITE_DONE) { + /* due to rounding errors in sqlite, duplicate entries could be + * written out on occasion leading to a SQLITE_CONSTRAINT error. + * We accept this and move on. + */ + int errcode = sqlite3_errcode (ctx->db); + if (errcode == SQLITE_CONSTRAINT) + break; + else if (errcode == SQLITE_BUSY) { + /* In the rare case this cannot complete within the normal + * busytimeout, we elect to spin till it completes. 
This
             * may need to be revisited in the future.
             */
            flux_log (ctx->h, LOG_DEBUG, "%s: BUSY", __FUNCTION__);
            usleep (1000);
            continue;
        }
        else {
            log_sqlite_error (ctx, "store: executing stmt");
            goto out;
        }
    }

    tstat_push (&ctx->sqlstore, monotime_since (t0));

    rv = 0;
out:
    sqlite3_reset (ctx->store_stmt);
    json_decref (o);
    free (job_str);
    free (jobspec);
    free (R);
    return rv;
}

static int process_config (struct job_db_ctx *ctx)
{
    flux_error_t err;
    const char *dbpath = NULL;
    const char *busytimeout = NULL;

    if (flux_conf_unpack (flux_get_conf (ctx->h),
                          &err,
                          "{s?{s?s s?s}}",
                          "job-list",
                          "dbpath", &dbpath,
                          "busytimeout", &busytimeout) < 0) {
        flux_log (ctx->h, LOG_ERR,
                  "error reading db config: %s",
                  err.text);
        return -1;
    }

    if (dbpath) {
        if (!(ctx->dbpath = strdup (dbpath))) {
            flux_log_error (ctx->h, "strdup dbpath");
            return -1;
        }
    }
    else {
        const char *dbdir = flux_attr_get (ctx->h, "statedir");
        if (dbdir) {
            if (asprintf (&ctx->dbpath, "%s/job-db.sqlite", dbdir) < 0) {
                flux_log_error (ctx->h, "asprintf");
                return -1;
            }
        }
    }
    if (busytimeout) {
        double tmp;
        if (fsd_parse_duration (busytimeout, &tmp) < 0)
            flux_log (ctx->h, LOG_ERR,
                      "invalid busytimeout config: %s", busytimeout);
        else
            ctx->busy_timeout = (int)(1000 * tmp);
    }

    return 0;
}

struct job_db_ctx * job_db_setup (flux_t *h, int ac, char **av)
{
    struct job_db_ctx *ctx = job_db_ctx_create (h);

    if (!ctx)
        return NULL;

    if (process_config (ctx) < 0)
        goto done;

    if (!ctx->dbpath) {
        /* N.B. ENOTBLK signals to the caller that no db was
         * configured, which is not a fatal error */
        errno = ENOTBLK;
        goto done;
    }

    if (job_db_init (ctx) < 0)
        goto done;

    return ctx;

done:
    job_db_ctx_destroy (ctx);
    return NULL;
}

/*
 * vi:tabstop=4 shiftwidth=4 expandtab
 */
diff --git a/src/modules/job-list/job_db.h b/src/modules/job-list/job_db.h
new file mode 100644
index 000000000000..5b6cc10baa7f
--- /dev/null
+++ b/src/modules/job-list/job_db.h
/************************************************************\
 * Copyright 2018 Lawrence Livermore National Security, LLC
 * (c.f. AUTHORS, NOTICE.LLNS, COPYING)
 *
 * This file is part of the Flux resource manager framework.
 * For details, see https://github.com/flux-framework.
 *
 * SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef _FLUX_JOB_DB_H
#define _FLUX_JOB_DB_H

#include <flux/core.h>
#include <sqlite3.h>

#include "src/common/libczmqcontainers/czmq_containers.h"
#include "src/common/libutil/tstat.h"

#include "job_data.h"

struct job_db_ctx {
    flux_t *h;
    char *dbpath;
    unsigned int busy_timeout;
    sqlite3 *db;
    sqlite3_stmt *store_stmt;
    flux_msg_handler_t **handlers;
    tstat_t sqlstore;
    double initial_max_inactive;  /* when db initially loaded */
};

struct job_db_ctx *job_db_setup (flux_t *h, int ac, char **av);

void job_db_ctx_destroy (struct job_db_ctx *ctx);

int job_db_store (struct job_db_ctx *ctx, struct job *job);

#endif /* _FLUX_JOB_DB_H */

/*
 * vi:tabstop=4 shiftwidth=4 expandtab
 */

diff --git a/src/modules/job-list/job_state.c b/src/modules/job-list/job_state.c
index 889c4c81636d..0877a48b055d 100644
--- a/src/modules/job-list/job_state.c
+++ b/src/modules/job-list/job_state.c
@@ -31,6 +31,7 @@
 #include "job-list.h"
 #include "job_state.h"
 #include "job_data.h"
+#include "job_db.h"
 #include "idsync.h"
 #include "job_util.h"

@@ -309,11 +310,23 @@ static void process_state_transition_update (struct job_state_ctx *jsctx,
         /* FLUX_JOB_STATE_SCHED */
         /* FLUX_JOB_STATE_CLEANUP */
         /* FLUX_JOB_STATE_INACTIVE */
+        bool inactive = false;

-        if (state == FLUX_JOB_STATE_INACTIVE)
+        if (state == FLUX_JOB_STATE_INACTIVE) {
             eventlog_inactive_complete (job);
+            inactive = true;
+        }

         update_job_state_and_list (jsctx, job, state, timestamp);
+
+        if (inactive) {
+            assert (job->state == FLUX_JOB_STATE_INACTIVE);
+            if (job->eventlog && jsctx->ctx->dbctx) {
+                if (job_db_store (jsctx->ctx->dbctx, job) < 0)
+                    flux_log_error (jsctx->h, "%s: job_db_store",
+                                    __FUNCTION__);
+            }
+        }
     }
 }

@@ -1162,6 +1175,9 @@ struct job_state_ctx *job_state_create (struct list_ctx *ctx)
 {
     struct job_state_ctx *jsctx = NULL;

+    /* N.B. ctx->dbctx can legitimately be NULL (no db configured) */
+    assert (ctx->isctx);
+
     if (!(jsctx = calloc (1, sizeof (*jsctx)))) {
         flux_log_error (ctx->h, "calloc");
         return NULL;
diff --git a/src/modules/job-list/job_state.h b/src/modules/job-list/job_state.h
index 56d6f6ca0a72..43607bd0920e 100644
--- a/src/modules/job-list/job_state.h
+++ b/src/modules/job-list/job_state.h
@@ -16,6 +16,7 @@

 #include "src/common/libczmqcontainers/czmq_containers.h"

+#include "job_db.h"
 #include "idsync.h"
 #include "stats.h"

diff --git a/src/modules/job-list/job_util.c b/src/modules/job-list/job_util.c
index 163ab84d9462..e4a4276a2487 100644
--- a/src/modules/job-list/job_util.c
+++ b/src/modules/job-list/job_util.c
@@ -290,6 +290,38 @@ json_t *job_to_json (struct job *job, json_t *attrs, flux_error_t *errp)
     return NULL;
 }

+json_t *job_to_json_dbdata (struct job *job, flux_error_t *errp)
+{
+    json_t *val = NULL;
+    json_t *o;
+
+    memset (errp, 0, sizeof (*errp));
+
+    if (!(o = json_object ()))
+        goto error_nomem;
+    if (!(val = json_integer (job->id)))
+        goto error_nomem;
+    if (json_object_set_new (o, "id", val) < 0) {
+        json_decref (val);
+        goto error_nomem;
+    }
+    if (store_all_attr (job, o, errp) < 0)
+        goto error;
+    if (!(val = json_integer (job->states_mask)))
+        goto error_nomem;
+    if (json_object_set_new (o, "states_mask", val) < 0) {
+        json_decref (val);
+        goto error_nomem;
+    }
+    return o;
+error_nomem:
+    errno = ENOMEM;
+error:
+    ERRNO_SAFE_WRAP (json_decref, o);
+    return NULL;
+}
+
 /*
  * vi:tabstop=4 shiftwidth=4 expandtab
  */
diff --git a/src/modules/job-list/job_util.h b/src/modules/job-list/job_util.h
index 8e09fc47e722..422ca618c4f5 100644
--- a/src/modules/job-list/job_util.h
+++ b/src/modules/job-list/job_util.h
@@ -17,6 +17,10 @@

 json_t *job_to_json (struct job *job, json_t *attrs, flux_error_t *errp);

+/* similar to job_to_json(), but returns all data to be stored in db.
+ */
+json_t *job_to_json_dbdata (struct job *job, flux_error_t *errp);
+
 #endif /* ! _FLUX_JOB_LIST_JOB_UTIL_H */

 /*
diff --git a/src/modules/job-list/util.c b/src/modules/job-list/util.c
new file mode 100644
index 000000000000..7cafe5851763
--- /dev/null
+++ b/src/modules/job-list/util.c
/************************************************************\
 * Copyright 2022 Lawrence Livermore National Security, LLC
 * (c.f. AUTHORS, NOTICE.LLNS, COPYING)
 *
 * This file is part of the Flux resource manager framework.
 * For details, see https://github.com/flux-framework.
 *
 * SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* util.c - utility functions */

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <sqlite3.h>
#include <flux/core.h>

#include "util.h"

void log_sqlite_error (struct job_db_ctx *dbctx, const char *fmt, ...)
{
    char buf[128];
    va_list ap;

    va_start (ap, fmt);
    (void)vsnprintf (buf, sizeof (buf), fmt, ap);
    va_end (ap);

    if (dbctx->db) {
        const char *errmsg = sqlite3_errmsg (dbctx->db);
        flux_log (dbctx->h,
                  LOG_ERR,
                  "%s: %s(%d)",
                  buf,
                  errmsg ? errmsg : "unknown error code",
                  sqlite3_extended_errcode (dbctx->db));
    }
    else
        flux_log (dbctx->h,
                  LOG_ERR,
                  "%s: unknown error, no sqlite3 handle",
                  buf);
}

/*
 * vi:tabstop=4 shiftwidth=4 expandtab
 */
diff --git a/src/modules/job-list/util.h b/src/modules/job-list/util.h
new file mode 100644
index 000000000000..59a78899462a
--- /dev/null
+++ b/src/modules/job-list/util.h
/************************************************************\
 * Copyright 2018 Lawrence Livermore National Security, LLC
 * (c.f. AUTHORS, NOTICE.LLNS, COPYING)
 *
 * This file is part of the Flux resource manager framework.
 * For details, see https://github.com/flux-framework.
 *
 * SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef _FLUX_JOB_LIST_UTIL_H
#define _FLUX_JOB_LIST_UTIL_H

#include <flux/core.h>

#include "job_db.h"
#include "job_data.h"

void __attribute__((format (printf, 2, 3)))
log_sqlite_error (struct job_db_ctx *dbctx, const char *fmt, ...);

#endif /* ! _FLUX_JOB_LIST_UTIL_H */

/*
 * vi:tabstop=4 shiftwidth=4 expandtab
 */

From a705fd352dab8ffbf21b62162b1d3899f86ef30f Mon Sep 17 00:00:00 2001
From: Albert Chu
Date: Tue, 11 Jun 2024 16:02:57 -0700
Subject: [PATCH 6/9] job-list: support constraint -> sql conversion

Problem: In the near future we would like to query jobs from the job
db. In order to do so efficiently, we need to convert the job list
constraint into equivalent SQL that can be used in a WHERE statement.

Solution: Add a new helper library, "constraint_sql", that converts a
constraint into conditions that can be passed in via an SQL WHERE
clause. Add unit tests as well.
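For example (this input/output pair comes from the unit tests below),
the constraint

  { "and": [ { "userid": [ 42 ] }, { "name": [ "foo" ] } ] }

is converted to the SQL condition

  ((userid = 42) AND (name = 'foo'))

Operators that have no corresponding column in the db (e.g.
"hostlist") cannot be converted and produce no condition.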
--- src/modules/job-list/Makefile.am | 13 +- src/modules/job-list/constraint_sql.c | 342 ++++++++++++++ src/modules/job-list/constraint_sql.h | 23 + src/modules/job-list/test/constraint_sql.c | 492 +++++++++++++++++++++ 4 files changed, 869 insertions(+), 1 deletion(-) create mode 100644 src/modules/job-list/constraint_sql.c create mode 100644 src/modules/job-list/constraint_sql.h create mode 100644 src/modules/job-list/test/constraint_sql.c diff --git a/src/modules/job-list/Makefile.am b/src/modules/job-list/Makefile.am index 234f1a9f0260..86c500c185fa 100644 --- a/src/modules/job-list/Makefile.am +++ b/src/modules/job-list/Makefile.am @@ -38,13 +38,16 @@ libjob_list_la_SOURCES = \ state_match.c \ match_util.h \ match_util.c \ + constraint_sql.h \ + constraint_sql.c \ util.h \ util.c TESTS = \ test_job_data.t \ test_match.t \ - test_state_match.t + test_state_match.t \ + test_constraint_sql.t test_ldadd = \ $(builddir)/libjob-list.la \ @@ -93,6 +96,14 @@ test_state_match_t_LDADD = \ test_state_match_t_LDFLAGS = \ $(test_ldflags) +test_constraint_sql_t_SOURCES = test/constraint_sql.c +test_constraint_sql_t_CPPFLAGS = \ + $(test_cppflags) +test_constraint_sql_t_LDADD = \ + $(test_ldadd) +test_constraint_sql_t_LDFLAGS = \ + $(test_ldflags) + EXTRA_DIST = \ test/R/1node_1core.R \ test/R/1node_4core.R \ diff --git a/src/modules/job-list/constraint_sql.c b/src/modules/job-list/constraint_sql.c new file mode 100644 index 000000000000..be6cb163cb5e --- /dev/null +++ b/src/modules/job-list/constraint_sql.c @@ -0,0 +1,342 @@ +/************************************************************\ + * Copyright 2023 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. 
 *
 * SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#if HAVE_CONFIG_H
#include "config.h"
#endif

#include <stdio.h>
#include <stdlib.h>
#include <jansson.h>
#include <flux/core.h>

#include "src/common/libutil/errprintf.h"
#include "ccan/str/str.h"

#include "constraint_sql.h"
#include "match_util.h"

static int create_userid_query (json_t *values, char **query, flux_error_t *errp)
{
    json_t *entry;
    size_t index;
    char *q = NULL;
    json_array_foreach (values, index, entry) {
        uint32_t userid;
        if (!json_is_integer (entry)) {
            errprintf (errp, "userid value must be an integer");
            goto error;
        }
        userid = json_integer_value (entry);
        /* special case FLUX_USERID_UNKNOWN matches all, so no query result */
        if (userid == FLUX_USERID_UNKNOWN)
            continue;
        if (!q) {
            if (asprintf (&q, "userid = %u", userid) < 0) {
                errno = ENOMEM;
                goto error;
            }
        }
        else {
            char *tmp;
            if (asprintf (&tmp, "%s OR userid = %u", q, userid) < 0) {
                errno = ENOMEM;
                goto error;
            }
            free (q);
            q = tmp;
        }
    }
    (*query) = q;
    return 0;
error:
    free (q);
    return -1;
}

static int create_string_query (json_t *values,
                                const char *op,
                                char **query,
                                flux_error_t *errp)
{
    json_t *entry;
    size_t index;
    char *q = NULL;
    json_array_foreach (values, index, entry) {
        const char *str;
        if (!json_is_string (entry)) {
            errprintf (errp, "%s value must be a string", op);
            goto error;
        }
        str = json_string_value (entry);
        if (!q) {
            if (asprintf (&q, "%s = '%s'", op, str) < 0) {
                errno = ENOMEM;
                goto error;
            }
        }
        else {
            char *tmp;
            if (asprintf (&tmp, "%s OR %s = '%s'", q, op, str) < 0) {
                errno = ENOMEM;
                goto error;
            }
            free (q);
            q = tmp;
        }
    }
    (*query) = q;
    return 0;
error:
    free (q);
    return -1;
}

static int create_name_query (json_t *values,
                              char **query,
                              flux_error_t *errp)
{
    return create_string_query (values, "name", query, errp);
}

static int create_queue_query (json_t *values,
                               char **query,
                               flux_error_t *errp)
{
    return create_string_query (values, "queue", query, errp);
}

static int create_bitmask_query (const char *col,
                                 json_t *values,
                                 array_to_bitmask_f array_to_bitmask_cb,
                                 char **query,
                                 flux_error_t *errp)
{
    char *q = NULL;
    int tmp;
    if ((tmp = array_to_bitmask_cb (values, errp)) < 0)
        return -1;
    if (asprintf (&q, "(%s & %d) > 0", col, tmp) < 0) {
        errno = ENOMEM;
        return -1;
    }
    (*query) = q;
    return 0;
}

static int create_states_query (json_t *values,
                                char **query,
                                flux_error_t *errp)
{
    return create_bitmask_query ("state",
                                 values,
                                 array_to_states_bitmask,
                                 query,
                                 errp);
}

static int create_results_query (json_t *values,
                                 char **query,
                                 flux_error_t *errp)
{
    return create_bitmask_query ("result",
                                 values,
                                 array_to_results_bitmask,
                                 query,
                                 errp);
}

static int create_timestamp_query (const char *type,
                                   json_t *values,
                                   char **query,
                                   flux_error_t *errp)
{
    const char *str;
    const char *comp;
    char *q = NULL;
    double t;
    char *endptr;
    json_t *v = json_array_get (values, 0);

    if (!v) {
        errprintf (errp, "timestamp value not specified");
        return -1;
    }
    if (!json_is_string (v)) {
        errprintf (errp, "%s value must be a string", type);
        return -1;
    }
    str = json_string_value (v);
    if (strstarts (str, ">=")) {
        comp = ">=";
        str += 2;
    }
    else if (strstarts (str, "<=")) {
        comp = "<=";
        str += 2;
    }
    else if (strstarts (str, ">")) {
        comp = ">";
        str += 1;
    }
    else if (strstarts (str, "<")) {
        comp = "<";
        str += 1;
    }
    else {
        errprintf (errp, "timestamp comparison operator not specified");
        return -1;
    }

    errno = 0;
    t = strtod (str, &endptr);
    if (errno != 0 || *endptr != '\0') {
        errprintf (errp, "Invalid timestamp value specified");
        return -1;
    }
    if (t < 0.0) {
        errprintf (errp, "timestamp value must be >= 0.0");
        return -1;
    }

    if (asprintf (&q, "%s %s %s", type, comp, str) < 0) {
        errno = ENOMEM;
        return -1;
    }

    (*query) = q;
    return 0;
}

static int conditional_query (const char *type,
                              json_t *values,
                              char **query,
                              flux_error_t *errp)
{
    char *q = NULL;
    char *cond;
    json_t *entry;
    size_t index;

    if (streq (type, "and"))
        cond = "AND";
    else if (streq (type, "or"))
        cond = "OR";
    else /* streq (type, "not") */
        /* we will "NOT" it at the end */
        cond = "AND";

    json_array_foreach (values, index, entry) {
        char *subquery;
        if (constraint2sql (entry, &subquery, errp) < 0)
            goto error;
        if (!q)
            q = subquery;
        else if (subquery) {
            char *tmp;
            if (asprintf (&tmp, "%s %s %s", q, cond, subquery) < 0) {
                free (subquery);
                errno = ENOMEM;
                goto error;
            }
            free (subquery);
            free (q);
            q = tmp;
        }
    }
    if (q && streq (type, "not")) {
        char *tmp;
        if (asprintf (&tmp, "NOT (%s)", q) < 0) {
            errno = ENOMEM;
            goto error;
        }
        free (q);
        q = tmp;
    }
    (*query) = q;
    return 0;

error:
    free (q);
    return -1;
}

int constraint2sql (json_t *constraint, char **query, flux_error_t *errp)
{
    char *q = NULL;
    char *rv = NULL;
    const char *op;
    json_t *values;

    if (!query)
        return -1;

    if (!constraint)
        return 0;

    if (!json_is_object (constraint)) {
        errprintf (errp, "constraint must be JSON object");
        return -1;
    }
    if (json_object_size (constraint) > 1) {
        errprintf (errp, "constraint must only contain 1 element");
        return -1;
    }
    json_object_foreach (constraint, op, values) {
        int ret;
        if (!json_is_array (values)) {
            errprintf (errp, "operator %s values not an array", op);
            goto error;
        }
        if (streq (op, "userid"))
            ret = create_userid_query (values, &q, errp);
        else if (streq (op, "name"))
            ret = create_name_query (values, &q, errp);
        else if (streq (op, "queue"))
            ret = create_queue_query (values, &q, errp);
        else if (streq (op, "states"))
            ret = create_states_query (values, &q, errp);
        else if (streq (op, "results"))
            ret = create_results_query (values, &q, errp);
        else if (streq (op, "hostlist")) {
            /* no hostlist column, no conversion to be done */
            ret = 0;
        }
        else if (streq (op, "t_submit")
                 || streq (op, "t_depend")
                 || streq (op, "t_run")
                 || streq (op, "t_cleanup")
                 || streq (op, "t_inactive"))
            ret = create_timestamp_query (op, values, &q, errp);
        else if (streq (op, "or") || streq (op, "and") || streq (op, "not"))
            ret = conditional_query (op, values, &q, errp);
        else {
            errprintf (errp, "unknown constraint operator: %s", op);
            goto error;
        }
        if (ret < 0)
            goto error;
    }
    if (q) {
        if (asprintf (&rv, "(%s)", q) < 0) {
            errno = ENOMEM;
            goto error;
        }
        free (q);
    }
    (*query) = rv;
    return 0;

error:
    free (q);
    return -1;
}

/* vi: ts=4 sw=4 expandtab
 */
diff --git a/src/modules/job-list/constraint_sql.h b/src/modules/job-list/constraint_sql.h
new file mode 100644
index 000000000000..c46134ef475b
--- /dev/null
+++ b/src/modules/job-list/constraint_sql.h
/************************************************************\
 * Copyright 2023 Lawrence Livermore National Security, LLC
 * (c.f. AUTHORS, NOTICE.LLNS, COPYING)
 *
 * This file is part of the Flux resource manager framework.
 * For details, see https://github.com/flux-framework.
 *
 * SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef HAVE_JOB_LIST_CONSTRAINT_SQL_H
#define HAVE_JOB_LIST_CONSTRAINT_SQL_H 1

#if HAVE_CONFIG_H
#include "config.h"
#endif

#include <jansson.h>
#include <flux/core.h>

int constraint2sql (json_t *constraint, char **query, flux_error_t *errp);

#endif /* !HAVE_JOB_LIST_CONSTRAINT_SQL_H */
diff --git a/src/modules/job-list/test/constraint_sql.c b/src/modules/job-list/test/constraint_sql.c
new file mode 100644
index 000000000000..48f7cbf68da5
--- /dev/null
+++ b/src/modules/job-list/test/constraint_sql.c
/************************************************************\
 * Copyright 2023 Lawrence Livermore National Security, LLC
 * (c.f. AUTHORS, NOTICE.LLNS, COPYING)
 *
 * This file is part of the Flux resource manager framework.
 * For details, see https://github.com/flux-framework.
 *
 * SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <jansson.h>
#include <flux/core.h>

#include "src/common/libtap/tap.h"
#include "src/modules/job-list/job_data.h"
#include "src/modules/job-list/constraint_sql.h"
#include "ccan/str/str.h"

static void constraint2sql_corner_case (const char *str,
                                        const char *msg)
{
    flux_error_t error;
    json_error_t jerror;
    json_t *jc;
    char *query;
    int ret;

    if (!(jc = json_loads (str, 0, &jerror)))
        BAIL_OUT ("json constraint invalid: %s", jerror.text);

    ret = constraint2sql (jc, &query, &error);

    ok (ret < 0, "constraint2sql fails on %s", msg);
    diag ("error: %s", error.text);
    json_decref (jc);
}

static void test_corner_case (void)
{
    constraint2sql_corner_case ("{\"userid\":[1], \"name\":[\"foo\"] }",
                                "object with too many keys");
    constraint2sql_corner_case ("{\"userid\":1}",
                                "object with values not array");
    constraint2sql_corner_case ("{\"foo\":[1]}",
                                "object with invalid operation");
    constraint2sql_corner_case ("{\"userid\":[\"foo\"]}",
                                "userid value not integer");
    constraint2sql_corner_case ("{\"name\":[1]}",
                                "name value not string");
    constraint2sql_corner_case ("{\"queue\":[1]}",
                                "queue value not string");
    constraint2sql_corner_case ("{\"states\":[0.0]}",
                                "states value not integer or string");
    constraint2sql_corner_case ("{\"states\":[\"foo\"]}",
                                "states value not valid string");
    constraint2sql_corner_case ("{\"states\":[8192]}",
                                "states value not valid integer");
    constraint2sql_corner_case ("{\"results\":[0.0]}",
                                "results value not integer or string");
    constraint2sql_corner_case ("{\"results\":[\"foo\"]}",
                                "results value not valid string");
    constraint2sql_corner_case ("{\"results\":[8192]}",
                                "results value not valid integer");
    constraint2sql_corner_case ("{\"t_depend\":[]}",
                                "t_depend value not specified");
    constraint2sql_corner_case ("{\"t_depend\":[1.0]}",
                                "t_depend value in invalid format (int)");
    constraint2sql_corner_case ("{\"t_depend\":[\"0.0\"]}",
                                "t_depend no comparison operator");
    constraint2sql_corner_case ("{\"t_depend\":[\">=foof\"]}",
                                "t_depend value invalid (str)");
    constraint2sql_corner_case ("{\"t_depend\":[\">=-1.0\"]}",
                                "t_depend value < 0.0 (str)");
    constraint2sql_corner_case ("{\"not\":[1]}",
                                "sub constraint not a constraint");
}

void test_constraint2sql (const char *constraint, const char 
*expected) +{ + flux_error_t error; + json_error_t jerror; + json_t *jc = NULL; + char *query = NULL; + char *constraint_compact = NULL; + int ret; + + if (constraint) { + if (!(jc = json_loads (constraint, 0, &jerror))) + BAIL_OUT ("json constraint invalid: %s", jerror.text); + /* b/c tests written below have spacing, alignment, + * etc. etc. which outputs poorly */ + if (!(constraint_compact = json_dumps (jc, JSON_COMPACT))) + BAIL_OUT ("json_dumps"); + } + + ret = constraint2sql (jc, &query, &error); + + ok (ret == 0, "constraint2sql success"); + if (ret < 0) + diag ("error: %s", error.text); + if (expected) { + bool pass = query && streq (query, expected); + ok (pass == true, + "constraint2sql on \"%s\" success", constraint_compact); + if (!pass) + diag ("unexpected result: %s", query); + } + else { + ok (query == NULL, + "constraint2sql on \"%s\" success", constraint_compact); + } + json_decref (jc); +} + +static void test_special_cases (void) +{ + test_constraint2sql (NULL, NULL); +} + +struct constraint2sql_test { + const char *constraint; + const char *expected; +}; + +/* N.B. These constraints are copied from the tests in match.c */ +struct constraint2sql_test tests[] = { + /* + * userid tests + */ + /* matches "all", so no query result */ + { + "{}", + NULL, + }, + /* no sql query possible, return is NULL */ + { + "{ \"userid\": [ ] }", + NULL, + }, + { + "{ \"userid\": [ 42 ] }", + "(userid = 42)", + }, + { + "{ \"userid\": [ 42, 43 ] }", + "(userid = 42 OR userid = 43)", + }, + /* FLUX_USERID_UNKNOWN = 0xFFFFFFFF + * matches "all", so no query result + */ + { + "{ \"userid\": [ -1 ] }", + NULL, + }, + /* + * name tests + */ + /* no sql query possible, return is NULL */ + { + "{ \"name\": [ ] }", + NULL, + }, + { + "{ \"name\": [ \"foo\" ] }", + "(name = 'foo')", + }, + { + "{ \"name\": [ \"foo\", \"bar\" ] }", + "(name = 'foo' OR name = 'bar')", + }, + /* + * queue tests + */ + /* no sql query possible, return is NULL */ + { + "{ \"queue\": [ ] }", + NULL, + }, + { + "{ \"queue\": [ \"foo\" ] }", + "(queue = 'foo')", + }, + { + "{ \"queue\": [ \"foo\", \"bar\" ] }", + "(queue = 'foo' OR queue = 'bar')", + }, + /* + * states tests + */ + /* matches "nothing" */ + { + "{ \"states\": [ ] }", + "((state & 0) > 0)", + }, + { + /* sanity check integer inputs work, we assume FLUX_JOB_STATE_NEW + * will always be 1, use strings everywhere else + */ + "{ \"states\": [ 1 ] }", + "((state & 1) > 0)", + }, + { + "{ \"states\": [ \"sched\" ] }", + "((state & 8) > 0)", + }, + { + "{ \"states\": [ \"sched\", \"RUN\" ] }", + "((state & 24) > 0)", + }, + /* + * results tests + */ + /* matches "nothing" */ + { + "{ \"results\": [ ] }", + "((result & 0) > 0)", + }, + { + /* sanity check integer inputs work, we assume + * FLUX_JOB_RESULT_COMPLETED will always be 1, use strings + * everywhere else + */ + "{ \"results\": [ 1 ] }", + "((result & 1) > 0)", + }, + { + "{ \"results\": [ \"completed\" ] }", + "((result & 1) > 0)", + }, + { + "{ \"results\": [ \"completed\", \"FAILED\" ] }", + "((result & 3) > 0)", + }, + /* + * hostlist tests + * + * N.B. 
hostlist cannot be converted to SQL query, so all return + * NULL + */ + { + "{ \"hostlist\": [ ] }", + NULL, + }, + { + "{ \"hostlist\": [ \"foo1\" ] }", + NULL, + }, + { + "{ \"hostlist\": [ \"foo[1-2]\" ] }", + NULL, + }, + { + "{ \"hostlist\": [ \"foo1\", \"foo2\", \"foo3\" ] }", + NULL, + }, + /* + * timestamp tests + */ + { + "{ \"t_submit\": [ \">=0\" ] }", + "(t_submit >= 0)", + }, + { + "{ \"t_depend\": [ \">=0.0\" ] }", + "(t_depend >= 0.0)", + }, + { + "{ \"t_run\": [ \">=0\" ] }", + "(t_run >= 0)", + }, + { + "{ \"t_cleanup\": [ \">=0.0\" ] }", + "(t_cleanup >= 0.0)", + }, + { + "{ \"t_inactive\": [ \">=0.0\" ] }", + "(t_inactive >= 0.0)", + }, + { + "{ \"t_inactive\": [ \"<100.0\" ] }", + "(t_inactive < 100.0)", + }, + { + "{ \"t_inactive\": [ \"<=100.0\" ] }", + "(t_inactive <= 100.0)", + }, + { + "{ \"t_inactive\": [ \">=100.0\" ] }", + "(t_inactive >= 100.0)", + }, + { + "{ \"or\": [] }", + NULL, + }, + { + "{ \"and\": [] }", + NULL, + }, + { + "{ \"not\": [] }", + NULL, + }, + { + "{ \"not\": [ { \"userid\": [ 42 ] } ] }", + "(NOT ((userid = 42)))", + }, + { + "{ \"or\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + }", + "((userid = 42) OR (name = 'foo'))", + }, + { + "{ \"or\": \ + [ \ + { \"not\": [ { \"userid\": [ 42 ] } ] }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + }", + "((NOT ((userid = 42))) OR (name = 'foo'))", + }, + { + "{ \"not\": \ + [ \ + { \"or\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + } \ + ] \ + }", + "(NOT (((userid = 42) OR (name = 'foo'))))", + }, + { + "{ \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + }", + "((userid = 42) AND (name = 'foo'))", + }, + { + "{ \"and\": \ + [ \ + { \"not\": [ { \"userid\": [ 42 ] } ] }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + }", + "((NOT ((userid = 42))) AND (name = 'foo'))", + }, + { + "{ \"not\": \ + [ \ + { \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + } \ + ] \ + }", + "(NOT (((userid = 42) AND (name = 'foo'))))", + }, + { + "{ \"and\": \ + [ \ + { \"or\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"userid\": [ 43 ] } \ + ] \ + }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + }", + "(((userid = 42) OR (userid = 43)) AND (name = 'foo'))", + }, + { + /* all the jobs in all states for a specific user */ + "{ \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"states\": [ \"pending\", \"running\", \"inactive\" ] } \ + ] \ + }", + "((userid = 42) AND ((state & 126) > 0))", + }, + { + /* all the unsuccessful jobs for a specific user */ + "{ \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"results\": [ \"failed\", \"canceled\", \"timeout\" ] } \ + ] \ + }", + "((userid = 42) AND ((result & 14) > 0))", + }, + { + /* all the pending and running jobs for a user, in two specific queues */ + "{ \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"states\" : [ \"pending\", \"running\" ] }, \ + { \"queue\": [ \"batch\", \"debug\" ] } \ + ] \ + }", + "((userid = 42) AND ((state & 62) > 0) AND (queue = 'batch' OR queue = 'debug'))", + }, + { + /* jobs for a user, in queue batch, with specific job name, are running */ + "{ \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"queue\": [ \"batch\" ] }, \ + { \"name\": [ \"foo\" ] }, \ + { \"states\": [ \"running\" ] } \ + ] \ + }", + "((userid = 42) AND (queue = 'batch') AND (name = 'foo') AND ((state & 48) > 0))", + }, + { + /* all the inactive jobs since a specific time (via t_inactve) */ + "{ \"and\": \ + [ \ + { \"states\": [ \"inactive\" ] }, \ + { 
\"t_inactive\": [ \">=500.0\" ] } \ + ] \ + }", + "(((state & 64) > 0) AND (t_inactive >= 500.0))", + }, + { + /* jobs for a user that ran on specific hostlist */ + /* N.B. "hostlist" can't be converted into query, so is dropped */ + "{ \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"hostlist\": [ \"node1\", \"node2\" ] } \ + ] \ + }", + "((userid = 42))", + }, + { + /* jobs that ran on specific hostlist during a time period + */ + /* N.B. "hostlist" can't be converted into query, so is dropped */ + "{ \"and\": \ + [ \ + { \"hostlist\": [ \"node1\", \"node2\" ] }, \ + { \"t_run\": [ \">=500.0\" ] }, \ + { \"t_inactive\": [ \"<=5000.0\" ] } \ + ] \ + }", + "((t_run >= 500.0) AND (t_inactive <= 5000.0))", + }, + { + NULL, + NULL, + }, +}; + +static void run_constraint2sql_tests (void) +{ + struct constraint2sql_test *ltests = tests; + + while (ltests->constraint) { + test_constraint2sql (ltests->constraint, ltests->expected); + ltests++; + } +} + +int main (int argc, char *argv[]) +{ + plan (NO_PLAN); + + test_corner_case (); + test_special_cases (); + run_constraint2sql_tests (); + + done_testing (); +} + +/* + * vi:ts=4 sw=4 expandtab + */ From e4a8ad611b67c0aaa4c68275b856bbc23b4bce71 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Mon, 16 May 2022 16:49:08 -0700 Subject: [PATCH 7/9] job-list: read from sqlite if necessary Problem: Now that inactive jobs are stored to an sqlite database, it is possible to retrieve additional job information that is no longer stored in memory. Solution: When possible, read from sqlite to get additional job information about inactive jobs. --- src/modules/job-list/job_data.c | 2 + src/modules/job-list/job_data.h | 7 + src/modules/job-list/job_util.c | 5 +- src/modules/job-list/list.c | 356 +++++++++++++++++++++++++++++++- 4 files changed, 358 insertions(+), 12 deletions(-) diff --git a/src/modules/job-list/job_data.c b/src/modules/job-list/job_data.c index 66ab79a9ba6c..2c28064df62b 100644 --- a/src/modules/job-list/job_data.c +++ b/src/modules/job-list/job_data.c @@ -38,10 +38,12 @@ void job_destroy (void *data) hostlist_destroy (job->nodelist_hl); json_decref (job->annotations); grudgeset_destroy (job->dependencies); + json_decref (job->dependencies_db); json_decref (job->jobspec); json_decref (job->R); free (job->eventlog); json_decref (job->exception_context); + json_decref (job->job_dbdata); free (job); errno = save_errno; } diff --git a/src/modules/job-list/job_data.h b/src/modules/job-list/job_data.h index b473d0f0993d..3a6d6b6260e6 100644 --- a/src/modules/job-list/job_data.h +++ b/src/modules/job-list/job_data.h @@ -65,7 +65,11 @@ struct job { const char *exception_note; flux_job_result_t result; json_t *annotations; + /* dependencies - built up + * dependencies_db - recovered from db + */ struct grudgeset *dependencies; + json_t *dependencies_db; /* cache of job information */ json_t *jobspec; @@ -74,6 +78,9 @@ struct job { size_t eventlog_len; json_t *exception_context; + /* all job data from db */ + json_t *job_dbdata; + /* Track which states we have seen and have completed transition * to. States we've processed via the states_mask and states seen * via events stream in states_events_mask. 
diff --git a/src/modules/job-list/job_util.c b/src/modules/job-list/job_util.c
index e4a4276a2487..695b168c6e67 100644
--- a/src/modules/job-list/job_util.c
+++ b/src/modules/job-list/job_util.c
@@ -205,7 +205,10 @@ static int store_attr (struct job *job,
     else if (streq (attr, "dependencies")) {
         if (!job->dependencies)
             return 0;
-        val = json_incref (grudgeset_tojson (job->dependencies));
+        if (job->dependencies_db)
+            val = json_incref (job->dependencies_db);
+        else
+            val = json_incref (grudgeset_tojson (job->dependencies));
     }
     else {
         errprintf (errp, "%s is not a valid attribute", attr);
diff --git a/src/modules/job-list/list.c b/src/modules/job-list/list.c
index e952e4fb0185..dcc7664f77be 100644
--- a/src/modules/job-list/list.c
+++ b/src/modules/job-list/list.c
@@ -15,6 +15,8 @@
 #endif
 #include <jansson.h>
 #include <flux/core.h>
+#include <sqlite3.h>
+#include <float.h>
 
 #include "src/common/libutil/errno_safe.h"
@@ -29,6 +31,9 @@
 #include "job_data.h"
 #include "match.h"
 #include "state_match.h"
+#include "constraint_sql.h"
+#include "job_db.h"
+#include "util.h"
 
 json_t *get_job_by_id (struct job_state_ctx *jsctx,
                        flux_error_t *errp,
@@ -50,7 +55,8 @@ int get_jobs_from_list (json_t *jobs,
                         int max_entries,
                         json_t *attrs,
                         double since,
-                        struct list_constraint *c)
+                        struct list_constraint *c,
+                        double *t_inactive_last)
 {
     struct job *job;
@@ -80,6 +86,8 @@
             errno = ENOMEM;
             return -1;
         }
+        if (t_inactive_last)
+            (*t_inactive_last) = job->t_inactive;
         if (json_array_size (jobs) == max_entries)
             return 1;
     }
@@ -89,6 +97,268 @@
     return 0;
 }
 
+struct job *sqliterow_2_job (struct job_state_ctx *jsctx, sqlite3_stmt *res)
+{
+    struct job *job = NULL;
+    const char *s;
+    int success;
+    int exception_occurred;
+    const char *ranks = NULL;
+    const char *nodelist = NULL;
+
+    /* initially set job id to 0 */
+    if (!(job = job_create (jsctx->h, 0)))
+        return NULL;
+
+    /* most job fields are parsed out of the stored job data below */
+
+    /* index 0 - id
+     * index 1 - userid
+     * index 2 - name
+     * index 3 - queue
+     * index 4 - state
+     * index 5 - result
+     * index 6 - nodelist
+     * index 7 - t_submit
+     * index 8 - t_depend
+     * index 9 - t_run
+     * index 10 - t_cleanup
+     * index 11 - t_inactive
+     */
+
+    s = (const char *)sqlite3_column_text (res, 12);
+    assert (s);
+    job->job_dbdata = json_loads (s, 0, NULL);
+    assert (job->job_dbdata);
+
+    s = (const char *)sqlite3_column_text (res, 13);
+    assert (s);
+    job->eventlog = strdup (s);
+    assert (job->eventlog);
+
+    s = (const char *)sqlite3_column_text (res, 14);
+    assert (s);
+    job->jobspec = json_loads (s, 0, NULL);
+    assert (job->jobspec);
+
+    s = (const char *)sqlite3_column_text (res, 15);
+    if (s) {
+        job->R = json_loads (s, 0, NULL);
+        assert (job->R);
+    }
+
+    if (json_unpack (job->job_dbdata,
+                     "{s:I s:i s:i s?:I s:f s?:f s?:f s?:f s:f s:i s:i}",
+                     "id", &job->id,
+                     "userid", &job->userid,
+                     "urgency", &job->urgency,
+                     "priority", &job->priority,
+                     "t_submit", &job->t_submit,
+                     "t_depend", &job->t_depend,
+                     "t_run", &job->t_run,
+                     "t_cleanup", &job->t_cleanup,
+                     "t_inactive", &job->t_inactive,
+                     "state", &job->state,
+                     "states_mask", &job->states_mask) < 0) {
+        flux_log (jsctx->h, LOG_ERR, "json_unpack");
+        return NULL;
+    }
+
+    if (json_unpack (job->job_dbdata,
+                     "{s?:s s?:s s?:s s?:s s?:s}",
+                     "name", &job->name,
+                     "cwd", &job->cwd,
+                     "queue", &job->queue,
+                     "project", &job->project,
+                     "bank", &job->bank) < 0) {
+        flux_log (jsctx->h, LOG_ERR, "json_unpack");
+        return NULL;
+    }
+
+    /* N.B. success required for inactive job */
+    if (json_unpack (job->job_dbdata,
+                     "{s?:i s?:i s?:i s?:s s?:s s?:f s?:i s:b}",
+                     "ntasks", &job->ntasks,
+                     "ncores", &job->ncores,
+                     "nnodes", &job->nnodes,
+                     "ranks", &ranks,
+                     "nodelist", &nodelist,
+                     "expiration", &job->expiration,
+                     "waitstatus", &job->wait_status,
+                     "success", &success) < 0) {
+        flux_log (jsctx->h, LOG_ERR, "json_unpack");
+        return NULL;
+    }
+
+    /* N.B. result required for inactive job */
+    if (json_unpack (job->job_dbdata,
+                     "{s:b s?:s s?:i s?:s s:i s?:O s?:O}",
+                     "exception_occurred", &exception_occurred,
+                     "exception_type", &job->exception_type,
+                     "exception_severity", &job->exception_severity,
+                     "exception_note", &job->exception_note,
+                     "result", &job->result,
+                     "annotations", &job->annotations,
+                     "dependencies", &job->dependencies_db) < 0) {
+        flux_log (jsctx->h, LOG_ERR, "json_unpack");
+        return NULL;
+    }
+
+    job->success = success;
+    job->exception_occurred = exception_occurred;
+
+    if (ranks) {
+        job->ranks = strdup (ranks);
+        assert (job->ranks);
+    }
+    if (nodelist) {
+        job->nodelist = strdup (nodelist);
+        assert (job->nodelist);
+    }
+
+    return job;
+}
+
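+/* N.B. sqliterow_2_job() assumes the column order of the jobs table
+ * schema defined with the db code earlier in this series: columns
+ * 0-11 hold the fields enumerated in the comment above, and columns
+ * 12-15 hold jobdata, eventlog, jobspec, and R respectively.
+ */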
+int get_jobs_from_sqlite (struct job_state_ctx *jsctx,
+                          json_t *jobs,
+                          flux_error_t *errp,
+                          int max_entries,
+                          json_t *attrs,
+                          double since,
+                          struct list_constraint *c,
+                          json_t *constraint,
+                          double t_inactive_last)
+{
+    char *sql_query = NULL;
+    char *sql_select = "SELECT *";
+    char *sql_from = "FROM jobs";
+    char *sql_constraint = NULL;
+    char *sql_where = NULL;
+    char *sql_order = NULL;
+    sqlite3_stmt *res = NULL;
+    flux_error_t error;
+    int save_errno, rv = -1;
+
+    if (!jsctx->ctx->dbctx)
+        return 0;
+
+    if (constraint2sql (constraint, &sql_constraint, &error) < 0) {
+        flux_log (jsctx->h, LOG_ERR, "constraint2sql: %s", error.text);
+        goto out;
+    }
+
+    /* N.B. timestamp precision largely depends on how the jansson
+     * library encodes timestamps in the eventlog.  Trial and error
+     * indicates it averages 7 decimal places.  The default "%f" is 6
+     * decimal places and can lead to rounding errors in the SQL
+     * query, so we up it to 9 decimal places to be on the safe side.
+     * It is possible that on some systems or builds this is not the
+     * case.
+     */
+
+    if (since > 0.0) {
+        if (asprintf (&sql_where,
+                      "WHERE t_inactive < %.9f AND t_inactive > %.9f%s%s",
+                      t_inactive_last,
+                      since,
+                      sql_constraint ? " AND " : "",
+                      sql_constraint ? sql_constraint : "") < 0) {
+            errno = ENOMEM;
+            goto out;
+        }
+    }
+    else {
+        if (asprintf (&sql_where,
+                      "WHERE t_inactive < %.9f%s%s",
+                      t_inactive_last,
+                      sql_constraint ? " AND " : "",
+                      sql_constraint ? sql_constraint : "") < 0) {
+            errno = ENOMEM;
+            goto out;
+        }
+    }
+
+    if (max_entries) {
+        if (asprintf (&sql_order,
+                      "ORDER BY t_inactive DESC LIMIT %d",
+                      max_entries) < 0) {
+            errno = ENOMEM;
+            goto out;
+        }
+    }
+    else {
+        if (asprintf (&sql_order, "ORDER BY t_inactive DESC") < 0) {
+            errno = ENOMEM;
+            goto out;
+        }
+    }
+
+    if (asprintf (&sql_query,
+                  "%s %s %s %s",
+                  sql_select,
+                  sql_from,
+                  sql_where,
+                  sql_order) < 0) {
+        errno = ENOMEM;
+        goto out;
+    }
+
+    if (sqlite3_prepare_v2 (jsctx->ctx->dbctx->db,
+                            sql_query,
+                            -1,
+                            &res,
+                            0) != SQLITE_OK) {
+        log_sqlite_error (jsctx->ctx->dbctx, "sqlite3_prepare_v2");
+        goto out;
+    }
+
+    while (sqlite3_step (res) == SQLITE_ROW) {
+        struct job *job;
+        if (!(job = sqliterow_2_job (jsctx, res))) {
+            log_sqlite_error (jsctx->ctx->dbctx, "sqliterow_2_job");
+            goto out;
+        }
+        if (job_match (job, c, errp)) {
+            json_t *o;
+            if (!(o = job_to_json (job, attrs, errp))) {
+                job_destroy (job);
+                goto out;
+            }
+            if (json_array_append_new (jobs, o) < 0) {
+                json_decref (o);
+                job_destroy (job);
+                errno = ENOMEM;
+                goto out;
+            }
+            if (json_array_size (jobs) == max_entries) {
+                job_destroy (job);
+                rv = 1;
+                goto out;
+            }
+        }
+
+        job_destroy (job);
+    }
+
+    rv = 0;
+out:
+    save_errno = errno;
+    free (sql_query);
+    free (sql_constraint);
+    free (sql_where);
+    free (sql_order);
+    sqlite3_finalize (res);
+    errno = save_errno;
+    return rv;
+}
+
 /* Create a JSON array of 'job' objects.  'max_entries' determines the
  * max number of jobs to return, 0=unlimited.  'since' limits jobs returned
  * to those with t_inactive greater than timestamp.  Returns JSON object
@@ -103,6 +373,7 @@ json_t *get_jobs (struct job_state_ctx *jsctx,
                   double since,
                   json_t *attrs,
                   struct list_constraint *c,
+                  json_t *constraint,
                   struct state_constraint *statec)
 {
     json_t *jobs = NULL;
@@ -122,7 +393,8 @@ json_t *get_jobs (struct job_state_ctx *jsctx,
                                         max_entries,
                                         attrs,
                                         0.,
-                                        c)) < 0)
+                                        c,
+                                        NULL)) < 0)
             goto error;
     }
@@ -134,21 +406,38 @@ json_t *get_jobs (struct job_state_ctx *jsctx,
                                             max_entries,
                                             attrs,
                                             0.,
-                                            c)) < 0)
+                                            c,
+                                            NULL)) < 0)
                 goto error;
         }
     }
 
     if (state_match (FLUX_JOB_STATE_INACTIVE, statec)) {
         if (!ret) {
+            double t_inactive_last = DBL_MAX;
+
             if ((ret = get_jobs_from_list (jobs,
                                            errp,
                                            jsctx->inactive,
                                            max_entries,
                                            attrs,
                                            since,
-                                           c)) < 0)
+                                           c,
+                                           &t_inactive_last)) < 0)
                 goto error;
+
+            if (!ret) {
+                if ((ret = get_jobs_from_sqlite (jsctx,
+                                                 jobs,
+                                                 errp,
+                                                 max_entries,
+                                                 attrs,
+                                                 since,
+                                                 c,
+                                                 constraint,
+                                                 t_inactive_last)) < 0)
+                    goto error;
+            }
         }
     }
@@ -336,7 +625,7 @@ void list_cb (flux_t *h,
     }
 
     if (!(jobs = get_jobs (ctx->jsctx, &err, max_entries, since,
-                           attrs, c, statec)))
+                           attrs, c, constraint, statec)))
         goto error;
 
     if (flux_respond_pack (h, msg, "{s:O}", "jobs", jobs) < 0)
@@ -436,6 +725,49 @@ int check_id_valid (struct job_state_ctx *jsctx,
     return 0;
 }
 
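+/* Look up a single job directly in the sqlite db.  Returns NULL if
+ * the db is not configured, the id is not found, or an error occurs;
+ * the caller can then fall back to stalling the request.
+ */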
+struct job *get_job_by_id_sqlite (struct job_state_ctx *jsctx,
+                                  flux_error_t *errp,
+                                  const flux_msg_t *msg,
+                                  flux_jobid_t id,
+                                  json_t *attrs,
+                                  bool *stall)
+{
+    char *sql = "SELECT * FROM jobs WHERE id = ?;";
+    sqlite3_stmt *res = NULL;
+    struct job *job = NULL;
+    struct job *rv = NULL;
+
+    if (!jsctx->ctx->dbctx)
+        return NULL;
+
+    if (sqlite3_prepare_v2 (jsctx->ctx->dbctx->db,
+                            sql,
+                            -1,
+                            &res,
+                            0) != SQLITE_OK) {
+        log_sqlite_error (jsctx->ctx->dbctx, "sqlite3_prepare_v2");
+        goto error;
+    }
+
+    if (sqlite3_bind_int64 (res, 1, id) != SQLITE_OK) {
+        log_sqlite_error (jsctx->ctx->dbctx, "sqlite3_bind_int64");
+        goto error;
+    }
+
+    if (sqlite3_step (res) == SQLITE_ROW) {
+        if (!(job = sqliterow_2_job (jsctx, res))) {
+            log_sqlite_error (jsctx->ctx->dbctx, "sqliterow_2_job");
+            goto error;
+        }
+    }
+
+    rv = job;
+error:
+    sqlite3_finalize (res);
+    return rv;
+}
+
 /* Returns JSON object which the caller must free.  On error, return
  * NULL with errno set:
  *
@@ -454,14 +786,16 @@ json_t *get_job_by_id (struct job_state_ctx *jsctx,
     struct job *job;
 
     if (!(job = zhashx_lookup (jsctx->index, &id))) {
-        if (stall) {
-            if (check_id_valid (jsctx, msg, id, attrs, state) < 0) {
-                flux_log_error (jsctx->h, "%s: check_id_valid", __FUNCTION__);
-                return NULL;
+        if (!(job = get_job_by_id_sqlite (jsctx, errp, msg, id, attrs, stall))) {
+            if (stall) {
+                if (check_id_valid (jsctx, msg, id, attrs, state) < 0) {
+                    flux_log_error (jsctx->h, "%s: check_id_valid", __FUNCTION__);
+                    return NULL;
+                }
+                (*stall) = true;
             }
-            (*stall) = true;
+            return NULL;
         }
-        return NULL;
     }
 
     /* Always return job in inactive state, even if a requested state was

From e9cf2c742f00b05f4d5d3f5bf13c597e1b61d4a9 Mon Sep 17 00:00:00 2001
From: Albert Chu
Date: Fri, 13 May 2022 18:34:11 -0700
Subject: [PATCH 8/9] testsuite: update purge tests for job-list changes

Problem: The job-list module now stores and recovers inactive job
data via its internal database.  Tests that assume job-list only
stores jobs in memory do not account for this, leading to errors.

Solution: Update tests to read the job-list module stats instead of
using `flux jobs`.  The stats report counts internal to the job-list
module, which allows the purge tests to pass.
---
 t/t2809-job-purge.t | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/t/t2809-job-purge.t b/t/t2809-job-purge.t
index f6ff0fc29b0f..1c8cd92203d4 100755
--- a/t/t2809-job-purge.t
+++ b/t/t2809-job-purge.t
@@ -13,7 +13,7 @@ inactive_count() {
        if test $how = "job-manager"; then
                flux module stats --parse=inactive_jobs job-manager
        elif test $how = "job-list"; then
-               flux jobs --no-header --filter=inactive|wc -l
+               flux module stats -p jobs.inactive job-list
        elif test $how = "job-list-stats"; then
                flux job stats | jq .job_states.inactive
        else

From 975240f3b76d5633adc4fb5b120080f67b758d56 Mon Sep 17 00:00:00 2001
From: Albert Chu
Date: Tue, 17 May 2022 14:57:47 -0700
Subject: [PATCH 9/9] testsuite: add job-list database tests

Problem: There are no tests to cover the new job-list DB.

Solution: Add coverage in new t/t2263-job-list-db.t.
---
 t/Makefile.am         |   1 +
 t/t2263-job-list-db.t | 496 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 497 insertions(+)
 create mode 100755 t/t2263-job-list-db.t

diff --git a/t/Makefile.am b/t/Makefile.am
index bb3c31e81c02..b481cf5b55e8 100644
--- a/t/Makefile.am
+++ b/t/Makefile.am
@@ -155,6 +155,7 @@ TESTSCRIPTS = \
        t2260-job-list.t \
        t2261-job-list-update.t \
        t2262-job-list-stats.t \
+       t2263-job-list-db.t \
        t2270-job-dependencies.t \
        t2271-job-dependency-after.t \
        t2272-job-begin-time.t \
diff --git a/t/t2263-job-list-db.t b/t/t2263-job-list-db.t
new file mode 100755
index 000000000000..7006097c8188
--- /dev/null
+++ b/t/t2263-job-list-db.t
@@ -0,0 +1,496 @@
+#!/bin/sh
+
+test_description='Test flux job list db'
+
+. $(dirname $0)/job-list/job-list-helper.sh
+
+. $(dirname $0)/sharness.sh
+
+export FLUX_CONF_DIR=$(pwd)
+if test -z "${TEST_UNDER_FLUX_ACTIVE}"; then
+    STATEDIR=$(mktemp -d)
+fi
+test_under_flux 4 job -o,-Sstatedir=${STATEDIR}
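+
+# N.B. The sqlite db lives in the statedir given above, so inactive
+# job data persists across job purges and job-list module reloads;
+# the tests below depend on that persistence.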
+
+RPC=${FLUX_BUILD_DIR}/t/request/rpc
+
+QUERYCMD="flux python ${FLUX_SOURCE_DIR}/t/scripts/sqlite-query.py"
+
+fj_wait_event() {
+    flux job wait-event --timeout=20 "$@"
+}
+
+get_statedir_dbpath() {
+    statedir=`flux getattr statedir` &&
+    echo ${statedir}/job-db.sqlite
+}
+
+# count number of entries in database
+# arg1 - database path
+db_count_entries() {
+    local dbpath=$1
+    query="select id from jobs;"
+    count=`${QUERYCMD} -t 10000 ${dbpath} "${query}" | grep "^id =" | wc -l`
+    echo $count
+}
+
+# verify entries stored in database
+# arg1 - jobid
+# arg2 - database path
+db_check_entries() {
+    local id=$(flux job id $1)
+    local dbpath=$2
+    query="select * from jobs where id=$id;"
+    ${QUERYCMD} -t 10000 ${dbpath} "${query}" > query.out
+    if grep -q "^id = " query.out \
+        && grep -q "t_inactive = " query.out \
+        && grep -q "jobdata = " query.out \
+        && grep -q "eventlog = " query.out \
+        && grep -q "jobspec = " query.out \
+        && grep -q "R = " query.out
+    then
+        return 0
+    fi
+    return 1
+}
+
+# get job values via job list
+# arg1 - jobid
+get_job_list_values() {
+    local id=$(flux job id $1)
+    jobdata=`flux job list-ids ${id}`
+    list_userid=`echo ${jobdata} | jq ".userid"`
+    list_urgency=`echo ${jobdata} | jq ".urgency"`
+    list_priority=`echo ${jobdata} | jq ".priority"`
+    list_state=`echo ${jobdata} | jq ".state"`
+    list_ranks=`echo ${jobdata} | jq ".ranks"`
+    list_nnodes=`echo ${jobdata} | jq ".nnodes"`
+    list_nodelist=`echo ${jobdata} | jq -r ".nodelist"`
+    list_ntasks=`echo ${jobdata} | jq ".ntasks"`
+    list_ncores=`echo ${jobdata} | jq ".ncores"`
+    list_name=`echo ${jobdata} | jq -r ".name"`
+    list_cwd=`echo ${jobdata} | jq -r ".cwd"`
+    list_queue=`echo ${jobdata} | jq -r ".queue"`
+    list_project=`echo ${jobdata} | jq -r ".project"`
+    list_bank=`echo ${jobdata} | jq -r ".bank"`
+    list_waitstatus=`echo ${jobdata} | jq ".waitstatus"`
+    list_success=`echo ${jobdata} | jq ".success"`
+    list_result=`echo ${jobdata} | jq ".result"`
+    list_expiration=`echo ${jobdata} | jq ".expiration"`
+    list_annotations=`echo ${jobdata} | jq ".annotations"`
+    list_dependencies=`echo ${jobdata} | jq ".dependencies"`
+    list_exception_occurred=`echo ${jobdata} | jq ".exception_occurred"`
+    list_exception_type=`echo ${jobdata} | jq -r ".exception_type"`
+    list_exception_severity=`echo ${jobdata} | jq ".exception_severity"`
+    list_exception_note=`echo ${jobdata} | jq -r ".exception_note"`
+    list_t_submit=`echo ${jobdata} | jq ".t_submit"`
+    list_t_depend=`echo ${jobdata} | jq ".t_depend"`
+    list_t_run=`echo ${jobdata} | jq ".t_run"`
+    list_t_cleanup=`echo ${jobdata} | jq ".t_cleanup"`
+    list_t_inactive=`echo ${jobdata} | jq ".t_inactive"`
+}
+
+# get job values from database
+# arg1 - jobid
+# arg2 - database path
+get_db_values() {
+    local id=$(flux job id $1)
+    local dbpath=$2
+    query="select * from jobs where id=$id;"
+    ${QUERYCMD} -t 10000 ${dbpath} "${query}" > query.out
+    db_jobdata=`grep "jobdata = " query.out | cut -f3- -d' '`
+    db_eventlog=`grep "eventlog = " query.out | awk '{print \$3}'`
+    db_jobspec=`grep "jobspec = " query.out | awk '{print \$3}'`
+    db_R=`grep "R = " query.out | awk '{print \$3}'`
+    db_userid=`echo ${db_jobdata} | jq ".userid"`
+    db_urgency=`echo ${db_jobdata} | jq ".urgency"`
+    db_priority=`echo ${db_jobdata} | jq ".priority"`
+    db_state=`echo ${db_jobdata} | jq ".state"`
+    db_ranks=`echo ${db_jobdata} | jq ".ranks"`
+    db_nnodes=`echo ${db_jobdata} | jq ".nnodes"`
+    db_nodelist=`echo ${db_jobdata} | jq -r ".nodelist"`
+    db_ntasks=`echo ${db_jobdata} | jq ".ntasks"`
+    db_ncores=`echo ${db_jobdata} | jq ".ncores"`
+    db_name=`echo ${db_jobdata} | jq -r ".name"`
+    db_cwd=`echo ${db_jobdata} | jq -r ".cwd"`
+    db_queue=`echo ${db_jobdata} | jq -r ".queue"`
+    db_project=`echo ${db_jobdata} | jq -r ".project"`
+    db_bank=`echo ${db_jobdata} | jq -r ".bank"`
+    db_waitstatus=`echo ${db_jobdata} | jq ".waitstatus"`
+    db_success=`echo ${db_jobdata} | jq ".success"`
+    db_result=`echo ${db_jobdata} | jq ".result"`
+    db_expiration=`echo ${db_jobdata} | jq ".expiration"`
+    db_annotations=`echo ${db_jobdata} | jq ".annotations"`
+    db_dependencies=`echo ${db_jobdata} | jq ".dependencies"`
+    db_exception_occurred=`echo ${db_jobdata} | jq ".exception_occurred"`
+    db_exception_type=`echo ${db_jobdata} | jq -r ".exception_type"`
+    db_exception_severity=`echo ${db_jobdata} | jq ".exception_severity"`
+    db_exception_note=`echo ${db_jobdata} | jq -r ".exception_note"`
+    db_t_submit=`echo ${db_jobdata} | jq ".t_submit"`
+    db_t_depend=`echo ${db_jobdata} | jq ".t_depend"`
+    db_t_run=`echo ${db_jobdata} | jq ".t_run"`
+    db_t_cleanup=`echo ${db_jobdata} | jq ".t_cleanup"`
+    db_t_inactive=`echo ${db_jobdata} | jq ".t_inactive"`
+}
+
+# compare data from job list and job db
+# arg1 - job id
+# arg2 - dbpath
+db_compare_data() {
+    local id=$(flux job id $1)
+    local dbpath=$2
+    get_job_list_values ${id}
+    get_db_values ${id} ${dbpath}
+    if [ "${list_userid}" != "${db_userid}" ] \
+        || [ "${list_urgency}" != "${db_urgency}" ] \
+        || [ "${list_priority}" != "${db_priority}" ] \
+        || [ "${list_state}" != "${db_state}" ] \
+        || [ "${list_ranks}" != "${db_ranks}" ] \
+        || [ "${list_nnodes}" != "${db_nnodes}" ] \
+        || [ "${list_nodelist}" != "${db_nodelist}" ] \
+        || [ "${list_ntasks}" != "${db_ntasks}" ] \
+        || [ "${list_ncores}" != "${db_ncores}" ] \
+        || [ "${list_name}" != "${db_name}" ] \
+        || [ "${list_cwd}" != "${db_cwd}" ] \
+        || [ "${list_queue}" != "${db_queue}" ] \
+        || [ "${list_project}" != "${db_project}" ] \
+        || [ "${list_bank}" != "${db_bank}" ] \
+        || [ "${list_waitstatus}" != "${db_waitstatus}" ] \
+        || [ "${list_success}" != "${db_success}" ] \
+        || [ "${list_result}" != "${db_result}" ] \
+        || [ "${list_expiration}" != "${db_expiration}" ] \
+        || [ "${list_annotations}" != "${db_annotations}" ] \
+        || [ "${list_dependencies}" != "${db_dependencies}" ] \
+        || [ "${list_exception_occurred}" != "${db_exception_occurred}" ] \
+        || [ "${list_exception_type}" != "${db_exception_type}" ] \
+        || [ "${list_exception_severity}" != "${db_exception_severity}" ] \
+        || [ "${list_exception_note}" != "${db_exception_note}" ] \
+        || [ "${list_t_submit}" != "${db_t_submit}" ] \
+        || [ "${list_t_depend}" != "${db_t_depend}" ] \
+        || [ "${list_t_run}" != "${db_t_run}" ] \
+        || [ "${list_t_cleanup}" != "${db_t_cleanup}" ] \
+        || [ "${list_t_inactive}" != "${db_t_inactive}" ]
+    then
+        return 1
+    fi
+    return 0
+}
+
+# wait for the job-list module's inactive job count to reach a target
+# arg1 - target count
+wait_inactive_list_count() {
+    local target=$1
+    local tries=50
+    local count
+    while test $tries -gt 0; do
+        count=$(flux module stats -p jobs.inactive job-list)
+        test $count -eq $target && return 0
+        sleep 0.25
+        tries=$(($tries-1))
+    done
+    return 1
+}
+
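+# Example (hypothetical ${jobid}) of how the helpers above combine to
+# validate a single db entry against the job-list module:
+#   dbpath=$(get_statedir_dbpath) &&
+#   db_check_entries ${jobid} ${dbpath} &&
+#   db_compare_data ${jobid} ${dbpath}
+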
+# submit jobs for job list & job-list db testing
+
+test_expect_success 'configure testing queues' '
+    flux config load <<-EOF &&
+[policy]
+jobspec.defaults.system.queue = "defaultqueue"
+[queues.defaultqueue]
+[queues.altqueue]
+EOF
+    flux queue start --all
+'
+
+test_expect_success 'submit jobs for job list testing' '
+    # Create `hostname` jobspec
+    # N.B. Used w/ `flux job submit` for serial job submission
+    # for efficiency (vs. serial `flux submit`).
+    #
+    flux submit --dry-run hostname >hostname.json &&
+    #
+    # submit jobs that will complete
+    #
+    for i in $(seq 0 3); do
+        flux job submit hostname.json >> inactiveids
+        fj_wait_event `tail -n 1 inactiveids` clean
+    done &&
+    #
+    # Currently all inactive ids are "completed"
+    #
+    tac inactiveids | flux job id > completed.ids &&
+    #
+    # Hold a job and cancel it, ensuring it never gets resources
+    #
+    jobid=`flux submit --urgency=hold /bin/true` &&
+    flux cancel $jobid &&
+    echo $jobid >> inactiveids &&
+    #
+    # Run a job that will fail and add it to the inactive list
+    #
+    ! jobid=`flux submit --wait nosuchcommand` &&
+    echo $jobid >> inactiveids &&
+    tac inactiveids | flux job id > inactive.ids &&
+    cat inactive.ids > all.ids &&
+    #
+    # The job-list module has eventual consistency with the jobs stored in
+    # the job-manager queue.  To ensure no raciness in tests, ensure
+    # jobs above have reached expected states in job-list before continuing.
+    #
+    flux job list-ids --wait-state=inactive $(job_list_state_ids inactive) > /dev/null
+'
+
+test_expect_success 'flux job list inactive jobs (pre purge)' '
+    flux job list -s inactive | jq .id > listI.out &&
+    test_cmp listI.out inactive.ids
+'
+
+test_expect_success 'flux job list inactive jobs w/ count (pre purge)' '
+    count=$(job_list_state_count inactive) &&
+    count=$((count - 2)) &&
+    flux job list -s inactive -c ${count} | jq .id > listI_count.out &&
+    head -n ${count} inactive.ids > listI_count.exp &&
+    test_cmp listI_count.out listI_count.exp
+'
+
+test_expect_success 'flux job list-inactive jobs (pre purge)' '
+    flux job list-inactive | jq .id > list_inactive.out &&
+    test_cmp list_inactive.out inactive.ids
+'
+
+test_expect_success 'flux job list-inactive jobs w/ count (pre purge)' '
+    count=$(job_list_state_count inactive) &&
+    count=$((count - 1)) &&
+    flux job list-inactive -c ${count} | jq .id > list_inactive_count.out &&
+    head -n ${count} inactive.ids > list_inactive_count.exp &&
+    test_cmp list_inactive_count.out list_inactive_count.exp
+'
+
+test_expect_success 'flux job list-inactive jobs w/ since 1 (pre purge)' '
+    timestamp=`flux job list -s inactive | head -n 2 | tail -n 1 | jq .t_inactive` &&
+    flux job list-inactive --since=${timestamp} | jq .id > list_inactive_since1.out &&
+    head -n 1 inactive.ids > list_inactive_since1.exp &&
+    test_cmp list_inactive_since1.out list_inactive_since1.exp
+'
+
+test_expect_success 'flux job list-inactive jobs w/ since 2 (pre purge)' '
+    count=$(job_list_state_count inactive) &&
+    count=$((count - 1)) &&
+    timestamp=`flux job list -s inactive | tail -n 1 | jq .t_inactive` &&
+    flux job list-inactive --since=${timestamp} | jq .id > list_inactive_since2.out &&
+    head -n ${count} inactive.ids > list_inactive_since2.exp &&
+    test_cmp list_inactive_since2.out list_inactive_since2.exp
+'
+
+test_expect_success 'flux job list-ids jobs (pre purge)' '
+    for id in `cat inactive.ids`
+    do
+        flux job list-ids ${id} | jq -e ".id == ${id}"
+    done
+'
+
+test_expect_success 'flux job list has all inactive jobs cached' '
+    cached=`flux module stats -p jobs.inactive job-list` &&
+    test ${cached} -eq $(job_list_state_count inactive)
+'
+
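+# Everything above exercised jobs still cached in memory; the tests
+# below verify the copies job-list wrote to the sqlite db, including
+# after the in-memory copies are purged.
+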
+test_expect_success 'job-list db: db stored in statedir' ' + dbpath=$(get_statedir_dbpath) && + ls ${dbpath} +' + +test_expect_success 'job-list db: has correct number of entries' ' + dbpath=$(get_statedir_dbpath) && + entries=$(db_count_entries ${dbpath}) && + test ${entries} -eq $(job_list_state_count inactive) +' + +test_expect_success 'job-list db: make sure job data looks ok' ' + dbpath=$(get_statedir_dbpath) && + for id in `cat list_inactive.out` + do + db_check_entries ${id} ${dbpath} + done +' + +test_expect_success 'job-list db: make sure job data correct' ' + dbpath=$(get_statedir_dbpath) && + for id in `cat list_inactive.out` + do + db_compare_data ${id} ${dbpath} + done +' + +test_expect_success 'reload the job-list module' ' + flux module reload job-list +' + +test_expect_success 'job-list db: still has correct number of entries' ' + dbpath=$(get_statedir_dbpath) && + entries=$(db_count_entries ${dbpath}) && + test ${entries} -eq $(job_list_state_count inactive) +' + +test_expect_success 'job-list db: purge 2 jobs' ' + len=$(job_list_state_count inactive) && + len=$((len - 2)) && + flux job purge --force --num-limit=${len} && + mv inactive.ids inactive.ids.orig && + head -n ${len} inactive.ids.orig > inactive.ids && + wait_inactive_list_count ${len} +' + +test_expect_success 'flux job list inactive jobs (post purge)' ' + flux job list -s inactive | jq .id > listIPP.out && + test_cmp listIPP.out inactive.ids.orig +' + +test_expect_success 'flux job list inactive jobs w/ count (post purge)' ' + count=$(cat inactive.ids.orig | wc -l) && + count=$((count - 2)) && + flux job list -s inactive -c ${count} | jq .id > listI_countPP.out && + head -n ${count} inactive.ids.orig > listI_countPP.exp && + test_cmp listI_countPP.out listI_countPP.exp +' + +test_expect_success 'flux job list-inactive jobs (post purge)' ' + flux job list-inactive | jq .id > list_inactivePP.out && + test_cmp list_inactivePP.out inactive.ids.orig +' + +test_expect_success 'flux job list-inactive jobs w/ count (post purge)' ' + count=$(cat inactive.ids.orig | wc -l) && + count=$((count - 1)) && + flux job list-inactive -c ${count} | jq .id > list_inactive_countPP.out && + head -n ${count} inactive.ids.orig > list_inactive_countPP.exp && + test_cmp list_inactive_countPP.out list_inactive_countPP.exp +' + +test_expect_success 'flux job list-inactive jobs w/ since 1 (post purge)' ' + timestamp=`flux job list -s inactive | head -n 2 | tail -n 1 | jq .t_inactive` && + flux job list-inactive --since=${timestamp} | jq .id > list_inactive_since1PP.out && + head -n 1 inactive.ids.orig > list_inactive_since1PP.exp && + test_cmp list_inactive_since1PP.out list_inactive_since1PP.exp +' + +test_expect_success 'flux job list-inactive jobs w/ since 2 (post purge)' ' + count=$(cat inactive.ids.orig | wc -l) && + count=$((count - 1)) && + timestamp=`flux job list -s inactive | tail -n 1 | jq .t_inactive` && + flux job list-inactive --since=${timestamp} | jq .id > list_inactive_since2PP.out && + head -n ${count} inactive.ids.orig > list_inactive_since2PP.exp && + test_cmp list_inactive_since2PP.out list_inactive_since2PP.exp +' + +test_expect_success 'flux job list-ids jobs (post purge)' ' + for id in `cat inactive.ids.orig` + do + flux job list-ids ${id} | jq -e ".id == ${id}" + done +' + +test_expect_success 'job-list db: purge all jobs' ' + len=$(job_list_state_count inactive) && + flux job purge --force --num-limit=0 && + : > inactive.ids && + wait_inactive_list_count 0 +' + +test_expect_success 'flux job list inactive jobs 
(all purge)' ' + flux job list -s inactive | jq .id > listI3.out && + test_cmp listI3.out inactive.ids.orig +' + +test_expect_success 'flux job list inactive jobs w/ count (all purge)' ' + count=$(cat inactive.ids.orig | wc -l) && + count=$((count - 2)) && + flux job list -s inactive -c ${count} | jq .id > listI_countAP.out && + head -n ${count} inactive.ids.orig > listI_countAP.exp && + test_cmp listI_countAP.out listI_countAP.exp +' + +test_expect_success 'flux job list-inactive jobs (all purge)' ' + flux job list-inactive | jq .id > list_inactiveAP.out && + test_cmp list_inactiveAP.out inactive.ids.orig +' + +test_expect_success 'flux job list-inactive jobs w/ count (all purge)' ' + count=$(cat inactive.ids.orig | wc -l) && + count=$((count - 1)) && + flux job list-inactive -c ${count} | jq .id > list_inactive_countAP.out && + head -n ${count} inactive.ids.orig > list_inactive_countAP.exp && + test_cmp list_inactive_countAP.out list_inactive_countAP.exp +' + +test_expect_success 'flux job list-inactive jobs w/ since 1 (all purge)' ' + timestamp=`flux job list -s inactive | head -n 2 | tail -n 1 | jq .t_inactive` && + flux job list-inactive --since=${timestamp} | jq .id > list_inactive_since1AP.out && + head -n 1 inactive.ids.orig > list_inactive_since1AP.exp && + test_cmp list_inactive_since1AP.out list_inactive_since1AP.exp +' + +test_expect_success 'flux job list-inactive jobs w/ since 2 (all purge)' ' + count=$(cat inactive.ids.orig | wc -l) && + count=$((count - 1)) && + timestamp=`flux job list -s inactive | tail -n 1 | jq .t_inactive` && + flux job list-inactive --since=${timestamp} | jq .id > list_inactive_since2AP.out && + head -n ${count} inactive.ids.orig > list_inactive_since2AP.exp && + test_cmp list_inactive_since2AP.out list_inactive_since2AP.exp +' + +test_expect_success 'flux job list-ids jobs (all purge)' ' + for id in `cat inactive.ids.orig` + do + flux job list-ids ${id} | jq -e ".id == ${id}" + done +' + +test_expect_success 'flux jobs gets all jobs with various constraints' ' + countA=$(cat inactive.ids.orig | wc -l) && + countB=$(flux jobs -n -a | wc -l) && + countC=$(flux jobs -n -a --queue=defaultqueue | wc -l) && + countD=$(flux jobs -n -a --user=$USER | wc -l) && + countE=$(flux jobs -n -a --since=-1h | wc -l) && + countF=$(flux jobs -n --filter=pending,running,inactive | wc -l) && + test $countA = $countB && + test $countA = $countC && + test $countA = $countD && + test $countA = $countE && + test $countA = $countF +' + +test_expect_success 'flux job db stats works' ' + ${RPC} job-list.db-stats 0 < /dev/null +' + +test_expect_success 'reload the job-list module with alternate config' ' + statedir=`flux getattr statedir` && + cat >job-list.toml <