diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index 3dd5a3628ceb..26a94994b2f4 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -190,7 +190,8 @@ job_list_la_SOURCES = job_list_la_CPPFLAGS = \ $(AM_CPPFLAGS) \ $(FLUX_SECURITY_CFLAGS) \ - $(HWLOC_CFLAGS) + $(HWLOC_CFLAGS) \ + $(SQLITE_CFLAGS) job_list_la_LIBADD = \ $(builddir)/job-list/libjob-list.la \ $(top_builddir)/src/common/libjob/libjob.la \ @@ -199,7 +200,8 @@ job_list_la_LIBADD = \ $(top_builddir)/src/common/libflux-optparse.la \ $(top_builddir)/src/common/librlist/librlist.la \ $(JANSSON_LIBS) \ - $(HWLOC_LIBS) + $(HWLOC_LIBS) \ + $(SQLITE_LIBS) job_list_la_LDFLAGS = $(fluxmod_ldflags) -module job_ingest_la_SOURCES = diff --git a/src/modules/job-list/Makefile.am b/src/modules/job-list/Makefile.am index 7278d7b9f812..86c500c185fa 100644 --- a/src/modules/job-list/Makefile.am +++ b/src/modules/job-list/Makefile.am @@ -22,6 +22,8 @@ libjob_list_la_SOURCES = \ job_state.c \ job_data.h \ job_data.c \ + job_db.h \ + job_db.c \ list.h \ list.c \ job_util.h \ @@ -35,12 +37,17 @@ libjob_list_la_SOURCES = \ state_match.h \ state_match.c \ match_util.h \ - match_util.c + match_util.c \ + constraint_sql.h \ + constraint_sql.c \ + util.h \ + util.c TESTS = \ test_job_data.t \ test_match.t \ - test_state_match.t + test_state_match.t \ + test_constraint_sql.t test_ldadd = \ $(builddir)/libjob-list.la \ @@ -89,6 +96,14 @@ test_state_match_t_LDADD = \ test_state_match_t_LDFLAGS = \ $(test_ldflags) +test_constraint_sql_t_SOURCES = test/constraint_sql.c +test_constraint_sql_t_CPPFLAGS = \ + $(test_cppflags) +test_constraint_sql_t_LDADD = \ + $(test_ldadd) +test_constraint_sql_t_LDFLAGS = \ + $(test_ldflags) + EXTRA_DIST = \ test/R/1node_1core.R \ test/R/1node_4core.R \ diff --git a/src/modules/job-list/config.c b/src/modules/job-list/config.c new file mode 100644 index 000000000000..d421cfd9d7ba --- /dev/null +++ b/src/modules/job-list/config.c @@ -0,0 +1,105 @@ +static void 
config_reload_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct conf *conf = arg; + const flux_conf_t *instance_conf; + struct conf_callback *ccb; + flux_error_t error; + const char *errstr = NULL; + + if (flux_conf_reload_decode (msg, &instance_conf) < 0) + goto error; + if (policy_validate (instance_conf, &error) < 0) { + errstr = error.text; + goto error; + } + ccb = zlistx_first (conf->callbacks); + while (ccb) { + if (ccb->cb (instance_conf, &error, ccb->arg) < 0) { + errstr = error.text; + errno = EINVAL; + goto error; + } + ccb = zlistx_next (conf->callbacks); + } + if (flux_set_conf (h, flux_conf_incref (instance_conf)) < 0) { + errstr = "error updating cached configuration"; + flux_conf_decref (instance_conf); + goto error; + } + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "error responding to config-reload request"); + return; +error: + if (flux_respond_error (h, msg, errno, errstr) < 0) + flux_log_error (h, "error responding to config-reload request"); +} + +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "job-manager.config-reload", config_reload_cb, 0 }, + FLUX_MSGHANDLER_TABLE_END, +}; + + + + + +static int process_config (struct kvs_ctx *ctx) +{ + flux_error_t error; + if (kvs_checkpoint_config_parse (ctx->kcp, + flux_get_conf (ctx->h), + &error) < 0) { + flux_log (ctx->h, LOG_ERR, "%s", error.text); + return -1; + } + return 0; +} + + + +static int checkpoint_period_parse (const flux_conf_t *conf, + flux_error_t *errp, + double *checkpoint_period) +{ + flux_error_t error; + const char *str = NULL; + + if (flux_conf_unpack (conf, + &error, + "{s?{s?s}}", + "kvs", + "checkpoint-period", &str) < 0) { + errprintf (errp, + "error reading config for kvs: %s", + error.text); + return -1; + } + + if (str) { + if (fsd_parse_duration (str, checkpoint_period) < 0) { + errprintf (errp, + "invalid checkpoint-period config: %s", + str); + return -1; + } + } + + return 0; +} + 
+int kvs_checkpoint_config_parse (kvs_checkpoint_t *kcp, + const flux_conf_t *conf, + flux_error_t *errp) +{ + if (kcp) { + double checkpoint_period = kcp->checkpoint_period; + if (checkpoint_period_parse (conf, errp, &checkpoint_period) < 0) + return -1; + kcp->checkpoint_period = checkpoint_period; + } + return 0; +} + diff --git a/src/modules/job-list/constraint_sql.c b/src/modules/job-list/constraint_sql.c new file mode 100644 index 000000000000..be6cb163cb5e --- /dev/null +++ b/src/modules/job-list/constraint_sql.c @@ -0,0 +1,342 @@ +/************************************************************\ + * Copyright 2023 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include +#include + +#include "src/common/libutil/errprintf.h" +#include "ccan/str/str.h" + +#include "constraint_sql.h" +#include "match_util.h" + +static int create_userid_query (json_t *values, char **query, flux_error_t *errp) +{ + json_t *entry; + size_t index; + char *q = NULL; + json_array_foreach (values, index, entry) { + uint32_t userid; + if (!json_is_integer (entry)) { + errprintf (errp, "userid value must be an integer"); + goto error; + } + userid = json_integer_value (entry); + /* special case FLUX_USERID_UNKNOWN matches all, so no query result */ + if (userid == FLUX_USERID_UNKNOWN) + continue; + if (!q) { + if (asprintf (&q, "userid = %u", userid) < 0) { + errno = ENOMEM; + goto error; + } + } + else { + char *tmp; + if (asprintf (&tmp, "%s OR userid = %u", q, userid) < 0) { + errno = ENOMEM; + goto error; + } + free (q); + q = tmp; + } + } + (*query) = q; + return 0; +error: + free (q); + return -1; +} + +static int create_string_query (json_t *values, + 
const char *op, + char **query, + flux_error_t *errp) +{ + json_t *entry; + size_t index; + char *q = NULL; + json_array_foreach (values, index, entry) { + const char *str; + if (!json_is_string (entry)) { + errprintf (errp, "%s value must be a string", op); + goto error; + } + str = json_string_value (entry); + if (!q) { + if (asprintf (&q, "%s = '%s'", op, str) < 0) { + errno = ENOMEM; + goto error; + } + } + else { + char *tmp; + if (asprintf (&tmp, "%s OR %s = '%s'", q, op, str) < 0) { + errno = ENOMEM; + goto error; + } + free (q); + q = tmp; + } + } + (*query) = q; + return 0; +error: + free (q); + return -1; +} + +static int create_name_query (json_t *values, + char **query, + flux_error_t *errp) +{ + return create_string_query (values, "name", query, errp); +} + +static int create_queue_query (json_t *values, + char **query, + flux_error_t *errp) +{ + return create_string_query (values, "queue", query, errp); +} + +static int create_bitmask_query (const char *col, + json_t *values, + array_to_bitmask_f array_to_bitmask_cb, + char **query, + flux_error_t *errp) +{ + char *q = NULL; + int tmp; + if ((tmp = array_to_bitmask_cb (values, errp)) < 0) + return -1; + if (asprintf (&q, "(%s & %d) > 0", col, tmp) < 0) { + errno = ENOMEM; + return -1; + } + (*query) = q; + return 0; +} + +static int create_states_query (json_t *values, + char **query, + flux_error_t *errp) +{ + return create_bitmask_query ("state", + values, + array_to_states_bitmask, + query, + errp); +} + +static int create_results_query (json_t *values, + char **query, + flux_error_t *errp) +{ + return create_bitmask_query ("result", + values, + array_to_results_bitmask, + query, + errp); +} + +static int create_timestamp_query (const char *type, + json_t *values, + char **query, + flux_error_t *errp) +{ + const char *str; + const char *comp; + char *q = NULL; + double t; + char *endptr; + json_t *v = json_array_get (values, 0); + + if (!v) { + errprintf (errp, "timestamp value not specified"); + 
return -1; + } + if (!json_is_string (v)) { + errprintf (errp, "%s value must be a string", type); + return -1; + } + str = json_string_value (v); + if (strstarts (str, ">=")) { + comp = ">="; + str += 2; + } + else if (strstarts (str, "<=")) { + comp = "<="; + str += 2; + } + else if (strstarts (str, ">")) { + comp = ">"; + str +=1; + } + else if (strstarts (str, "<")) { + comp = "<"; + str += 1; + } + else { + errprintf (errp, "timestamp comparison operator not specified"); + return -1; + } + + errno = 0; + t = strtod (str, &endptr); + if (errno != 0 || *endptr != '\0') { + errprintf (errp, "Invalid timestamp value specified"); + return -1; + } + if (t < 0.0) { + errprintf (errp, "timestamp value must be >= 0.0"); + return -1; + } + + if (asprintf (&q, "%s %s %s", type, comp, str) < 0) { + errno = ENOMEM; + return -1; + } + + (*query) = q; + return 0; +} + +static int conditional_query (const char *type, + json_t *values, + char **query, + flux_error_t *errp) +{ + char *q = NULL; + char *cond; + json_t *entry; + size_t index; + + if (streq (type, "and")) + cond = "AND"; + else if (streq (type, "or")) + cond = "OR"; + else /* streq (type, "not") */ + /* we will "NOT" it at the end */ + cond = "AND"; + + json_array_foreach (values, index, entry) { + char *subquery; + if (constraint2sql (entry, &subquery, errp) < 0) + goto error; + if (!q) + q = subquery; + else if (subquery) { + char *tmp; + if (asprintf (&tmp, "%s %s %s", q, cond, subquery) < 0) { + free (subquery); + errno = ENOMEM; + goto error; + } + free (q); + q = tmp; + } + } + if (q && streq (type, "not")) { + char *tmp; + if (asprintf (&tmp, "NOT (%s)", q) < 0) { + errno = ENOMEM; + goto error; + } + free (q); + q = tmp; + } + (*query) = q; + return 0; + +error: + free (q); + return -1; +} + +int constraint2sql (json_t *constraint, char **query, flux_error_t *errp) +{ + char *q = NULL; + char *rv = NULL; + const char *op; + json_t *values; + + if (!query) + return -1; + + if (!constraint) + return 0; + + 
if (!json_is_object (constraint)) { + errprintf (errp, "constraint must be JSON object"); + return -1; + } + if (json_object_size (constraint) > 1) { + errprintf (errp, "constraint must only contain 1 element"); + return -1; + } + json_object_foreach (constraint, op, values) { + int ret; + if (!json_is_array (values)) { + errprintf (errp, "operator %s values not an array", op); + goto error; + } + if (streq (op, "userid")) + ret = create_userid_query (values, &q, errp); + else if (streq (op, "name")) + ret = create_name_query (values, &q, errp); + else if (streq (op, "queue")) + ret = create_queue_query (values, &q, errp); + else if (streq (op, "states")) + ret = create_states_query (values, &q, errp); + else if (streq (op, "results")) + ret = create_results_query (values, &q, errp); + else if (streq (op, "hostlist")) { + /* no hostlist column, no conversion to be done */ + ret = 0; + } + else if (streq (op, "t_submit") + || streq (op, "t_depend") + || streq (op, "t_run") + || streq (op, "t_cleanup") + || streq (op, "t_inactive")) + ret = create_timestamp_query (op, values, &q, errp); + else if (streq (op, "or") || streq (op, "and") || streq (op, "not")) + ret = conditional_query (op, values, &q, errp); + else { + errprintf (errp, "unknown constraint operator: %s", op); + goto error; + } + if (ret < 0) + goto error; + } + if (q) { + if (asprintf (&rv, "(%s)", q) < 0) { + errno = ENOMEM; + goto error; + } + free (q); + } + (*query) = rv; + return 0; + +error: + free (q); + return -1; +} + +/* vi: ts=4 sw=4 expandtab + */ diff --git a/src/modules/job-list/constraint_sql.h b/src/modules/job-list/constraint_sql.h new file mode 100644 index 000000000000..c46134ef475b --- /dev/null +++ b/src/modules/job-list/constraint_sql.h @@ -0,0 +1,23 @@ +/************************************************************\ + * Copyright 2023 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. 
+ * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef HAVE_JOB_LIST_CONSTRAINT_SQL_H +#define HAVE_JOB_LIST_CONSTRAINT_SQL_H 1 + +#if HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include + +int constraint2sql (json_t *constraint, char **query, flux_error_t *errp); + +#endif /* !HAVE_JOB_LIST_MATCH_H */ diff --git a/src/modules/job-list/job-list.c b/src/modules/job-list/job-list.c index 476553b9e4f3..5525ed8d6fa6 100644 --- a/src/modules/job-list/job-list.c +++ b/src/modules/job-list/job-list.c @@ -210,6 +210,8 @@ static void list_ctx_destroy (struct list_ctx *ctx) flux_msglist_destroy (ctx->deferred_requests); if (ctx->jsctx) job_state_destroy (ctx->jsctx); + if (ctx->dbctx) + job_db_ctx_destroy (ctx->dbctx); if (ctx->isctx) idsync_ctx_destroy (ctx->isctx); if (ctx->mctx) @@ -219,7 +221,7 @@ static void list_ctx_destroy (struct list_ctx *ctx) } } -static struct list_ctx *list_ctx_create (flux_t *h) +static struct list_ctx *list_ctx_create (flux_t *h, int argc, char **argv) { struct list_ctx *ctx = calloc (1, sizeof (*ctx)); if (!ctx) @@ -231,6 +233,12 @@ static struct list_ctx *list_ctx_create (flux_t *h) goto error; if (!(ctx->isctx = idsync_ctx_create (ctx->h))) goto error; + /* job_db_setup() performs a job_db_ctx_create() and some + * initialization */ + if (!(ctx->dbctx = job_db_setup (h, argc, argv))) { + if (errno != ENOTBLK) + goto error; + } if (!(ctx->jsctx = job_state_create (ctx))) goto error; if (!(ctx->deferred_requests = flux_msglist_create ())) @@ -248,7 +256,7 @@ int mod_main (flux_t *h, int argc, char **argv) struct list_ctx *ctx; int rc = -1; - if (!(ctx = list_ctx_create (h))) { + if (!(ctx = list_ctx_create (h, argc, argv))) { flux_log_error (h, "initialization error"); goto done; } diff --git a/src/modules/job-list/job-list.h b/src/modules/job-list/job-list.h index 6a239275171d..a0680b3de828 100644 --- 
a/src/modules/job-list/job-list.h +++ b/src/modules/job-list/job-list.h @@ -16,6 +16,7 @@ #include "src/common/libczmqcontainers/czmq_containers.h" #include "job_state.h" +#include "job_db.h" #include "idsync.h" #include "match.h" @@ -23,6 +24,7 @@ struct list_ctx { flux_t *h; flux_msg_handler_t **handlers; struct job_state_ctx *jsctx; + struct job_db_ctx *dbctx; struct idsync_ctx *isctx; struct flux_msglist *deferred_requests; struct match_ctx *mctx; diff --git a/src/modules/job-list/job_data.c b/src/modules/job-list/job_data.c index a48f7bdca52a..2c28064df62b 100644 --- a/src/modules/job-list/job_data.c +++ b/src/modules/job-list/job_data.c @@ -38,9 +38,12 @@ void job_destroy (void *data) hostlist_destroy (job->nodelist_hl); json_decref (job->annotations); grudgeset_destroy (job->dependencies); + json_decref (job->dependencies_db); json_decref (job->jobspec); json_decref (job->R); + free (job->eventlog); json_decref (job->exception_context); + json_decref (job->job_dbdata); free (job); errno = save_errno; } diff --git a/src/modules/job-list/job_data.h b/src/modules/job-list/job_data.h index 512f796ff1e4..3a6d6b6260e6 100644 --- a/src/modules/job-list/job_data.h +++ b/src/modules/job-list/job_data.h @@ -65,13 +65,22 @@ struct job { const char *exception_note; flux_job_result_t result; json_t *annotations; + /* dependencies - built up + * dependencies_db - recovered from db + */ struct grudgeset *dependencies; + json_t *dependencies_db; /* cache of job information */ json_t *jobspec; json_t *R; + char *eventlog; + size_t eventlog_len; json_t *exception_context; + /* all job data from db */ + json_t *job_dbdata; + /* Track which states we have seen and have completed transition * to. States we've processed via the states_mask and states seen * via events stream in states_events_mask. 
diff --git a/src/modules/job-list/job_db.c b/src/modules/job-list/job_db.c new file mode 100644 index 000000000000..a38f9dddee56 --- /dev/null +++ b/src/modules/job-list/job_db.c @@ -0,0 +1,525 @@ +/************************************************************\ + * Copyright 2016 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* job_db: support storing inactive jobs to db */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "src/common/libutil/log.h" +#include "src/common/libutil/fsd.h" +#include "src/common/libutil/tstat.h" +#include "src/common/libutil/monotime.h" + +#include "job_db.h" +#include "job_util.h" +#include "util.h" + +#define BUSY_TIMEOUT_DEFAULT 50 +#define BUFSIZE 1024 + +/* N.B. 
"state" is always INACTIVE, but added in case of future changes */ + +const char *sql_create_table = "CREATE TABLE if not exists jobs(" + " id CHAR(16) PRIMARY KEY," + " userid INT," + " name TEXT," + " queue TEXT," + " state INT," + " result INT," + " nodelist TEXT," + " t_submit REAL," + " t_depend REAL," + " t_run REAL," + " t_cleanup REAL," + " t_inactive REAL," + " jobdata JSON," + " eventlog TEXT," + " jobspec JSON," + " R JSON" + ");"; + +const char *sql_store = \ + "INSERT INTO jobs" \ + "(" \ + " id," \ + " userid," \ + " name," \ + " queue," \ + " state," \ + " result," \ + " nodelist," \ + " t_submit," \ + " t_depend," \ + " t_run," \ + " t_cleanup," \ + " t_inactive," \ + " jobdata," \ + " eventlog," \ + " jobspec," \ + " R" \ + ") values (" \ + " ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16 " \ + ")"; + +void job_db_ctx_destroy (struct job_db_ctx *ctx) +{ + if (ctx) { + free (ctx->dbpath); + if (ctx->store_stmt) { + if (sqlite3_finalize (ctx->store_stmt) != SQLITE_OK) + log_sqlite_error (ctx, "sqlite_finalize store_stmt"); + } + if (ctx->db) { + if (sqlite3_close (ctx->db) != SQLITE_OK) + log_sqlite_error (ctx, "sqlite3_close"); + } + if (ctx->handlers) + flux_msg_handler_delvec (ctx->handlers); + free (ctx); + } +} + +static struct job_db_ctx * job_db_ctx_create (flux_t *h) +{ + struct job_db_ctx *ctx = calloc (1, sizeof (*ctx)); + + if (!ctx) { + flux_log_error (h, "job_db_ctx_create"); + goto error; + } + + ctx->h = h; + ctx->busy_timeout = BUSY_TIMEOUT_DEFAULT; + + return ctx; + error: + job_db_ctx_destroy (ctx); + return (NULL); +} + +static unsigned long long get_file_size (const char *path) +{ + struct stat sb; + + if (stat (path, &sb) < 0) + return 0; + return sb.st_size; +} + +static void db_stats_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct job_db_ctx *ctx = arg; + + if (flux_respond_pack (h, + msg, + "{s:I s:{s:i s:f s:f s:f s:f}}", + "dbfile_size", get_file_size 
(ctx->dbpath), + "store", + "count", tstat_count (&ctx->sqlstore), + "min", tstat_min (&ctx->sqlstore), + "max", tstat_max (&ctx->sqlstore), + "mean", tstat_mean (&ctx->sqlstore), + "stddev", tstat_stddev (&ctx->sqlstore)) < 0) + flux_log_error (h, "error responding to db-stats request"); + return; +} + +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "job-list.db-stats", db_stats_cb, 0 }, + FLUX_MSGHANDLER_TABLE_END, +}; + +static int get_max_inactive(struct job_db_ctx *ctx) +{ + char *inactive_max_query = "SELECT MAX(t_inactive) FROM jobs"; + sqlite3_stmt *res = NULL; + int save_errno, rv = -1; + + if (sqlite3_prepare_v2 (ctx->db, + inactive_max_query, + -1, + &res, + 0) != SQLITE_OK) { + log_sqlite_error (ctx, "sqlite3_prepare_v2"); + goto error; + } + + while (sqlite3_step (res) == SQLITE_ROW) { + const char *s = (const char *)sqlite3_column_text (res, 0); + if (s) { + char *endptr; + double d; + errno = 0; + d = strtod (s, &endptr); + if (errno || *endptr != '\0') + goto error; + ctx->initial_max_inactive = d; + break; + } + } + + rv = 0; +error: + save_errno = errno; + sqlite3_finalize (res); + errno = save_errno; + return rv; +} + +int job_db_init (struct job_db_ctx *ctx) +{ + int flags = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE; + char buf[1024]; + int rc = -1; + + if (sqlite3_open_v2 (ctx->dbpath, &ctx->db, flags, NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "opening %s", ctx->dbpath); + goto error; + } + + if (sqlite3_exec (ctx->db, + "PRAGMA journal_mode=WAL", + NULL, + NULL, + NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "setting sqlite 'journal_mode' pragma"); + goto error; + } + if (sqlite3_exec (ctx->db, + "PRAGMA synchronous=NORMAL", + NULL, + NULL, + NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "setting sqlite 'synchronous' pragma"); + goto error; + } + snprintf (buf, 1024, "PRAGMA busy_timeout=%u;", ctx->busy_timeout); + if (sqlite3_exec (ctx->db, + buf, + NULL, + NULL, + NULL) != SQLITE_OK) { + 
log_sqlite_error (ctx, "setting sqlite 'busy_timeout' pragma"); + goto error; + } + if (sqlite3_exec (ctx->db, + sql_create_table, + NULL, + NULL, + NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "creating object table"); + goto error; + } + + if (sqlite3_prepare_v2 (ctx->db, + sql_store, + -1, + &ctx->store_stmt, + NULL) != SQLITE_OK) { + log_sqlite_error (ctx, "preparing store stmt"); + goto error; + } + + if (flux_msg_handler_addvec (ctx->h, htab, ctx, &ctx->handlers) < 0) { + flux_log_error (ctx->h, "flux_msg_handler_addvec"); + goto error; + } + + if (get_max_inactive (ctx) < 0) + goto error; + + rc = 0; +error: + return rc; +} + +int job_db_store (struct job_db_ctx *ctx, struct job *job) +{ + json_t *o = NULL; + flux_error_t err; + char *job_str = NULL; + char *jobspec = NULL; + char *R = NULL; + char idbuf[64]; + struct timespec t0; + int rv = -1; + + /* when job-list is initialized from the journal, we could + * re-store duplicate entries into the db. Do not do this if the + * t_inactive is less than the max we read from the db upon module + * initialization + * + * Note, small chance of floating point rounding errors here, but + * if 1 job is added twice to the DB, we can live with it. + */ + if (job->t_inactive <= ctx->initial_max_inactive) + return 0; + + monotime (&t0); + + snprintf (idbuf, 64, "%llu", (unsigned long long)job->id); + if (sqlite3_bind_text (ctx->store_stmt, + 1, + idbuf, + strlen (idbuf), + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding id"); + goto out; + } + if (sqlite3_bind_int (ctx->store_stmt, + 2, + job->userid) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding userid"); + goto out; + } + /* N.B. name can be NULL. sqlite_bind_text() same as + * sqlite_bind_null() if pointer is NULL */ + if (sqlite3_bind_text (ctx->store_stmt, + 3, + job->name, + job->name ? strlen (job->name) : 0, + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding job name"); + goto out; + } + /* N.B. 
queue can be NULL. sqlite_bind_text() same as + * sqlite_bind_null() if pointer is NULL */ + if (sqlite3_bind_text (ctx->store_stmt, + 4, + job->queue, + job->queue ? strlen (job->queue) : 0, + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding job queue"); + goto out; + } + if (sqlite3_bind_int (ctx->store_stmt, + 5, + job->state) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding state"); + goto out; + } + if (sqlite3_bind_int (ctx->store_stmt, + 6, + job->result) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding result"); + goto out; + } + /* N.B. nodelist can be NULL. sqlite_bind_text() same as + * sqlite_bind_null() if pointer is NULL */ + if (sqlite3_bind_text (ctx->store_stmt, + 7, + job->nodelist, + job->nodelist ? strlen (job->nodelist) : 0, + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding job nodelist"); + goto out; + } + if (sqlite3_bind_double (ctx->store_stmt, + 8, + job->t_submit) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding t_submit"); + goto out; + } + if (sqlite3_bind_double (ctx->store_stmt, + 9, + job->t_depend) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding t_depend"); + goto out; + } + if (sqlite3_bind_double (ctx->store_stmt, + 10, + job->t_run) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding t_run"); + goto out; + } + if (sqlite3_bind_double (ctx->store_stmt, + 11, + job->t_cleanup) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding t_cleanup"); + goto out; + } + if (sqlite3_bind_double (ctx->store_stmt, + 12, + job->t_inactive) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding t_inactive"); + goto out; + } + if (!(o = job_to_json_dbdata (job, &err))) + goto out; + if (!(job_str = json_dumps (o, JSON_COMPACT))) { + errno = ENOMEM; + goto out; + } + if (sqlite3_bind_text (ctx->store_stmt, + 13, + job_str, + strlen (job_str), + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding jobdata"); + goto out; + } + if 
(sqlite3_bind_text (ctx->store_stmt, + 14, + job->eventlog, + strlen (job->eventlog), + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding eventlog"); + goto out; + } + if (!(jobspec = json_dumps (job->jobspec, 0))) { + flux_log_error (ctx->h, "json_dumps jobspec"); + goto out; + } + if (sqlite3_bind_text (ctx->store_stmt, + 15, + jobspec, + strlen (jobspec), + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding jobspec"); + goto out; + } + if (job->R) { + if (!(R = json_dumps (job->R, 0))) { + flux_log_error (ctx->h, "json_dumps R"); + goto out; + } + } + /* N.B. R can be NULL. sqlite_bind_text() same as + * sqlite_bind_null() if pointer is NULL */ + if (sqlite3_bind_text (ctx->store_stmt, + 16, + R, + R ? strlen (R) : 0, + SQLITE_STATIC) != SQLITE_OK) { + log_sqlite_error (ctx, "store: binding R"); + goto out; + } + while (sqlite3_step (ctx->store_stmt) != SQLITE_DONE) { + /* due to rounding errors in sqlite, duplicate entries could be + * written out on occasion leading to a SQLITE_CONSTRAINT error. + * We accept this and move on. + */ + int errcode = sqlite3_errcode (ctx->db); + if (errcode == SQLITE_CONSTRAINT) + break; + else if (errcode == SQLITE_BUSY) { + /* In the rare case this cannot complete within the normal + * busytimeout, we elect to spin till it completes. 
This + * may need to be revisited in the future: */ + flux_log (ctx->h, LOG_DEBUG, "%s: BUSY", __FUNCTION__); + usleep (1000); + continue; + } + else { + log_sqlite_error (ctx, "store: executing stmt"); + goto out; + } + } + + tstat_push (&ctx->sqlstore, monotime_since (t0)); + + rv = 0; +out: + sqlite3_reset (ctx->store_stmt); + json_decref (o); + free (job_str); + free (jobspec); + free (R); + return rv; +} + +static int process_config (struct job_db_ctx *ctx) +{ + flux_error_t err; + const char *dbpath = NULL; + const char *busytimeout = NULL; + + if (flux_conf_unpack (flux_get_conf (ctx->h), + &err, + "{s?{s?s s?s}}", + "job-list", + "dbpath", &dbpath, + "busytimeout", &busytimeout) < 0) { + flux_log (ctx->h, LOG_ERR, + "error reading db config: %s", + err.text); + return -1; + } + + if (dbpath) { + if (!(ctx->dbpath = strdup (dbpath))) + flux_log_error (ctx->h, "dbpath not configured"); + } + else { + const char *dbdir = flux_attr_get (ctx->h, "statedir"); + if (dbdir) { + if (asprintf (&ctx->dbpath, "%s/job-db.sqlite", dbdir) < 0) { + flux_log_error (ctx->h, "asprintf"); + return -1; + } + } + } + if (busytimeout) { + double tmp; + if (fsd_parse_duration (busytimeout, &tmp) < 0) + flux_log_error (ctx->h, "busytimeout not configured"); + else + ctx->busy_timeout = (int)(1000 * tmp); + } + + return 0; +} + +struct job_db_ctx * job_db_setup (flux_t *h, int ac, char **av) +{ + struct job_db_ctx *ctx = job_db_ctx_create (h); + + if (!ctx) + return NULL; + + if (process_config (ctx) < 0) + goto done; + + if (!ctx->dbpath) { + errno = ENOTBLK; + goto done; + } + + if (job_db_init (ctx) < 0) + goto done; + + return ctx; + +done: + job_db_ctx_destroy (ctx); + return NULL; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/modules/job-list/job_db.h b/src/modules/job-list/job_db.h new file mode 100644 index 000000000000..5b6cc10baa7f --- /dev/null +++ b/src/modules/job-list/job_db.h @@ -0,0 +1,44 @@ 
+/************************************************************\ + * Copyright 2018 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_JOB_DB_H +#define _FLUX_JOB_DB_H + +#include +#include + +#include "src/common/libczmqcontainers/czmq_containers.h" +#include "src/common/libutil/tstat.h" + +#include "job_data.h" + +struct job_db_ctx { + flux_t *h; + char *dbpath; + unsigned int busy_timeout; + sqlite3 *db; + sqlite3_stmt *store_stmt; + flux_msg_handler_t **handlers; + tstat_t sqlstore; + double initial_max_inactive; /* when db initially loaded */ +}; + +struct job_db_ctx *job_db_setup (flux_t *h, int ac, char **av); + +void job_db_ctx_destroy (struct job_db_ctx *ctx); + +int job_db_store (struct job_db_ctx *ctx, struct job *job); + +#endif /* _FLUX_JOB_DB_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ + diff --git a/src/modules/job-list/job_state.c b/src/modules/job-list/job_state.c index 2ce0519ef03d..0877a48b055d 100644 --- a/src/modules/job-list/job_state.c +++ b/src/modules/job-list/job_state.c @@ -31,6 +31,7 @@ #include "job-list.h" #include "job_state.h" #include "job_data.h" +#include "job_db.h" #include "idsync.h" #include "job_util.h" @@ -309,11 +310,23 @@ static void process_state_transition_update (struct job_state_ctx *jsctx, /* FLUX_JOB_STATE_SCHED */ /* FLUX_JOB_STATE_CLEANUP */ /* FLUX_JOB_STATE_INACTIVE */ + bool inactive = false; - if (state == FLUX_JOB_STATE_INACTIVE) + if (state == FLUX_JOB_STATE_INACTIVE) { eventlog_inactive_complete (job); + inactive = true; + } update_job_state_and_list (jsctx, job, state, timestamp); + + if (inactive) { + assert (job->state == FLUX_JOB_STATE_INACTIVE); + if (job->eventlog && jsctx->ctx->dbctx) { + if (job_db_store 
(jsctx->ctx->dbctx, job) < 0) + flux_log_error (jsctx->h, "%s: job_db_store", + __FUNCTION__); + } + } } } @@ -409,6 +422,42 @@ void job_state_unpause_cb (flux_t *h, flux_msg_handler_t *mh, flux_log_error (h, "error responding to unpause request"); } +static int store_eventlog_entry (struct job_state_ctx *jsctx, + struct job *job, + json_t *entry) +{ + char *s = json_dumps (entry, 0); + int rv = -1; + + /* entry should have been verified via eventlog_entry_parse() + * earlier */ + assert (s); + + if (!job->eventlog) { + job->eventlog_len = strlen (s) + 2; /* +2 for \n and \0 */ + if (!(job->eventlog = calloc (1, job->eventlog_len))) { + flux_log_error (jsctx->h, "calloc"); + goto error; + + } + strcpy (job->eventlog, s); + strcat (job->eventlog, "\n"); + } + else { + job->eventlog_len += strlen (s) + 1; /* +1 for \n */ + if (!(job->eventlog = realloc (job->eventlog, job->eventlog_len))) { + flux_log_error (jsctx->h, "realloc"); + goto error; + } + strcat (job->eventlog, s); + strcat (job->eventlog, "\n"); + } + rv = 0; +error: + free (s); + return rv; +} + static int job_transition_state (struct job_state_ctx *jsctx, struct job *job, flux_job_state_t newstate, @@ -487,12 +536,17 @@ static int journal_submit_event (struct job_state_ctx *jsctx, struct job *job, flux_jobid_t id, double timestamp, + json_t *entry, json_t *context, json_t *jobspec) { if (!job) { if (!(job = job_create (jsctx->h, id))) return -1; + if (store_eventlog_entry (jsctx, job, entry) < 0) { + job_destroy (job); + return -1; + } if (jobspec) job->jobspec = json_incref (jobspec); if (zhashx_insert (jsctx->index, &job->id, job) < 0) { @@ -895,11 +949,17 @@ static int journal_process_event (struct job_state_ctx *jsctx, return 0; } + if (job && job->eventlog) { + if (store_eventlog_entry (jsctx, job, event) < 0) + return -1; + } + if (streq (name, "submit")) { if (journal_submit_event (jsctx, job, id, timestamp, + event, context, jobspec) < 0) return -1; @@ -1115,6 +1175,9 @@ struct job_state_ctx 
*job_state_create (struct list_ctx *ctx) { struct job_state_ctx *jsctx = NULL; + /* dbctx can be NULL */ + assert (ctx->isctx); + if (!(jsctx = calloc (1, sizeof (*jsctx)))) { flux_log_error (ctx->h, "calloc"); return NULL; diff --git a/src/modules/job-list/job_state.h b/src/modules/job-list/job_state.h index 56d6f6ca0a72..43607bd0920e 100644 --- a/src/modules/job-list/job_state.h +++ b/src/modules/job-list/job_state.h @@ -16,6 +16,7 @@ #include "src/common/libczmqcontainers/czmq_containers.h" +#include "job_db.h" #include "idsync.h" #include "stats.h" diff --git a/src/modules/job-list/job_util.c b/src/modules/job-list/job_util.c index 163ab84d9462..695b168c6e67 100644 --- a/src/modules/job-list/job_util.c +++ b/src/modules/job-list/job_util.c @@ -205,7 +205,10 @@ static int store_attr (struct job *job, else if (streq (attr, "dependencies")) { if (!job->dependencies) return 0; - val = json_incref (grudgeset_tojson (job->dependencies)); + if (job->dependencies_db) + val = json_incref (job->dependencies_db); + else + val = json_incref (grudgeset_tojson (job->dependencies)); } else { errprintf (errp, "%s is not a valid attribute", attr); @@ -290,6 +293,38 @@ json_t *job_to_json (struct job *job, json_t *attrs, flux_error_t *errp) return NULL; } +json_t *job_to_json_dbdata (struct job *job, flux_error_t *errp) +{ + json_t *val = NULL; + json_t *o; + + memset (errp, 0, sizeof (*errp)); + + if (!(o = json_object ())) + goto error_nomem; + if (!(val = json_integer (job->id))) + goto error_nomem; + if (json_object_set_new (o, "id", val) < 0) { + json_decref (val); + goto error_nomem; + } + if (store_all_attr (job, o, errp) < 0) + goto error; + if (!(val = json_integer (job->states_mask))) + goto error_nomem; + if (json_object_set_new (o, "states_mask", val) < 0) { + json_decref (val); + goto error_nomem; + } + return o; +error_nomem: + errno = ENOMEM; +error: + ERRNO_SAFE_WRAP (json_decref, o); + return NULL; +} + + /* * vi:tabstop=4 shiftwidth=4 expandtab */ diff --git 
a/src/modules/job-list/job_util.h b/src/modules/job-list/job_util.h index 8e09fc47e722..422ca618c4f5 100644 --- a/src/modules/job-list/job_util.h +++ b/src/modules/job-list/job_util.h @@ -17,6 +17,10 @@ json_t *job_to_json (struct job *job, json_t *attrs, flux_error_t *errp); +/* similar to job_to_json(), but all data to be stored in db. + */ +json_t *job_to_json_dbdata (struct job *job, flux_error_t *errp); + #endif /* ! _FLUX_JOB_LIST_JOB_UTIL_H */ /* diff --git a/src/modules/job-list/list.c b/src/modules/job-list/list.c index e952e4fb0185..dcc7664f77be 100644 --- a/src/modules/job-list/list.c +++ b/src/modules/job-list/list.c @@ -15,6 +15,8 @@ #endif #include #include +#include +#include #include #include "src/common/libutil/errno_safe.h" @@ -29,6 +31,9 @@ #include "job_data.h" #include "match.h" #include "state_match.h" +#include "constraint_sql.h" +#include "job_db.h" +#include "util.h" json_t *get_job_by_id (struct job_state_ctx *jsctx, flux_error_t *errp, @@ -50,7 +55,8 @@ int get_jobs_from_list (json_t *jobs, int max_entries, json_t *attrs, double since, - struct list_constraint *c) + struct list_constraint *c, + double *t_inactive_last) { struct job *job; @@ -80,6 +86,8 @@ int get_jobs_from_list (json_t *jobs, errno = ENOMEM; return -1; } + if (t_inactive_last) + (*t_inactive_last) = job->t_inactive; if (json_array_size (jobs) == max_entries) return 1; } @@ -89,6 +97,268 @@ int get_jobs_from_list (json_t *jobs, return 0; } +struct job *sqliterow_2_job (struct job_state_ctx *jsctx, sqlite3_stmt *res) +{ + struct job *job = NULL; + const char *s; + int success; + int exception_occurred; + const char *ranks = NULL; + const char *nodelist = NULL; + + /* initially set job id to 0 */ + if (!(job = job_create (jsctx->h, 0))) + return NULL; + + /* most job fields will be handled by parsing it out of job data */ + + /* index 0 - id + * index 1 - userid + * index 2 - name + * index 3 - queue + * index 4 - state + * index 5 - result + * index 6 - nodelist + * index 7 
- t_submit
+     * index 8 - t_depend
+     * index 9 - t_run
+     * index 10 - t_cleanup
+     * index 11 - t_inactive
+     */
+
+    s = (const char *)sqlite3_column_text (res, 12);
+    assert (s);
+    job->job_dbdata = json_loads (s, 0, NULL);
+    assert (job->job_dbdata);
+
+    s = (const char *)sqlite3_column_text (res, 13);
+    assert (s);
+    job->eventlog = strdup (s);
+    assert (job->eventlog);
+
+    s = (const char *)sqlite3_column_text (res, 14);
+    assert (s);
+    job->jobspec = json_loads (s, 0, NULL);
+    assert (job->jobspec);
+
+    s = (const char *)sqlite3_column_text (res, 15);
+    if (s) {
+        job->R = json_loads (s, 0, NULL);
+        assert (job->R);
+    }
+
+    if (json_unpack (job->job_dbdata,
+                     "{s:I s:i s:i s?:I s:f s?:f s?:f s?:f s:f s:i s:i}",
+                     "id", &job->id,
+                     "userid", &job->userid,
+                     "urgency", &job->urgency,
+                     "priority", &job->priority,
+                     "t_submit", &job->t_submit,
+                     "t_depend", &job->t_depend,
+                     "t_run", &job->t_run,
+                     "t_cleanup", &job->t_cleanup,
+                     "t_inactive", &job->t_inactive,
+                     "state", &job->state,
+                     "states_mask", &job->states_mask) < 0) {
+        flux_log (jsctx->h, LOG_ERR, "json_unpack");
+        job_destroy (job);
+        return NULL;
+    }
+
+    /* N.B. string fields only here; numeric/boolean fields are
+     * unpacked in the following calls */
+    if (json_unpack (job->job_dbdata,
+                     "{s?:s s?:s s?:s s?:s s?:s}",
+                     "name", &job->name,
+                     "cwd", &job->cwd,
+                     "queue", &job->queue,
+                     "project", &job->project,
+                     "bank", &job->bank) < 0) {
+        flux_log (jsctx->h, LOG_ERR, "json_unpack");
+        job_destroy (job);
+        return NULL;
+    }
+
+    /* N.B. 
success required for inactive job */
+    if (json_unpack (job->job_dbdata,
+                     "{s?:i s?:i s?:i s?:s s?:s s?:f s?:i s:b}",
+                     "ntasks", &job->ntasks,
+                     "ncores", &job->ncores,
+                     "nnodes", &job->nnodes,
+                     "ranks", &ranks,
+                     "nodelist", &nodelist,
+                     "expiration", &job->expiration,
+                     "waitstatus", &job->wait_status,
+                     "success", &success) < 0) {
+        flux_log (jsctx->h, LOG_ERR, "json_unpack");
+        job_destroy (job);
+        return NULL;
+    }
+
+    /* N.B. result required for inactive job */
+    if (json_unpack (job->job_dbdata,
+                     "{s:b s?:s s?:i s?:s s:i s?:O s?:O}",
+                     "exception_occurred", &exception_occurred,
+                     "exception_type", &job->exception_type,
+                     "exception_severity", &job->exception_severity,
+                     "exception_note", &job->exception_note,
+                     "result", &job->result,
+                     "annotations", &job->annotations,
+                     "dependencies", &job->dependencies_db) < 0) {
+        flux_log (jsctx->h, LOG_ERR, "json_unpack");
+        job_destroy (job);
+        return NULL;
+    }
+
+    job->success = success;
+    job->exception_occurred = exception_occurred;
+
+    if (ranks) {
+        job->ranks = strdup (ranks);
+        assert (job->ranks);
+    }
+    if (nodelist) {
+        job->nodelist = strdup (nodelist);
+        assert (job->nodelist);
+    }
+
+    return job;
+}
+
+int get_jobs_from_sqlite (struct job_state_ctx *jsctx,
+                          json_t *jobs,
+                          flux_error_t *errp,
+                          int max_entries,
+                          json_t *attrs,
+                          double since,
+                          struct list_constraint *c,
+                          json_t *constraint,
+                          double t_inactive_last)
+{
+    char *sql_query = NULL;
+    char *sql_select = "SELECT *";
+    char *sql_from = "FROM jobs";
+    char *sql_constraint = NULL;
+    char *sql_where = NULL;
+    char *sql_order = NULL;
+    sqlite3_stmt *res = NULL;
+    flux_error_t error;
+    int save_errno, rv = -1;
+
+    if (!jsctx->ctx->dbctx)
+        return 0;
+
+    if (constraint2sql (constraint, &sql_constraint, &error) < 0) {
+        flux_log (jsctx->h, LOG_ERR, "constraint2sql: %s", error.text);
+        goto out;
+    }
+
+    /* N.B. timestamp precision largely depends on how the jansson
+     * library encodes timestamps in the eventlog.  Trial and error
+     * indicates it averages 7 decimal places. 
Default "%f" is 6
+     * decimal places and can lead to rounding errors in the SQL
+     * query.  We'll up it to 9 decimal places to be on the safe side.
+     *
+     * It's possible on some systems / builds this may not be the case.
+     */
+
+    if (since > 0.0) {
+        if (asprintf (&sql_where,
+                      "WHERE t_inactive < %.9f AND t_inactive > %.9f%s%s",
+                      t_inactive_last,
+                      since,
+                      sql_constraint ? " AND " : "",
+                      sql_constraint ? sql_constraint : "") < 0) {
+            errno = ENOMEM;
+            goto out;
+        }
+    }
+    else {
+        if (asprintf (&sql_where,
+                      "WHERE t_inactive < %.9f%s%s",
+                      t_inactive_last,
+                      sql_constraint ? " AND " : "",
+                      sql_constraint ? sql_constraint : "") < 0) {
+            errno = ENOMEM;
+            goto out;
+        }
+    }
+
+    if (max_entries) {
+        if (asprintf (&sql_order,
+                      "ORDER BY t_inactive DESC LIMIT %d",
+                      max_entries) < 0) {
+            errno = ENOMEM;
+            goto out;
+        }
+    }
+    else {
+        if (asprintf (&sql_order, "ORDER BY t_inactive DESC") < 0) {
+            errno = ENOMEM;
+            goto out;
+        }
+    }
+
+    if (asprintf (&sql_query,
+                  "%s %s %s %s",
+                  sql_select,
+                  sql_from,
+                  sql_where,
+                  sql_order) < 0) {
+        errno = ENOMEM;
+        goto out;
+    }
+
+    if (sqlite3_prepare_v2 (jsctx->ctx->dbctx->db,
+                            sql_query,
+                            -1,
+                            &res,
+                            0) != SQLITE_OK) {
+        log_sqlite_error (jsctx->ctx->dbctx, "sqlite3_prepare_v2");
+        goto out;
+    }
+
+    while (sqlite3_step (res) == SQLITE_ROW) {
+        struct job *job;
+        if (!(job = sqliterow_2_job (jsctx, res))) {
+            log_sqlite_error (jsctx->ctx->dbctx, "sqliterow_2_job");
+            goto out;
+        }
+        if (job_match (job, c, errp)) {
+            json_t *o;
+            if (!(o = job_to_json (job, attrs, errp))) {
+                job_destroy (job);
+                goto out;
+            }
+            if (json_array_append_new (jobs, o) < 0) {
+                json_decref (o);
+                job_destroy (job);
+                errno = ENOMEM;
+                goto out;
+            }
+            if (json_array_size (jobs) == max_entries) {
+                job_destroy (job);
+                rv = 1;
+                goto out;
+            }
+        }
+
+        job_destroy (job);
+    }
+
+    rv = 0;
+out:
+    save_errno = errno;
+    free (sql_query);
+    free (sql_constraint);
+    free (sql_where);
+    free (sql_order);
+    sqlite3_finalize (res);
+    errno = save_errno;
+    return rv;
+}
+
+/* 
Create a JSON array of 'job' objects. 'max_entries' determines the * max number of jobs to return, 0=unlimited. 'since' limits jobs returned * to those with t_inactive greater than timestamp. Returns JSON object @@ -103,6 +373,7 @@ json_t *get_jobs (struct job_state_ctx *jsctx, double since, json_t *attrs, struct list_constraint *c, + json_t *constraint, struct state_constraint *statec) { json_t *jobs = NULL; @@ -122,7 +393,8 @@ json_t *get_jobs (struct job_state_ctx *jsctx, max_entries, attrs, 0., - c)) < 0) + c, + NULL)) < 0) goto error; } @@ -134,21 +406,38 @@ json_t *get_jobs (struct job_state_ctx *jsctx, max_entries, attrs, 0., - c)) < 0) + c, + NULL)) < 0) goto error; } } if (state_match (FLUX_JOB_STATE_INACTIVE, statec)) { if (!ret) { + double t_inactive_last = DBL_MAX; + if ((ret = get_jobs_from_list (jobs, errp, jsctx->inactive, max_entries, attrs, since, - c)) < 0) + c, + &t_inactive_last)) < 0) goto error; + + if (!ret) { + if ((ret = get_jobs_from_sqlite (jsctx, + jobs, + errp, + max_entries, + attrs, + since, + c, + constraint, + t_inactive_last)) < 0) + goto error; + } } } @@ -336,7 +625,7 @@ void list_cb (flux_t *h, } if (!(jobs = get_jobs (ctx->jsctx, &err, max_entries, since, - attrs, c, statec))) + attrs, c, constraint, statec))) goto error; if (flux_respond_pack (h, msg, "{s:O}", "jobs", jobs) < 0) @@ -436,6 +725,49 @@ int check_id_valid (struct job_state_ctx *jsctx, return 0; } +struct job *get_job_by_id_sqlite (struct job_state_ctx *jsctx, + flux_error_t *errp, + const flux_msg_t *msg, + flux_jobid_t id, + json_t *attrs, + bool *stall) +{ + char *sql = "SELECT * FROM jobs WHERE id = ?;"; + sqlite3_stmt *res = NULL; + struct job *job = NULL; + struct job *rv = NULL; + + if (!jsctx->ctx->dbctx) + return NULL; + + if (sqlite3_prepare_v2 (jsctx->ctx->dbctx->db, + sql, + -1, + &res, + 0) != SQLITE_OK) { + log_sqlite_error (jsctx->ctx->dbctx, "sqlite3_prepare_v2"); + goto error; + } + + if (sqlite3_bind_int64 (res, 1, id) != SQLITE_OK) { + 
log_sqlite_error (jsctx->ctx->dbctx, "sqlite3_bind_int64"); + goto error; + } + + if (sqlite3_step (res) == SQLITE_ROW) { + if (!(job = sqliterow_2_job (jsctx, res))) { + log_sqlite_error (jsctx->ctx->dbctx, "sqliterow_2_job"); + goto error; + } + } + + rv = job; +error: + sqlite3_finalize (res); + return rv; +} + + /* Returns JSON object which the caller must free. On error, return * NULL with errno set: * @@ -454,14 +786,16 @@ json_t *get_job_by_id (struct job_state_ctx *jsctx, struct job *job; if (!(job = zhashx_lookup (jsctx->index, &id))) { - if (stall) { - if (check_id_valid (jsctx, msg, id, attrs, state) < 0) { - flux_log_error (jsctx->h, "%s: check_id_valid", __FUNCTION__); - return NULL; + if (!(job = get_job_by_id_sqlite (jsctx, errp, msg, id, attrs, stall))) { + if (stall) { + if (check_id_valid (jsctx, msg, id, attrs, state) < 0) { + flux_log_error (jsctx->h, "%s: check_id_valid", __FUNCTION__); + return NULL; + } + (*stall) = true; } - (*stall) = true; + return NULL; } - return NULL; } /* Always return job in inactive state, even if a requested state was diff --git a/src/modules/job-list/match.c b/src/modules/job-list/match.c index f60764bc6923..56ec3002ab11 100644 --- a/src/modules/job-list/match.c +++ b/src/modules/job-list/match.c @@ -379,45 +379,6 @@ static int match_results (struct list_constraint *c, return ((*results) & job->result) ? 
1 : 0; } -static int array_to_results_bitmask (json_t *values, flux_error_t *errp) -{ - int results = 0; - json_t *entry; - size_t index; - int valid_results = (FLUX_JOB_RESULT_COMPLETED - | FLUX_JOB_RESULT_FAILED - | FLUX_JOB_RESULT_CANCELED - | FLUX_JOB_RESULT_TIMEOUT); - - json_array_foreach (values, index, entry) { - flux_job_result_t result; - if (json_is_string (entry)) { - const char *resultstr = json_string_value (entry); - if (flux_job_strtoresult (resultstr, &result) < 0) { - errprintf (errp, - "invalid results value '%s' specified", - resultstr); - return -1; - } - } - else if (json_is_integer (entry)) { - result = json_integer_value (entry); - if (result & ~valid_results) { - errprintf (errp, - "invalid results value '%Xh' specified", - result); - return -1; - } - } - else { - errprintf (errp, "results value invalid type"); - return -1; - } - results |= result; - } - return results; -} - static struct list_constraint *create_results_constraint (struct match_ctx *mctx, json_t *values, flux_error_t *errp) diff --git a/src/modules/job-list/match.h b/src/modules/job-list/match.h index 5589a671fdbc..1d22bf63aaba 100644 --- a/src/modules/job-list/match.h +++ b/src/modules/job-list/match.h @@ -52,6 +52,6 @@ int job_match (const struct job *job, int job_match_config_reload (struct match_ctx *mctx, const flux_conf_t *conf, - flux_error_t *errp);; + flux_error_t *errp); #endif /* !HAVE_JOB_LIST_MATCH_H */ diff --git a/src/modules/job-list/match_util.c b/src/modules/job-list/match_util.c index e7a0a6f36b2d..09703f8ef8e2 100644 --- a/src/modules/job-list/match_util.c +++ b/src/modules/job-list/match_util.c @@ -60,5 +60,44 @@ int array_to_states_bitmask (json_t *values, flux_error_t *errp) return states; } +int array_to_results_bitmask (json_t *values, flux_error_t *errp) +{ + int results = 0; + json_t *entry; + size_t index; + int valid_results = (FLUX_JOB_RESULT_COMPLETED + | FLUX_JOB_RESULT_FAILED + | FLUX_JOB_RESULT_CANCELED + | FLUX_JOB_RESULT_TIMEOUT); + + 
json_array_foreach (values, index, entry) { + flux_job_result_t result; + if (json_is_string (entry)) { + const char *resultstr = json_string_value (entry); + if (flux_job_strtoresult (resultstr, &result) < 0) { + errprintf (errp, + "invalid results value '%s' specified", + resultstr); + return -1; + } + } + else if (json_is_integer (entry)) { + result = json_integer_value (entry); + if (result & ~valid_results) { + errprintf (errp, + "invalid results value '%Xh' specified", + result); + return -1; + } + } + else { + errprintf (errp, "results value invalid type"); + return -1; + } + results |= result; + } + return results; +} + /* vi: ts=4 sw=4 expandtab */ diff --git a/src/modules/job-list/match_util.h b/src/modules/job-list/match_util.h index 59dbcd9c500c..0402955bd779 100644 --- a/src/modules/job-list/match_util.h +++ b/src/modules/job-list/match_util.h @@ -26,4 +26,6 @@ typedef int (*array_to_bitmask_f) (json_t *, flux_error_t *); int array_to_states_bitmask (json_t *values, flux_error_t *errp); +int array_to_results_bitmask (json_t *values, flux_error_t *errp); + #endif /* !HAVE_JOB_LIST_MATCH_UTIL_H */ diff --git a/src/modules/job-list/test/constraint_sql.c b/src/modules/job-list/test/constraint_sql.c new file mode 100644 index 000000000000..48f7cbf68da5 --- /dev/null +++ b/src/modules/job-list/test/constraint_sql.c @@ -0,0 +1,492 @@ +/************************************************************\ + * Copyright 2023 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. 
+ * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include + +#include "src/common/libtap/tap.h" +#include "src/modules/job-list/job_data.h" +#include "src/modules/job-list/constraint_sql.h" +#include "ccan/str/str.h" + +static void constraint2sql_corner_case (const char *str, + const char *msg) +{ + flux_error_t error; + json_error_t jerror; + json_t *jc; + char *query; + int ret; + + if (!(jc = json_loads (str, 0, &jerror))) + BAIL_OUT ("json constraint invalid: %s", jerror.text); + + ret = constraint2sql (jc, &query, &error); + + ok (ret < 0, "constraint2sql fails on %s", msg); + diag ("error: %s", error.text); + json_decref (jc); +} + +static void test_corner_case (void) +{ + constraint2sql_corner_case ("{\"userid\":[1], \"name\":[\"foo\"] }", + "object with too many keys"); + constraint2sql_corner_case ("{\"userid\":1}", + "object with values not array"); + constraint2sql_corner_case ("{\"foo\":[1]}", + "object with invalid operation"); + constraint2sql_corner_case ("{\"userid\":[\"foo\"]}", + "userid value not integer"); + constraint2sql_corner_case ("{\"name\":[1]}", + "name value not string"); + constraint2sql_corner_case ("{\"queue\":[1]}", + "queue value not string"); + constraint2sql_corner_case ("{\"states\":[0.0]}", + "states value not integer or string"); + constraint2sql_corner_case ("{\"states\":[\"foo\"]}", + "states value not valid string"); + constraint2sql_corner_case ("{\"states\":[8192]}", + "states value not valid integer"); + constraint2sql_corner_case ("{\"results\":[0.0]}", + "results value not integer or string"); + constraint2sql_corner_case ("{\"results\":[\"foo\"]}", + "results value not valid string"); + constraint2sql_corner_case ("{\"results\":[8192]}", + "results value not valid integer"); + constraint2sql_corner_case ("{\"t_depend\":[]}", + "t_depend value not specified"); + constraint2sql_corner_case 
("{\"t_depend\":[1.0]}", + "t_depend value in invalid format (int)"); + constraint2sql_corner_case ("{\"t_depend\":[\"0.0\"]}", + "t_depend no comparison operator"); + constraint2sql_corner_case ("{\"t_depend\":[\">=foof\"]}", + "t_depend value invalid (str)"); + constraint2sql_corner_case ("{\"t_depend\":[\">=-1.0\"]}", + "t_depend value < 0.0 (str)"); + constraint2sql_corner_case ("{\"not\":[1]}", + "sub constraint not a constraint"); +} + +void test_constraint2sql (const char *constraint, const char *expected) +{ + flux_error_t error; + json_error_t jerror; + json_t *jc = NULL; + char *query = NULL; + char *constraint_compact = NULL; + int ret; + + if (constraint) { + if (!(jc = json_loads (constraint, 0, &jerror))) + BAIL_OUT ("json constraint invalid: %s", jerror.text); + /* b/c tests written below have spacing, alignment, + * etc. etc. which outputs poorly */ + if (!(constraint_compact = json_dumps (jc, JSON_COMPACT))) + BAIL_OUT ("json_dumps"); + } + + ret = constraint2sql (jc, &query, &error); + + ok (ret == 0, "constraint2sql success"); + if (ret < 0) + diag ("error: %s", error.text); + if (expected) { + bool pass = query && streq (query, expected); + ok (pass == true, + "constraint2sql on \"%s\" success", constraint_compact); + if (!pass) + diag ("unexpected result: %s", query); + } + else { + ok (query == NULL, + "constraint2sql on \"%s\" success", constraint_compact); + } + json_decref (jc); +} + +static void test_special_cases (void) +{ + test_constraint2sql (NULL, NULL); +} + +struct constraint2sql_test { + const char *constraint; + const char *expected; +}; + +/* N.B. 
These constraints are copied from the tests in match.c */ +struct constraint2sql_test tests[] = { + /* + * userid tests + */ + /* matches "all", so no query result */ + { + "{}", + NULL, + }, + /* no sql query possible, return is NULL */ + { + "{ \"userid\": [ ] }", + NULL, + }, + { + "{ \"userid\": [ 42 ] }", + "(userid = 42)", + }, + { + "{ \"userid\": [ 42, 43 ] }", + "(userid = 42 OR userid = 43)", + }, + /* FLUX_USERID_UNKNOWN = 0xFFFFFFFF + * matches "all", so no query result + */ + { + "{ \"userid\": [ -1 ] }", + NULL, + }, + /* + * name tests + */ + /* no sql query possible, return is NULL */ + { + "{ \"name\": [ ] }", + NULL, + }, + { + "{ \"name\": [ \"foo\" ] }", + "(name = 'foo')", + }, + { + "{ \"name\": [ \"foo\", \"bar\" ] }", + "(name = 'foo' OR name = 'bar')", + }, + /* + * queue tests + */ + /* no sql query possible, return is NULL */ + { + "{ \"queue\": [ ] }", + NULL, + }, + { + "{ \"queue\": [ \"foo\" ] }", + "(queue = 'foo')", + }, + { + "{ \"queue\": [ \"foo\", \"bar\" ] }", + "(queue = 'foo' OR queue = 'bar')", + }, + /* + * states tests + */ + /* matches "nothing" */ + { + "{ \"states\": [ ] }", + "((state & 0) > 0)", + }, + { + /* sanity check integer inputs work, we assume FLUX_JOB_STATE_NEW + * will always be 1, use strings everywhere else + */ + "{ \"states\": [ 1 ] }", + "((state & 1) > 0)", + }, + { + "{ \"states\": [ \"sched\" ] }", + "((state & 8) > 0)", + }, + { + "{ \"states\": [ \"sched\", \"RUN\" ] }", + "((state & 24) > 0)", + }, + /* + * results tests + */ + /* matches "nothing" */ + { + "{ \"results\": [ ] }", + "((result & 0) > 0)", + }, + { + /* sanity check integer inputs work, we assume + * FLUX_JOB_RESULT_COMPLETED will always be 1, use strings + * everywhere else + */ + "{ \"results\": [ 1 ] }", + "((result & 1) > 0)", + }, + { + "{ \"results\": [ \"completed\" ] }", + "((result & 1) > 0)", + }, + { + "{ \"results\": [ \"completed\", \"FAILED\" ] }", + "((result & 3) > 0)", + }, + /* + * hostlist tests + * + * N.B. 
hostlist cannot be converted to SQL query, so all return + * NULL + */ + { + "{ \"hostlist\": [ ] }", + NULL, + }, + { + "{ \"hostlist\": [ \"foo1\" ] }", + NULL, + }, + { + "{ \"hostlist\": [ \"foo[1-2]\" ] }", + NULL, + }, + { + "{ \"hostlist\": [ \"foo1\", \"foo2\", \"foo3\" ] }", + NULL, + }, + /* + * timestamp tests + */ + { + "{ \"t_submit\": [ \">=0\" ] }", + "(t_submit >= 0)", + }, + { + "{ \"t_depend\": [ \">=0.0\" ] }", + "(t_depend >= 0.0)", + }, + { + "{ \"t_run\": [ \">=0\" ] }", + "(t_run >= 0)", + }, + { + "{ \"t_cleanup\": [ \">=0.0\" ] }", + "(t_cleanup >= 0.0)", + }, + { + "{ \"t_inactive\": [ \">=0.0\" ] }", + "(t_inactive >= 0.0)", + }, + { + "{ \"t_inactive\": [ \"<100.0\" ] }", + "(t_inactive < 100.0)", + }, + { + "{ \"t_inactive\": [ \"<=100.0\" ] }", + "(t_inactive <= 100.0)", + }, + { + "{ \"t_inactive\": [ \">=100.0\" ] }", + "(t_inactive >= 100.0)", + }, + { + "{ \"or\": [] }", + NULL, + }, + { + "{ \"and\": [] }", + NULL, + }, + { + "{ \"not\": [] }", + NULL, + }, + { + "{ \"not\": [ { \"userid\": [ 42 ] } ] }", + "(NOT ((userid = 42)))", + }, + { + "{ \"or\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + }", + "((userid = 42) OR (name = 'foo'))", + }, + { + "{ \"or\": \ + [ \ + { \"not\": [ { \"userid\": [ 42 ] } ] }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + }", + "((NOT ((userid = 42))) OR (name = 'foo'))", + }, + { + "{ \"not\": \ + [ \ + { \"or\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + } \ + ] \ + }", + "(NOT (((userid = 42) OR (name = 'foo'))))", + }, + { + "{ \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + }", + "((userid = 42) AND (name = 'foo'))", + }, + { + "{ \"and\": \ + [ \ + { \"not\": [ { \"userid\": [ 42 ] } ] }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + }", + "((NOT ((userid = 42))) AND (name = 'foo'))", + }, + { + "{ \"not\": \ + [ \ + { \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + } \ + ] 
\ + }", + "(NOT (((userid = 42) AND (name = 'foo'))))", + }, + { + "{ \"and\": \ + [ \ + { \"or\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"userid\": [ 43 ] } \ + ] \ + }, \ + { \"name\": [ \"foo\" ] } \ + ] \ + }", + "(((userid = 42) OR (userid = 43)) AND (name = 'foo'))", + }, + { + /* all the jobs in all states for a specific user */ + "{ \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"states\": [ \"pending\", \"running\", \"inactive\" ] } \ + ] \ + }", + "((userid = 42) AND ((state & 126) > 0))", + }, + { + /* all the unsuccessful jobs for a specific user */ + "{ \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"results\": [ \"failed\", \"canceled\", \"timeout\" ] } \ + ] \ + }", + "((userid = 42) AND ((result & 14) > 0))", + }, + { + /* all the pending and running jobs for a user, in two specific queues */ + "{ \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"states\" : [ \"pending\", \"running\" ] }, \ + { \"queue\": [ \"batch\", \"debug\" ] } \ + ] \ + }", + "((userid = 42) AND ((state & 62) > 0) AND (queue = 'batch' OR queue = 'debug'))", + }, + { + /* jobs for a user, in queue batch, with specific job name, are running */ + "{ \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"queue\": [ \"batch\" ] }, \ + { \"name\": [ \"foo\" ] }, \ + { \"states\": [ \"running\" ] } \ + ] \ + }", + "((userid = 42) AND (queue = 'batch') AND (name = 'foo') AND ((state & 48) > 0))", + }, + { + /* all the inactive jobs since a specific time (via t_inactve) */ + "{ \"and\": \ + [ \ + { \"states\": [ \"inactive\" ] }, \ + { \"t_inactive\": [ \">=500.0\" ] } \ + ] \ + }", + "(((state & 64) > 0) AND (t_inactive >= 500.0))", + }, + { + /* jobs for a user that ran on specific hostlist */ + /* N.B. "hostlist" can't be converted into query, so is dropped */ + "{ \"and\": \ + [ \ + { \"userid\": [ 42 ] }, \ + { \"hostlist\": [ \"node1\", \"node2\" ] } \ + ] \ + }", + "((userid = 42))", + }, + { + /* jobs that ran on specific hostlist during a time period + */ + /* N.B. 
"hostlist" can't be converted into query, so is dropped */ + "{ \"and\": \ + [ \ + { \"hostlist\": [ \"node1\", \"node2\" ] }, \ + { \"t_run\": [ \">=500.0\" ] }, \ + { \"t_inactive\": [ \"<=5000.0\" ] } \ + ] \ + }", + "((t_run >= 500.0) AND (t_inactive <= 5000.0))", + }, + { + NULL, + NULL, + }, +}; + +static void run_constraint2sql_tests (void) +{ + struct constraint2sql_test *ltests = tests; + + while (ltests->constraint) { + test_constraint2sql (ltests->constraint, ltests->expected); + ltests++; + } +} + +int main (int argc, char *argv[]) +{ + plan (NO_PLAN); + + test_corner_case (); + test_special_cases (); + run_constraint2sql_tests (); + + done_testing (); +} + +/* + * vi:ts=4 sw=4 expandtab + */ diff --git a/src/modules/job-list/test/match.c b/src/modules/job-list/test/match.c index 6c2f0307fecd..a4515805ba6e 100644 --- a/src/modules/job-list/test/match.c +++ b/src/modules/job-list/test/match.c @@ -27,26 +27,19 @@ struct match_ctx mctx = { .h = NULL, .max_comparisons = 0 }; static void list_constraint_create_corner_case (const char *str, - const char *fmt, - ...) + const char *msg) { struct list_constraint *c; - char buf[1024]; flux_error_t error; json_error_t jerror; json_t *jc; - va_list ap; if (!(jc = json_loads (str, 0, &jerror))) BAIL_OUT ("json constraint invalid: %s", jerror.text); - va_start (ap, fmt); - vsnprintf(buf, sizeof (buf), fmt, ap); - va_end (ap); - c = list_constraint_create (&mctx, jc, &error); - ok (c == NULL, "list_constraint_create fails on %s", buf); + ok (c == NULL, "list_constraint_create fails on %s", msg); diag ("error: %s", error.text); json_decref (jc); } diff --git a/src/modules/job-list/test/state_match.c b/src/modules/job-list/test/state_match.c index 0265634a6de6..2a47ed18d97b 100644 --- a/src/modules/job-list/test/state_match.c +++ b/src/modules/job-list/test/state_match.c @@ -20,25 +20,18 @@ #include "ccan/str/str.h" static void state_constraint_create_corner_case (const char *str, - const char *fmt, - ...) 
+ const char *msg) { struct state_constraint *c; - char buf[1024]; flux_error_t error; json_error_t jerror; json_t *jc; - va_list ap; if (!(jc = json_loads (str, 0, &jerror))) BAIL_OUT ("json constraint invalid: %s", jerror.text); - va_start (ap, fmt); - vsnprintf(buf, sizeof (buf), fmt, ap); - va_end (ap); - c = state_constraint_create (jc, &error); - ok (c == NULL, "state_constraint_create fails on %s", buf); + ok (c == NULL, "state_constraint_create fails on %s", msg); diag ("error: %s", error.text); json_decref (jc); } diff --git a/src/modules/job-list/util.c b/src/modules/job-list/util.c new file mode 100644 index 000000000000..7cafe5851763 --- /dev/null +++ b/src/modules/job-list/util.c @@ -0,0 +1,51 @@ +/************************************************************\ + * Copyright 2022 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* util.c - utility functions */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include + +#include "util.h" + +void log_sqlite_error (struct job_db_ctx *dbctx, const char *fmt, ...) +{ + char buf[128]; + va_list ap; + + va_start (ap, fmt); + (void)vsnprintf (buf, sizeof (buf), fmt, ap); + va_end (ap); + + if (dbctx->db) { + const char *errmsg = sqlite3_errmsg (dbctx->db); + flux_log (dbctx->h, + LOG_ERR, + "%s: %s(%d)", + buf, + errmsg ? 
errmsg : "unknown error code", + sqlite3_extended_errcode (dbctx->db)); + } + else + flux_log (dbctx->h, + LOG_ERR, + "%s: unknown error, no sqlite3 handle", + buf); +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/modules/job-list/util.h b/src/modules/job-list/util.h new file mode 100644 index 000000000000..59a78899462a --- /dev/null +++ b/src/modules/job-list/util.h @@ -0,0 +1,26 @@ +/************************************************************\ + * Copyright 2018 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_JOB_LIST_UTIL_H +#define _FLUX_JOB_LIST_UTIL_H + +#include + +#include "job_db.h" +#include "job_data.h" + +void __attribute__((format (printf, 2, 3))) +log_sqlite_error (struct job_db_ctx *dbctx, const char *fmt, ...); + +#endif /* ! _FLUX_JOB_LIST_UTIL_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/t/Makefile.am b/t/Makefile.am index bb3c31e81c02..b481cf5b55e8 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -155,6 +155,7 @@ TESTSCRIPTS = \ t2260-job-list.t \ t2261-job-list-update.t \ t2262-job-list-stats.t \ + t2263-job-list-db.t \ t2270-job-dependencies.t \ t2271-job-dependency-after.t \ t2272-job-begin-time.t \ diff --git a/t/t2263-job-list-db.t b/t/t2263-job-list-db.t new file mode 100755 index 000000000000..7006097c8188 --- /dev/null +++ b/t/t2263-job-list-db.t @@ -0,0 +1,496 @@ +#!/bin/sh + +test_description='Test flux job list db' + +. $(dirname $0)/job-list/job-list-helper.sh + +. 
$(dirname $0)/sharness.sh + +export FLUX_CONF_DIR=$(pwd) +if test -z "${TEST_UNDER_FLUX_ACTIVE}"; then + STATEDIR=$(mktemp -d) +fi +test_under_flux 4 job -o,-Sstatedir=${STATEDIR} + +RPC=${FLUX_BUILD_DIR}/t/request/rpc + +QUERYCMD="flux python ${FLUX_SOURCE_DIR}/t/scripts/sqlite-query.py" + +fj_wait_event() { + flux job wait-event --timeout=20 "$@" +} + +get_statedir_dbpath() { + statedir=`flux getattr statedir` && + echo ${statedir}/job-db.sqlite +} + +# count number of entries in database +# arg1 - database path +db_count_entries() { + local dbpath=$1 + query="select id from jobs;" + count=`${QUERYCMD} -t 10000 ${dbpath} "${query}" | grep "^id =" | wc -l` + echo $count +} + +# verify entries stored in database +# arg1 - jobid +# arg2 - database path +db_check_entries() { + local id=$(flux job id $1) + local dbpath=$2 + query="select * from jobs where id=$id;" + ${QUERYCMD} -t 10000 ${dbpath} "${query}" > query.out + if grep -q "^id = " query.out \ + && grep -q "t_inactive = " query.out \ + && grep -q "jobdata = " query.out \ + && grep -q "eventlog = " query.out \ + && grep -q "jobspec = " query.out \ + && grep -q "R = " query.out + then + return 0 + fi + return 1 +} + +# get job values via job list +# arg1 - jobid +# arg2 - database path +get_job_list_values() { + local id=$(flux job id $1) + jobdata=`flux job list-ids ${id}` + list_userid=`echo ${jobdata} | jq ".userid"` + list_urgency=`echo ${jobdata} | jq ".urgency"` + list_priority=`echo ${jobdata} | jq ".priority"` + list_state=`echo ${jobdata} | jq ".state"` + list_ranks=`echo ${jobdata} | jq ".ranks"` + list_nnodes=`echo ${jobdata} | jq ".nnodes"` + list_nodelist=`echo ${jobdata} | jq -r ".nodelist"` + list_ntasks=`echo ${jobdata} | jq ".ntasks"` + list_ncores=`echo ${jobdata} | jq ".ncores"` + list_name=`echo ${jobdata} | jq -r ".name"` + list_cwd=`echo ${jobdata} | jq -r ".cwd"` + list_queue=`echo ${jobdata} | jq -r ".queue"` + list_project=`echo ${jobdata} | jq -r ".project"` + list_bank=`echo 
${jobdata} | jq -r ".bank"` + list_waitstatus=`echo ${jobdata} | jq ".waitstatus"` + list_success=`echo ${jobdata} | jq ".success"` + list_result=`echo ${jobdata} | jq ".result"` + list_expiration=`echo ${jobdata} | jq ".expiration"` + list_annotations=`echo ${jobdata} | jq ".annotations"` + list_dependencies=`echo ${jobdata} | jq ".dependencies"` + list_exception_occurred=`echo ${jobdata} | jq ".exception_occurred"` + list_exception_type=`echo ${jobdata} | jq -r ".exception_type"` + list_exception_severity=`echo ${jobdata} | jq ".exception_severity"` + list_exception_note=`echo ${jobdata} | jq -r ".exception_note"` + list_t_submit=`echo ${jobdata} | jq ".t_submit"` + list_t_depend=`echo ${jobdata} | jq ".t_depend"` + list_t_run=`echo ${jobdata} | jq ".t_run"` + list_t_cleanup=`echo ${jobdata} | jq ".t_cleanup"` + list_t_inactive=`echo ${jobdata} | jq ".t_inactive"` +} + +# get job values from database +# arg1 - jobid +# arg2 - database path +get_db_values() { + local id=$(flux job id $1) + local dbpath=$2 + query="select * from jobs where id=$id;" + ${QUERYCMD} -t 10000 ${dbpath} "${query}" > query.out + db_jobdata=`grep "jobdata = " query.out | cut -f3- -d' '` + db_eventlog=`grep "eventlog = " query.out | awk '{print \$3}'` + db_jobspec=`grep "jobspec = " query.out | awk '{print \$3}'` + db_R=`grep "R = " query.out | awk '{print \$3}'` + db_userid=`echo ${jobdata} | jq ".userid"` + db_urgency=`echo ${jobdata} | jq ".urgency"` + db_priority=`echo ${jobdata} | jq ".priority"` + db_state=`echo ${jobdata} | jq ".state"` + db_ranks=`echo ${jobdata} | jq ".ranks"` + db_nnodes=`echo ${jobdata} | jq ".nnodes"` + db_nodelist=`echo ${jobdata} | jq -r ".nodelist"` + db_ntasks=`echo ${jobdata} | jq ".ntasks"` + db_ncores=`echo ${jobdata} | jq ".ncores"` + db_name=`echo ${jobdata} | jq -r ".name"` + db_cwd=`echo ${jobdata} | jq -r ".cwd"` + db_queue=`echo ${jobdata} | jq -r ".queue"` + db_project=`echo ${jobdata} | jq -r ".project"` + db_bank=`echo ${jobdata} | jq -r ".bank"` 
+ db_waitstatus=`echo ${jobdata} | jq ".waitstatus"` + db_success=`echo ${jobdata} | jq ".success"` + db_result=`echo ${jobdata} | jq ".result"` + db_expiration=`echo ${jobdata} | jq ".expiration"` + db_annotations=`echo ${jobdata} | jq ".annotations"` + db_dependencies=`echo ${jobdata} | jq ".dependencies"` + db_exception_occurred=`echo ${jobdata} | jq ".exception_occurred"` + db_exception_type=`echo ${jobdata} | jq -r ".exception_type"` + db_exception_severity=`echo ${jobdata} | jq ".exception_severity"` + db_exception_note=`echo ${jobdata} | jq -r ".exception_note"` + db_t_submit=`echo ${jobdata} | jq ".t_submit"` + db_t_depend=`echo ${jobdata} | jq ".t_depend"` + db_t_run=`echo ${jobdata} | jq ".t_run"` + db_t_cleanup=`echo ${jobdata} | jq ".t_cleanup"` + db_t_inactive=`echo ${jobdata} | jq ".t_inactive"` +} + +# compare data from job list and job db +# arg1 - job id +# arg2 - dbpath +db_compare_data() { + local id=$(flux job id $1) + local dbpath=$2 + get_job_list_values ${id} + get_db_values ${id} ${dbpath} + if [ "${list_userid}" != "${db_userid}" ] \ + || [ "${list_urgency}" != "${db_urgency}" ] \ + || [ "${list_priority}" != "${db_priority}" ] \ + || [ "${list_state}" != "${db_state}" ] \ + || [ "${list_ranks}" != "${db_ranks}" ] \ + || [ "${list_nnodes}" != "${db_nnodes}" ] \ + || [ "${list_nodelist}" != "${db_nodelist}" ] \ + || [ "${list_ntasks}" != "${db_ntasks}" ] \ + || [ "${list_ncores}" != "${db_ncores}" ] \ + || [ "${list_name}" != "${db_name}" ] \ + || [ "${list_cwd}" != "${db_cwd}" ] \ + || [ "${list_queue}" != "${db_queue}" ] \ + || [ "${list_project}" != "${db_project}" ] \ + || [ "${list_bank}" != "${db_bank}" ] \ + || [ "${list_waitstatus}" != "${db_waitstatus}" ] \ + || [ "${list_success}" != "${db_success}" ] \ + || [ "${list_result}" != "${db_result}" ] \ + || [ "${list_expiration}" != "${db_expiration}" ] \ + || [ "${list_annotations}" != "${db_annotations}" ] \ + || [ "${list_dependencies}" != "${db_dependencies}" ] \ + || [ 
"${list_exception_occurred}" != "${db_exception_occurred}" ] \ + || [ "${list_exception_type}" != "${db_exception_type}" ] \ + || [ "${list_exception_severity}" != "${db_exception_severity}" ] \ + || [ "${list_exception_note}" != "${db_exception_note}" ] \ + || [ "${list_t_submit}" != "${db_t_submit}" ] \ + || [ "${list_t_depend}" != "${db_t_depend}" ] \ + || [ "${list_t_run}" != "${db_t_run}" ] \ + || [ "${list_t_cleanup}" != "${db_t_cleanup}" ] \ + || [ "${list_t_inactive}" != "${db_t_inactive}" ] + then + return 1 + fi + return 0 +} + +# wait for inactive job list to reach expected count +# arg1 - expected final count +# Usage: wait_inactive_count method target tries +# where method is job-manager, job-list, or job-list-stats (jq required) +wait_inactive_list_count() { + local target=$1 + local tries=50 + local count + while test $tries -gt 0; do + count=$(flux module stats -p jobs.inactive job-list) + test $count -eq $target && return 0 + sleep 0.25 + tries=$(($tries-1)) + done + return 1 +} + +# submit jobs for job list & job-list db testing + +test_expect_success 'configure testing queues' ' + flux config load <<-EOF && +[policy] +jobspec.defaults.system.queue = "defaultqueue" +[queues.defaultqueue] +[queues.altqueue] +EOF + flux queue start --all +' + +test_expect_success 'submit jobs for job list testing' ' + # Create `hostname` jobspec + # N.B. Used w/ `flux job submit` for serial job submission + # for efficiency (vs serial `flux submit`. 
+ # + flux submit --dry-run hostname >hostname.json && + # + # submit jobs that will complete + # + for i in $(seq 0 3); do + flux job submit hostname.json >> inactiveids + fj_wait_event `tail -n 1 inactiveids` clean + done && + # + # Currently all inactive ids are "completed" + # + tac inactiveids | flux job id > completed.ids && + # + # Hold a job and cancel it, ensuring it never gets resources + # + jobid=`flux submit --urgency=hold /bin/true` && + flux cancel $jobid && + echo $jobid >> inactiveids && + # + # Run a job that will fail, copy its JOBID to both inactive and + # failed lists. + # + ! jobid=`flux submit --wait nosuchcommand` && + echo $jobid >> inactiveids && + tac inactiveids | flux job id > inactive.ids && + cat inactive.ids > all.ids && + # + # The job-list module has eventual consistency with the jobs stored in + # the job-manager queue. To ensure no raciness in tests, ensure + # jobs above have reached expected states in job-list before continuing. + # + flux job list-ids --wait-state=inactive $(job_list_state_ids inactive) > /dev/null +' + +test_expect_success 'flux job list inactive jobs (pre purge)' ' + flux job list -s inactive | jq .id > listI.out && + test_cmp listI.out inactive.ids +' + +test_expect_success 'flux job list inactive jobs w/ count (pre purge)' ' + count=$(job_list_state_count inactive) && + count=$((count - 2)) && + flux job list -s inactive -c ${count} | jq .id > listI_count.out && + head -n ${count} inactive.ids > listI_count.exp && + test_cmp listI_count.out listI_count.exp +' + +test_expect_success 'flux job list-inactive jobs (pre purge)' ' + flux job list-inactive | jq .id > list_inactive.out && + test_cmp list_inactive.out inactive.ids +' + +test_expect_success 'flux job list-inactive jobs w/ count (pre purge)' ' + count=$(job_list_state_count inactive) && + count=$((count - 1)) && + flux job list-inactive -c ${count} | jq .id > list_inactive_count.out && + head -n ${count} inactive.ids > list_inactive_count.exp && + 
test_cmp list_inactive_count.out list_inactive_count.exp +' + +test_expect_success 'flux job list-inactive jobs w/ since 1 (pre purge)' ' + timestamp=`flux job list -s inactive | head -n 2 | tail -n 1 | jq .t_inactive` && + flux job list-inactive --since=${timestamp} | jq .id > list_inactive_since1.out && + head -n 1 inactive.ids > list_inactive_since1.exp && + test_cmp list_inactive_since1.out list_inactive_since1.exp +' + +test_expect_success 'flux job list-inactive jobs w/ since 2 (pre purge)' ' + count=$(job_list_state_count inactive) && + count=$((count - 1)) && + timestamp=`flux job list -s inactive | tail -n 1 | jq .t_inactive` && + flux job list-inactive --since=${timestamp} | jq .id > list_inactive_since2.out && + head -n ${count} inactive.ids > list_inactive_since2.exp && + test_cmp list_inactive_since2.out list_inactive_since2.exp +' + +test_expect_success 'flux job list-ids jobs (pre purge)' ' + for id in `cat inactive.ids` + do + flux job list-ids ${id} | jq -e ".id == ${id}" + done +' + +test_expect_success 'flux job list has all inactive jobs cached' ' + cached=`flux module stats -p jobs.inactive job-list` && + test ${cached} -eq $(job_list_state_count inactive) +' + +test_expect_success 'job-list db: db stored in statedir' ' + dbpath=$(get_statedir_dbpath) && + ls ${dbpath} +' + +test_expect_success 'job-list db: has correct number of entries' ' + dbpath=$(get_statedir_dbpath) && + entries=$(db_count_entries ${dbpath}) && + test ${entries} -eq $(job_list_state_count inactive) +' + +test_expect_success 'job-list db: make sure job data looks ok' ' + dbpath=$(get_statedir_dbpath) && + for id in `cat list_inactive.out` + do + db_check_entries ${id} ${dbpath} + done +' + +test_expect_success 'job-list db: make sure job data correct' ' + dbpath=$(get_statedir_dbpath) && + for id in `cat list_inactive.out` + do + db_compare_data ${id} ${dbpath} + done +' + +test_expect_success 'reload the job-list module' ' + flux module reload job-list +' + 
+test_expect_success 'job-list db: still has correct number of entries' ' + dbpath=$(get_statedir_dbpath) && + entries=$(db_count_entries ${dbpath}) && + test ${entries} -eq $(job_list_state_count inactive) +' + +test_expect_success 'job-list db: purge 2 jobs' ' + len=$(job_list_state_count inactive) && + len=$((len - 2)) && + flux job purge --force --num-limit=${len} && + mv inactive.ids inactive.ids.orig && + head -n ${len} inactive.ids.orig > inactive.ids && + wait_inactive_list_count ${len} +' + +test_expect_success 'flux job list inactive jobs (post purge)' ' + flux job list -s inactive | jq .id > listIPP.out && + test_cmp listIPP.out inactive.ids.orig +' + +test_expect_success 'flux job list inactive jobs w/ count (post purge)' ' + count=$(cat inactive.ids.orig | wc -l) && + count=$((count - 2)) && + flux job list -s inactive -c ${count} | jq .id > listI_countPP.out && + head -n ${count} inactive.ids.orig > listI_countPP.exp && + test_cmp listI_countPP.out listI_countPP.exp +' + +test_expect_success 'flux job list-inactive jobs (post purge)' ' + flux job list-inactive | jq .id > list_inactivePP.out && + test_cmp list_inactivePP.out inactive.ids.orig +' + +test_expect_success 'flux job list-inactive jobs w/ count (post purge)' ' + count=$(cat inactive.ids.orig | wc -l) && + count=$((count - 1)) && + flux job list-inactive -c ${count} | jq .id > list_inactive_countPP.out && + head -n ${count} inactive.ids.orig > list_inactive_countPP.exp && + test_cmp list_inactive_countPP.out list_inactive_countPP.exp +' + +test_expect_success 'flux job list-inactive jobs w/ since 1 (post purge)' ' + timestamp=`flux job list -s inactive | head -n 2 | tail -n 1 | jq .t_inactive` && + flux job list-inactive --since=${timestamp} | jq .id > list_inactive_since1PP.out && + head -n 1 inactive.ids.orig > list_inactive_since1PP.exp && + test_cmp list_inactive_since1PP.out list_inactive_since1PP.exp +' + +test_expect_success 'flux job list-inactive jobs w/ since 2 (post purge)' ' + 
count=$(cat inactive.ids.orig | wc -l) && + count=$((count - 1)) && + timestamp=`flux job list -s inactive | tail -n 1 | jq .t_inactive` && + flux job list-inactive --since=${timestamp} | jq .id > list_inactive_since2PP.out && + head -n ${count} inactive.ids.orig > list_inactive_since2PP.exp && + test_cmp list_inactive_since2PP.out list_inactive_since2PP.exp +' + +test_expect_success 'flux job list-ids jobs (post purge)' ' + for id in `cat inactive.ids.orig` + do + flux job list-ids ${id} | jq -e ".id == ${id}" + done +' + +test_expect_success 'job-list db: purge all jobs' ' + len=$(job_list_state_count inactive) && + flux job purge --force --num-limit=0 && + : > inactive.ids && + wait_inactive_list_count 0 +' + +test_expect_success 'flux job list inactive jobs (all purge)' ' + flux job list -s inactive | jq .id > listI3.out && + test_cmp listI3.out inactive.ids.orig +' + +test_expect_success 'flux job list inactive jobs w/ count (all purge)' ' + count=$(cat inactive.ids.orig | wc -l) && + count=$((count - 2)) && + flux job list -s inactive -c ${count} | jq .id > listI_countAP.out && + head -n ${count} inactive.ids.orig > listI_countAP.exp && + test_cmp listI_countAP.out listI_countAP.exp +' + +test_expect_success 'flux job list-inactive jobs (all purge)' ' + flux job list-inactive | jq .id > list_inactiveAP.out && + test_cmp list_inactiveAP.out inactive.ids.orig +' + +test_expect_success 'flux job list-inactive jobs w/ count (all purge)' ' + count=$(cat inactive.ids.orig | wc -l) && + count=$((count - 1)) && + flux job list-inactive -c ${count} | jq .id > list_inactive_countAP.out && + head -n ${count} inactive.ids.orig > list_inactive_countAP.exp && + test_cmp list_inactive_countAP.out list_inactive_countAP.exp +' + +test_expect_success 'flux job list-inactive jobs w/ since 1 (all purge)' ' + timestamp=`flux job list -s inactive | head -n 2 | tail -n 1 | jq .t_inactive` && + flux job list-inactive --since=${timestamp} | jq .id > list_inactive_since1AP.out && + 
head -n 1 inactive.ids.orig > list_inactive_since1AP.exp && + test_cmp list_inactive_since1AP.out list_inactive_since1AP.exp +' + +test_expect_success 'flux job list-inactive jobs w/ since 2 (all purge)' ' + count=$(cat inactive.ids.orig | wc -l) && + count=$((count - 1)) && + timestamp=`flux job list -s inactive | tail -n 1 | jq .t_inactive` && + flux job list-inactive --since=${timestamp} | jq .id > list_inactive_since2AP.out && + head -n ${count} inactive.ids.orig > list_inactive_since2AP.exp && + test_cmp list_inactive_since2AP.out list_inactive_since2AP.exp +' + +test_expect_success 'flux job list-ids jobs (all purge)' ' + for id in `cat inactive.ids.orig` + do + flux job list-ids ${id} | jq -e ".id == ${id}" + done +' + +test_expect_success 'flux jobs gets all jobs with various constraints' ' + countA=$(cat inactive.ids.orig | wc -l) && + countB=$(flux jobs -n -a | wc -l) && + countC=$(flux jobs -n -a --queue=defaultqueue | wc -l) && + countD=$(flux jobs -n -a --user=$USER | wc -l) && + countE=$(flux jobs -n -a --since=-1h | wc -l) && + countF=$(flux jobs -n --filter=pending,running,inactive | wc -l) && + test $countA = $countB && + test $countA = $countC && + test $countA = $countD && + test $countA = $countE && + test $countA = $countF +' + +test_expect_success 'flux job db stats works' ' + ${RPC} job-list.db-stats 0 < /dev/null +' + +test_expect_success 'reload the job-list module with alternate config' ' + statedir=`flux getattr statedir` && + cat >job-list.toml <