Skip to content

Commit c767a6f

Browse files
committed
statd: add operational data journal with retention policy
Dump operational datastore to timestamped JSON snapshots every 5 minutes (in /var/lib/statd/). The operational.json symlink always points to the latest snapshot. Implement hierarchical retention policy that keeps the first snapshot of each time period (hour/day/week/month/year), providing fine-grained recent history while preventing unbounded disk usage. This will allow us to plot / track how the system state evolves as well as give us somewhat fine-grained info in the case of an event, such as a crash. Add unit test simulating months of snapshots to verify retention behavior using a statd stub that only runs the retention code locally (unit test) Signed-off-by: Richard Alpe <[email protected]>
1 parent d9af63c commit c767a6f

File tree

10 files changed

+780
-14
lines changed

10 files changed

+780
-14
lines changed

doc/ChangeLog.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ All notable changes to the project are documented in this file.
1616
production environments. See the documentation for usage examples
1717
- Add support for "routing interfaces", issue #647. Lists interfaces with IP
1818
forwarding. Inspect from CLI using `show interface`, look for `` flag
19+
- Add operational data journal to statd with hierarchical time-based retention
20+
policy, keeping snapshots from every 5 minutes (recent) to yearly (historical)
1921

2022
### Fixes
2123

src/statd/Makefile.am

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,17 @@ DISTCLEANFILES = *~ *.d
22
ACLOCAL_AMFLAGS = -I m4
33

44
sbin_PROGRAMS = statd
5-
statd_SOURCES = statd.c shared.c shared.h
5+
statd_SOURCES = statd.c shared.c shared.h journal.c journal_retention.c journal.h
66
statd_CPPFLAGS = -D_DEFAULT_SOURCE -D_GNU_SOURCE
77
statd_CFLAGS = -W -Wall -Wextra
88
statd_CFLAGS += $(jansson_CFLAGS) $(libyang_CFLAGS) $(sysrepo_CFLAGS)
99
statd_CFLAGS += $(libsrx_CFLAGS) $(libite_CFLAGS)
1010
statd_LDADD = $(jansson_LIBS) $(libyang_LIBS) $(sysrepo_LIBS)
11-
statd_LDADD += $(libsrx_LIBS) $(libite_LIBS)
11+
statd_LDADD += $(libsrx_LIBS) $(libite_LIBS) $(EV_LIBS)
12+
13+
# Test stub for journal retention policy (no dependencies, standalone)
14+
noinst_PROGRAMS = journal_retention_stub
15+
journal_retention_stub_SOURCES = journal_retention_stub.c journal_retention.c journal.h
16+
journal_retention_stub_CPPFLAGS = -D_DEFAULT_SOURCE -D_GNU_SOURCE -DJOURNAL_RETENTION_STUB
17+
journal_retention_stub_CFLAGS = -W -Wall -Wextra
18+
journal_retention_stub_LDFLAGS = -static

src/statd/configure.ac

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,14 @@ PKG_CHECK_MODULES([sysrepo], [sysrepo >= 2.2.36])
4343
PKG_CHECK_MODULES([libsrx], [libsrx >= 1.0.0])
4444

4545
AC_CHECK_HEADER([ev.h],
46-
[AC_CHECK_LIB([ev], [ev_loop_new],
47-
[],
48-
[AC_MSG_ERROR("libev not found")] )],
46+
[saved_LIBS="$LIBS"
47+
AC_CHECK_LIB([ev], [ev_loop_new],
48+
[EV_LIBS="-lev"],
49+
[AC_MSG_ERROR("libev not found")] )
50+
LIBS="$saved_LIBS"],
4951
[AC_MSG_ERROR("ev.h not found")]
5052
)
53+
AC_SUBST([EV_LIBS])
5154

5255
test "x$prefix" = xNONE && prefix=$ac_default_prefix
5356
test "x$exec_prefix" = xNONE && exec_prefix='${prefix}'

