Skip to content

Commit 810431f

Browse files
committed
job-list: rebuild and store eventlog
Problem: In the near future we will need access to the job's eventlog when a job goes inactive. Solution: Rebuild the job eventlog from the events journal and store it internally in struct job.
1 parent 1e51de1 commit 810431f

File tree

3 files changed

+53
-0
lines changed

3 files changed

+53
-0
lines changed

src/modules/job-list/job_data.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ void job_destroy (void *data)
3232
grudgeset_destroy (job->dependencies);
3333
json_decref (job->jobspec);
3434
json_decref (job->R);
35+
free (job->eventlog);
3536
json_decref (job->exception_context);
3637
zlist_destroy (&job->next_states);
3738
free (job);

src/modules/job-list/job_data.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ struct job {
6363
/* cache of job information */
6464
json_t *jobspec;
6565
json_t *R;
66+
char *eventlog;
67+
size_t eventlog_len;
6668
json_t *exception_context;
6769

6870
/* Track which states we have seen and have completed transition

src/modules/job-list/job_state.c

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,42 @@ void job_state_unpause_cb (flux_t *h, flux_msg_handler_t *mh,
581581
flux_log_error (h, "error responding to unpause request");
582582
}
583583

584+
static int store_eventlog_entry (struct list_ctx *ctx,
585+
struct job *job,
586+
json_t *entry)
587+
{
588+
char *s = json_dumps (entry, 0);
589+
int rv = -1;
590+
591+
/* entry should have been verified via eventlog_entry_parse()
592+
* earlier */
593+
assert (s);
594+
595+
if (!job->eventlog) {
596+
job->eventlog_len = strlen (s) + 2; /* +2 for \n and \0 */
597+
if (!(job->eventlog = calloc (1, job->eventlog_len))) {
598+
flux_log_error (ctx->h, "calloc");
599+
goto error;
600+
601+
}
602+
strcpy (job->eventlog, s);
603+
strcat (job->eventlog, "\n");
604+
}
605+
else {
606+
job->eventlog_len += strlen (s) + 1; /* +1 for \n */
607+
if (!(job->eventlog = realloc (job->eventlog, job->eventlog_len))) {
608+
flux_log_error (ctx->h, "realloc");
609+
goto error;
610+
}
611+
strcat (job->eventlog, s);
612+
strcat (job->eventlog, "\n");
613+
}
614+
rv = 0;
615+
error:
616+
free (s);
617+
return rv;
618+
}
619+
584620
static struct job *eventlog_restart_parse (struct list_ctx *ctx,
585621
const char *eventlog,
586622
flux_jobid_t id)
@@ -610,6 +646,9 @@ static struct job *eventlog_restart_parse (struct list_ctx *ctx,
610646
goto error;
611647
}
612648

649+
if (store_eventlog_entry (ctx, job, value) < 0)
650+
goto error;
651+
613652
job->eventlog_seq++;
614653
if (!strcmp (name, "submit")) {
615654
if (submit_context_parse (ctx->h, job, context) < 0)
@@ -956,13 +995,18 @@ static int journal_submit_event (struct job_state_ctx *jsctx,
956995
struct job *job,
957996
flux_jobid_t id,
958997
int eventlog_seq,
998+
json_t *entry,
959999
json_t *context)
9601000
{
9611001
if (!job) {
9621002
if (!(job = job_create (jsctx->ctx, id))){
9631003
flux_log_error (jsctx->h, "%s: job_create", __FUNCTION__);
9641004
return -1;
9651005
}
1006+
if (store_eventlog_entry (jsctx->ctx, job, entry) < 0) {
1007+
job_destroy (job);
1008+
return -1;
1009+
}
9661010
if (zhashx_insert (jsctx->index, &job->id, job) < 0) {
9671011
flux_log_error (jsctx->h, "%s: zhashx_insert", __FUNCTION__);
9681012
job_destroy (job);
@@ -1335,11 +1379,17 @@ static int journal_process_event (struct job_state_ctx *jsctx, json_t *event)
13351379
return 0;
13361380
}
13371381

1382+
if (job && job->eventlog) {
1383+
if (store_eventlog_entry (jsctx->ctx, job, entry) < 0)
1384+
return -1;
1385+
}
1386+
13381387
if (!strcmp (name, "submit")) {
13391388
if (journal_submit_event (jsctx,
13401389
job,
13411390
id,
13421391
eventlog_seq,
1392+
entry,
13431393
context) < 0)
13441394
return -1;
13451395
}

0 commit comments

Comments
 (0)