Skip to content

Commit f671194

Browse files
committed
job-list: add job db
Problem: We would like to store inactive jobs on disk so they can be retrieved at a later time. Solution: Add an SQLite job database to the job-list module. If the job-list module is configured with a database path, or if the Flux broker is configured with a statedir, store all inactive job data in the database.
1 parent e2e7ee8 commit f671194

File tree

13 files changed

+826
-6
lines changed

13 files changed

+826
-6
lines changed

src/modules/Makefile.am

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ job_list_la_SOURCES =
190190
job_list_la_CPPFLAGS = \
191191
$(AM_CPPFLAGS) \
192192
$(FLUX_SECURITY_CFLAGS) \
193-
$(HWLOC_CFLAGS)
193+
$(HWLOC_CFLAGS) \
194+
$(SQLITE_CFLAGS)
194195
job_list_la_LIBADD = \
195196
$(builddir)/job-list/libjob-list.la \
196197
$(top_builddir)/src/common/libjob/libjob.la \
@@ -199,7 +200,8 @@ job_list_la_LIBADD = \
199200
$(top_builddir)/src/common/libflux-optparse.la \
200201
$(top_builddir)/src/common/librlist/librlist.la \
201202
$(JANSSON_LIBS) \
202-
$(HWLOC_LIBS)
203+
$(HWLOC_LIBS) \
204+
$(SQLITE_LIBS)
203205
job_list_la_LDFLAGS = $(fluxmod_ldflags) -module
204206

205207
job_ingest_la_SOURCES =

src/modules/job-list/Makefile.am

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ libjob_list_la_SOURCES = \
2222
job_state.c \
2323
job_data.h \
2424
job_data.c \
25+
job_db.h \
26+
job_db.c \
2527
list.h \
2628
list.c \
2729
job_util.h \
@@ -35,7 +37,9 @@ libjob_list_la_SOURCES = \
3537
state_match.h \
3638
state_match.c \
3739
match_util.h \
38-
match_util.c
40+
match_util.c \
41+
util.h \
42+
util.c
3943

4044
TESTS = \
4145
test_job_data.t \

src/modules/job-list/config.c

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
static void config_reload_cb (flux_t *h,
2+
flux_msg_handler_t *mh,
3+
const flux_msg_t *msg,
4+
void *arg)
5+
{
6+
struct conf *conf = arg;
7+
const flux_conf_t *instance_conf;
8+
struct conf_callback *ccb;
9+
flux_error_t error;
10+
const char *errstr = NULL;
11+
12+
if (flux_conf_reload_decode (msg, &instance_conf) < 0)
13+
goto error;
14+
if (policy_validate (instance_conf, &error) < 0) {
15+
errstr = error.text;
16+
goto error;
17+
}
18+
ccb = zlistx_first (conf->callbacks);
19+
while (ccb) {
20+
if (ccb->cb (instance_conf, &error, ccb->arg) < 0) {
21+
errstr = error.text;
22+
errno = EINVAL;
23+
goto error;
24+
}
25+
ccb = zlistx_next (conf->callbacks);
26+
}
27+
if (flux_set_conf (h, flux_conf_incref (instance_conf)) < 0) {
28+
errstr = "error updating cached configuration";
29+
flux_conf_decref (instance_conf);
30+
goto error;
31+
}
32+
if (flux_respond (h, msg, NULL) < 0)
33+
flux_log_error (h, "error responding to config-reload request");
34+
return;
35+
error:
36+
if (flux_respond_error (h, msg, errno, errstr) < 0)
37+
flux_log_error (h, "error responding to config-reload request");
38+
}
39+
40+
/* Message handler table: routes "job-manager.config-reload" requests to
 * config_reload_cb().  Terminated by FLUX_MSGHANDLER_TABLE_END.
 * NOTE(review): the topic string names job-manager although this hunk is
 * listed under src/modules/job-list/config.c -- confirm this is intended.
 */