src/statd/journal.c

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/* SPDX-License-Identifier: BSD-3-Clause */
2+
3+
#include <stdio.h>
4+
#include <stdlib.h>
5+
#include <unistd.h>
6+
#include <sysrepo.h>
7+
#include <ev.h>
8+
#include <string.h>
9+
#include <errno.h>
10+
#include <time.h>
11+
#include <sys/stat.h>
12+
#include <pthread.h>
13+
#include <dirent.h>
14+
15+
#include <srx/common.h>
16+
17+
#include "journal.h"
18+
19+
#define JOURNAL_DIR "/var/lib/statd"
20+
#define DUMP_FILE "/var/lib/statd/operational.json"
21+
#define DUMP_INTERVAL 300.0 /* 5 minutes in seconds */
22+
23+
static void journal_stop_cb(struct ev_loop *loop, struct ev_async *, int)
24+
{
25+
DEBUG("Journal thread stop signal received");
26+
ev_break(loop, EVBREAK_ALL);
27+
}
28+
29+
static void get_timestamp_filename(char *buf, size_t len, time_t ts)
30+
{
31+
struct tm *tm = gmtime(&ts);
32+
33+
snprintf(buf, len, "%04d%02d%02d-%02d%02d%02d.json",
34+
tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
35+
tm->tm_hour, tm->tm_min, tm->tm_sec);
36+
}
37+
38+
/* Create timestamped snapshot and update operational.json symlink */
39+
static int create_snapshot(const struct lyd_node *tree)
40+
{
41+
char timestamp_file[300];
42+
char timestamp_path[512];
43+
time_t now;
44+
int ret;
45+
46+
now = time(NULL);
47+
get_timestamp_filename(timestamp_file, sizeof(timestamp_file), now);
48+
snprintf(timestamp_path, sizeof(timestamp_path), "%s/%s",
49+
JOURNAL_DIR, timestamp_file);
50+
51+
/* Write timestamped snapshot directly */
52+
ret = lyd_print_path(timestamp_path, tree, LYD_JSON,
53+
LYD_PRINT_WITHSIBLINGS);
54+
if (ret != LY_SUCCESS) {
55+
ERROR("Error, writing snapshot %s: %d", timestamp_file, ret);
56+
unlink(timestamp_path); /* Clean up incomplete file */
57+
return -1;
58+
}
59+
60+
/* Update operational.json symlink to point to latest */
61+
unlink(DUMP_FILE); /* Remove old symlink/file */
62+
if (symlink(timestamp_file, DUMP_FILE) != 0) {
63+
ERROR("Error, creating symlink to %s: %s",
64+
timestamp_file, strerror(errno));
65+
}
66+
67+
DEBUG("Created snapshot %s", timestamp_file);
68+
return 0;
69+
}
70+
71+
static void journal_timer_cb(struct ev_loop *, struct ev_timer *w, int)
72+
{
73+
struct journal_ctx *jctx = (struct journal_ctx *)w->data;
74+
struct timespec start, end;
75+
struct snapshot *snapshots = NULL;
76+
sr_conn_ctx_t *con;
77+
const struct ly_ctx *ctx;
78+
sr_data_t *sr_data = NULL;
79+
sr_error_t err;
80+
int snapshot_count = 0;
81+
long duration_ms;
82+
83+
clock_gettime(CLOCK_MONOTONIC, &start);
84+
DEBUG("Starting operational datastore dump");
85+
86+
con = sr_session_get_connection(jctx->sr_query_ses);
87+
if (!con) {
88+
ERROR("Error, getting sr connection for dump");
89+
return;
90+
}
91+
92+
ctx = sr_acquire_context(con);
93+
if (!ctx) {
94+
ERROR("Error, acquiring context for dump");
95+
return;
96+
}
97+
98+
/* Query ALL operational data via second session
99+
* This triggers our own operational callbacks running in main thread
100+
*/
101+
DEBUG("Calling sr_get_data on session %p", jctx->sr_query_ses);
102+
err = sr_get_data(jctx->sr_query_ses, "/*", 0, 0, 0, &sr_data);
103+
if (err != SR_ERR_OK) {
104+
ERROR("Error, getting operational data: %s", sr_strerror(err));
105+
sr_release_context(con);
106+
return;
107+
}
108+
DEBUG("sr_get_data succeeded, got data tree: %p", sr_data ? sr_data->tree : NULL);
109+
110+
/* Create timestamped snapshot */
111+
if (sr_data && sr_data->tree) {
112+
if (create_snapshot(sr_data->tree) != 0) {
113+
sr_release_data(sr_data);
114+
sr_release_context(con);
115+
return;
116+
}
117+
} else {
118+
DEBUG("No operational data to dump");
119+
}
120+
121+
sr_release_data(sr_data);
122+
sr_release_context(con);
123+
124+
/* Apply retention policy */
125+
if (journal_scan_snapshots(JOURNAL_DIR, &snapshots, &snapshot_count) == 0) {
126+
DEBUG("Applying retention policy to %d snapshots", snapshot_count);
127+
journal_apply_retention_policy(JOURNAL_DIR, snapshots, snapshot_count, time(NULL));
128+
free(snapshots);
129+
}
130+
131+
clock_gettime(CLOCK_MONOTONIC, &end);
132+
duration_ms = (end.tv_sec - start.tv_sec) * 1000 +
133+
(end.tv_nsec - start.tv_nsec) / 1000000;
134+
135+
INFO("Journal snapshot created and retention applied (took %ld ms)", duration_ms);
136+
}
137+
138+
static void *journal_thread_fn(void *arg)
139+
{
140+
struct journal_ctx *jctx = (struct journal_ctx *)arg;
141+
struct ev_timer journal_timer;
142+
143+
INFO("Journal thread started");
144+
145+
if (mkdir("/var/lib/statd", 0755) != 0 && errno != EEXIST) {
146+
ERROR("Error, creating directory /var/lib/statd: %s", strerror(errno));
147+
}
148+
149+
jctx->journal_loop = ev_loop_new(EVFLAG_AUTO);
150+
if (!jctx->journal_loop) {
151+
ERROR("Error, creating journal thread event loop");
152+
return NULL;
153+
}
154+
155+
/* Setup async watcher for stop signal */
156+
ev_async_init(&jctx->journal_stop, journal_stop_cb);
157+
ev_async_start(jctx->journal_loop, &jctx->journal_stop);
158+
159+
/* Setup timer for periodic dumps */
160+
ev_timer_init(&journal_timer, journal_timer_cb, DUMP_INTERVAL, DUMP_INTERVAL);
161+
journal_timer.data = jctx;
162+
ev_timer_start(jctx->journal_loop, &journal_timer);
163+
164+
DEBUG("Journal thread entering event loop");
165+
ev_run(jctx->journal_loop, 0);
166+
167+
ev_timer_stop(jctx->journal_loop, &journal_timer);
168+
ev_async_stop(jctx->journal_loop, &jctx->journal_stop);
169+
ev_loop_destroy(jctx->journal_loop);
170+
171+
INFO("Journal thread exiting");
172+
return NULL;
173+
}
174+
175+
int journal_start(struct journal_ctx *jctx, sr_session_ctx_t *sr_query_ses)
176+
{
177+
int err;
178+
179+
jctx->sr_query_ses = sr_query_ses;
180+
jctx->journal_thread_running = 1;
181+
182+
err = pthread_create(&jctx->journal_thread, NULL, journal_thread_fn, jctx);
183+
if (err) {
184+
ERROR("Error, creating journal thread: %s", strerror(err));
185+
return err;
186+
}
187+
188+
INFO("Periodic operational dump enabled (every %.0f seconds)", DUMP_INTERVAL);
189+
return 0;
190+
}
191+
192+
void journal_stop(struct journal_ctx *jctx)
193+
{
194+
/* Signal thread to exit immediately via async watcher */
195+
jctx->journal_thread_running = 0;
196+
ev_async_send(jctx->journal_loop, &jctx->journal_stop);
197+
pthread_join(jctx->journal_thread, NULL);
198+
}

