Skip to content

Commit a48d354

Browse files
committed
job-list: rebuild and store eventlog
Problem: In the near future we will need access to the job's eventlog when a job goes inactive. Solution: Rebuild the job eventlog from the events journal and store it internally in struct job.
1 parent 80c8a69 commit a48d354

File tree

3 files changed

+53
-0
lines changed

3 files changed

+53
-0
lines changed

src/modules/job-list/job_data.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ void job_destroy (void *data)
3636
grudgeset_destroy (job->dependencies);
3737
json_decref (job->jobspec);
3838
json_decref (job->R);
39+
free (job->eventlog);
3940
json_decref (job->exception_context);
4041
zlist_destroy (&job->next_states);
4142
free (job);

src/modules/job-list/job_data.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ struct job {
6666
/* cache of job information */
6767
json_t *jobspec;
6868
json_t *R;
69+
char *eventlog;
70+
size_t eventlog_len;
6971
json_t *exception_context;
7072

7173
/* Track which states we have seen and have completed transition

src/modules/job-list/job_state.c

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,42 @@ void job_state_unpause_cb (flux_t *h, flux_msg_handler_t *mh,
541541
flux_log_error (h, "error responding to unpause request");
542542
}
543543

544+
static int store_eventlog_entry (struct job_state_ctx *jsctx,
545+
struct job *job,
546+
json_t *entry)
547+
{
548+
char *s = json_dumps (entry, 0);
549+
int rv = -1;
550+
551+
/* entry should have been verified via eventlog_entry_parse()
552+
* earlier */
553+
assert (s);
554+
555+
if (!job->eventlog) {
556+
job->eventlog_len = strlen (s) + 2; /* +2 for \n and \0 */
557+
if (!(job->eventlog = calloc (1, job->eventlog_len))) {
558+
flux_log_error (jsctx->h, "calloc");
559+
goto error;
560+
561+
}
562+
strcpy (job->eventlog, s);
563+
strcat (job->eventlog, "\n");
564+
}
565+
else {
566+
job->eventlog_len += strlen (s) + 1; /* +1 for \n */
567+
if (!(job->eventlog = realloc (job->eventlog, job->eventlog_len))) {
568+
flux_log_error (jsctx->h, "realloc");
569+
goto error;
570+
}
571+
strcat (job->eventlog, s);
572+
strcat (job->eventlog, "\n");
573+
}
574+
rv = 0;
575+
error:
576+
free (s);
577+
return rv;
578+
}
579+
544580
static struct job *eventlog_restart_parse (struct job_state_ctx *jsctx,
545581
const char *eventlog,
546582
flux_jobid_t id)
@@ -571,6 +607,9 @@ static struct job *eventlog_restart_parse (struct job_state_ctx *jsctx,
571607
goto error;
572608
}
573609

610+
if (store_eventlog_entry (jsctx, job, value) < 0)
611+
goto error;
612+
574613
job->eventlog_seq++;
575614
if (streq (name, "submit")) {
576615
if (submit_context_parse (jsctx->h, job, context) < 0)
@@ -927,11 +966,16 @@ static int journal_submit_event (struct job_state_ctx *jsctx,
927966
flux_jobid_t id,
928967
int eventlog_seq,
929968
double timestamp,
969+
json_t *entry,
930970
json_t *context)
931971
{
932972
if (!job) {
933973
if (!(job = job_create (jsctx->h, id)))
934974
return -1;
975+
if (store_eventlog_entry (jsctx, job, entry) < 0) {
976+
job_destroy (job);
977+
return -1;
978+
}
935979
if (zhashx_insert (jsctx->index, &job->id, job) < 0) {
936980
job_destroy (job);
937981
errno = ENOMEM;
@@ -1303,12 +1347,18 @@ static int journal_process_event (struct job_state_ctx *jsctx, json_t *event)
13031347
return 0;
13041348
}
13051349

1350+
if (job && job->eventlog) {
1351+
if (store_eventlog_entry (jsctx, job, entry) < 0)
1352+
return -1;
1353+
}
1354+
13061355
if (streq (name, "submit")) {
13071356
if (journal_submit_event (jsctx,
13081357
job,
13091358
id,
13101359
eventlog_seq,
13111360
timestamp,
1361+
entry,
13121362
context) < 0)
13131363
return -1;
13141364
}

0 commit comments

Comments
 (0)