static const struct flux_msg_handler_spec htab[] = {
41+
{ FLUX_MSGTYPE_REQUEST, "job-manager.config-reload", config_reload_cb, 0 },
42+
FLUX_MSGHANDLER_TABLE_END,
43+
};
44+
45+
46+
47+
48+
49+
static int process_config (struct kvs_ctx *ctx)
50+
{
51+
flux_error_t error;
52+
if (kvs_checkpoint_config_parse (ctx->kcp,
53+
flux_get_conf (ctx->h),
54+
&error) < 0) {
55+
flux_log (ctx->h, LOG_ERR, "%s", error.text);
56+
return -1;
57+
}
58+
return 0;
59+
}
60+
61+
62+
63+
static int checkpoint_period_parse (const flux_conf_t *conf,
64+
flux_error_t *errp,
65+
double *checkpoint_period)
66+
{
67+
flux_error_t error;
68+
const char *str = NULL;
69+
70+
if (flux_conf_unpack (conf,
71+
&error,
72+
"{s?{s?s}}",
73+
"kvs",
74+
"checkpoint-period", &str) < 0) {
75+
errprintf (errp,
76+
"error reading config for kvs: %s",
77+
error.text);
78+
return -1;
79+
}
80+
81+
if (str) {
82+
if (fsd_parse_duration (str, checkpoint_period) < 0) {
83+
errprintf (errp,
84+
"invalid checkpoint-period config: %s",
85+
str);
86+
return -1;
87+
}
88+
}
89+
90+
return 0;
91+
}
92+
93+
int kvs_checkpoint_config_parse (kvs_checkpoint_t *kcp,
94+
const flux_conf_t *conf,
95+
flux_error_t *errp)
96+
{
97+
if (kcp) {
98+
double checkpoint_period = kcp->checkpoint_period;
99+
if (checkpoint_period_parse (conf, errp, &checkpoint_period) < 0)
100+
return -1;
101+
kcp->checkpoint_period = checkpoint_period;
102+
}
103+
return 0;
104+
}
105+

src/modules/job-list/job-list.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ static void list_ctx_destroy (struct list_ctx *ctx)
210210
flux_msglist_destroy (ctx->deferred_requests);
211211
if (ctx->jsctx)
212212
job_state_destroy (ctx->jsctx);
213+
if (ctx->dbctx)
214+
job_db_ctx_destroy (ctx->dbctx);
213215
if (ctx->isctx)
214216
idsync_ctx_destroy (ctx->isctx);
215217
if (ctx->mctx)
@@ -219,7 +221,7 @@ static void list_ctx_destroy (struct list_ctx *ctx)
219221
}
220222
}
221223

222-
static struct list_ctx *list_ctx_create (flux_t *h)
224+
static struct list_ctx *list_ctx_create (flux_t *h, int argc, char **argv)
223225
{
224226
struct list_ctx *ctx = calloc (1, sizeof (*ctx));
225227
if (!ctx)
@@ -231,6 +233,12 @@ static struct list_ctx *list_ctx_create (flux_t *h)
231233
goto error;
232234
if (!(ctx->isctx = idsync_ctx_create (ctx->h)))
233235
goto error;
236+
/* job_db_setup() performs a job_db_ctx_create() and some
237+
* initialization */
238+
if (!(ctx->dbctx = job_db_setup (h, argc, argv))) {
239+
if (errno != ENOTBLK)
240+
goto error;
241+
}
234242
if (!(ctx->jsctx = job_state_create (ctx)))
235243
goto error;
236244
if (!(ctx->deferred_requests = flux_msglist_create ()))
@@ -248,7 +256,7 @@ int mod_main (flux_t *h, int argc, char **argv)
248256
struct list_ctx *ctx;
249257
int rc = -1;
250258

251-
if (!(ctx = list_ctx_create (h))) {
259+
if (!(ctx = list_ctx_create (h, argc, argv))) {
252260
flux_log_error (h, "initialization error");
253261
goto done;
254262
}

src/modules/job-list/job-list.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
#include "src/common/libczmqcontainers/czmq_containers.h"
1717

1818
#include "job_state.h"
19+
#include "job_db.h"
1920
#include "idsync.h"
2021
#include "match.h"
2122

2223
struct list_ctx {
2324
flux_t *h;
2425
flux_msg_handler_t **handlers;
2526
struct job_state_ctx *jsctx;
27+
struct job_db_ctx *dbctx;
2628
struct idsync_ctx *isctx;
2729
struct flux_msglist *deferred_requests;
2830
struct match_ctx *mctx;

0 commit comments

Comments
 (0)