src/statd/journal.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/* SPDX-License-Identifier: BSD-3-Clause */
2+
3+
#ifndef STATD_JOURNAL_H_
4+
#define STATD_JOURNAL_H_
5+
6+
#include <pthread.h>
7+
#include <sysrepo.h>
8+
#include <ev.h>
9+
#include <time.h>
10+
11+
/* Snapshot structure for tracking journal files */
12+
struct snapshot {
13+
char filename[256];
14+
time_t timestamp;
15+
};
16+
17+
struct journal_ctx {
18+
sr_session_ctx_t *sr_query_ses; /* Consumer session for queries */
19+
struct ev_loop *journal_loop; /* Event loop for journal thread */
20+
pthread_t journal_thread; /* Thread for periodic dumps */
21+
struct ev_async journal_stop; /* Signal to stop journal thread */
22+
volatile int journal_thread_running; /* Flag to stop journal thread */
23+
};
24+
25+
int journal_start(struct journal_ctx *jctx, sr_session_ctx_t *sr_query_ses);
26+
void journal_stop(struct journal_ctx *jctx);
27+
28+
int journal_scan_snapshots(const char *dir, struct snapshot **snapshots, int *count);
29+
void journal_apply_retention_policy(const char *dir, struct snapshot *snapshots, int count, time_t now);
30+
31+
#endif

0 commit comments

Comments
 (0)