Skip to content

Commit 7fcbc88

Browse files
authored
Merge pull request #1316 from kernelkit/statd-journaling
Add operational data journal with retention policy
2 parents 8d5e9c7 + cb9aeea commit 7fcbc88

File tree

10 files changed

+814
-14
lines changed

10 files changed

+814
-14
lines changed

doc/ChangeLog.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ All notable changes to the project are documented in this file.
1616
production environments. See the documentation for usage examples
1717
- Add support for "routing interfaces", issue #647. Lists interfaces with IP
1818
forwarding. Inspect from CLI using `show interface`, look for `⇅` flag
19+
- Add operational data journal to statd with hierarchical time-based retention
20+
policy, keeping snapshots from every 5 minutes (recent) to yearly (historical)
1921

2022
### Fixes
2123

src/statd/Makefile.am

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,17 @@ DISTCLEANFILES = *~ *.d
22
ACLOCAL_AMFLAGS = -I m4
33

44
sbin_PROGRAMS = statd
5-
statd_SOURCES = statd.c shared.c shared.h
5+
statd_SOURCES = statd.c shared.c shared.h journal.c journal_retention.c journal.h
66
statd_CPPFLAGS = -D_DEFAULT_SOURCE -D_GNU_SOURCE
77
statd_CFLAGS = -W -Wall -Wextra
88
statd_CFLAGS += $(jansson_CFLAGS) $(libyang_CFLAGS) $(sysrepo_CFLAGS)
99
statd_CFLAGS += $(libsrx_CFLAGS) $(libite_CFLAGS)
1010
statd_LDADD = $(jansson_LIBS) $(libyang_LIBS) $(sysrepo_LIBS)
11-
statd_LDADD += $(libsrx_LIBS) $(libite_LIBS)
11+
statd_LDADD += $(libsrx_LIBS) $(libite_LIBS) $(EV_LIBS) -lz
12+
13+
# Test stub for journal retention policy (no dependencies, standalone)
14+
noinst_PROGRAMS = journal_retention_stub
15+
journal_retention_stub_SOURCES = journal_retention_stub.c journal_retention.c journal.h
16+
journal_retention_stub_CPPFLAGS = -D_DEFAULT_SOURCE -D_GNU_SOURCE -DJOURNAL_RETENTION_STUB
17+
journal_retention_stub_CFLAGS = -W -Wall -Wextra
18+
journal_retention_stub_LDFLAGS = -static

src/statd/configure.ac

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,14 @@ PKG_CHECK_MODULES([sysrepo], [sysrepo >= 2.2.36])
4343
PKG_CHECK_MODULES([libsrx], [libsrx >= 1.0.0])
4444

4545
AC_CHECK_HEADER([ev.h],
46-
[AC_CHECK_LIB([ev], [ev_loop_new],
47-
[],
48-
[AC_MSG_ERROR("libev not found")] )],
46+
[saved_LIBS="$LIBS"
47+
AC_CHECK_LIB([ev], [ev_loop_new],
48+
[EV_LIBS="-lev"],
49+
[AC_MSG_ERROR("libev not found")] )
50+
LIBS="$saved_LIBS"],
4951
[AC_MSG_ERROR("ev.h not found")]
5052
)
53+
AC_SUBST([EV_LIBS])
5154

5255
test "x$prefix" = xNONE && prefix=$ac_default_prefix
5356
test "x$exec_prefix" = xNONE && exec_prefix='${prefix}'

src/statd/journal.c

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
/* SPDX-License-Identifier: BSD-3-Clause */
2+
3+
#include <stdio.h>
4+
#include <stdlib.h>
5+
#include <unistd.h>
6+
#include <sysrepo.h>
7+
#include <ev.h>
8+
#include <string.h>
9+
#include <errno.h>
10+
#include <time.h>
11+
#include <sys/stat.h>
12+
#include <pthread.h>
13+
#include <dirent.h>
14+
#include <zlib.h>
15+
16+
#include <srx/common.h>
17+
18+
#include "journal.h"
19+
20+
/* Directory holding operational.json and the timestamped .json.gz snapshots */
#define JOURNAL_DIR   "/var/lib/statd"
/* Latest uncompressed dump; built from JOURNAL_DIR so the two paths cannot drift apart */
#define DUMP_FILE     JOURNAL_DIR "/operational.json"
/* Seconds between dumps (5 minutes); also used as the delay before the first dump */
#define DUMP_INTERVAL 300.0
23+
24+
static void journal_stop_cb(struct ev_loop *loop, struct ev_async *, int)
25+
{
26+
DEBUG("Journal thread stop signal received");
27+
ev_break(loop, EVBREAK_ALL);
28+
}
29+
30+
/*
 * Format @ts as a snapshot file name, "YYYYMMDD-HHMMSS.json.gz", in UTC.
 *
 * @buf: destination buffer, always NUL-terminated when @len > 0
 * @len: size of @buf in bytes; a too small buffer truncates the name
 * @ts:  time to encode
 *
 * Uses gmtime_r() rather than gmtime(): this runs in the journal thread,
 * and gmtime() returns a pointer to static storage shared with every
 * other thread in the process.  If @ts cannot be broken down, @buf is
 * set to the empty string instead of dereferencing a NULL result.
 */
static void get_timestamp_filename(char *buf, size_t len, time_t ts)
{
	struct tm tm;

	if (!buf || len == 0)
		return;

	if (!gmtime_r(&ts, &tm)) {
		buf[0] = '\0';
		return;
	}

	snprintf(buf, len, "%04d%02d%02d-%02d%02d%02d.json.gz",
		 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
		 tm.tm_hour, tm.tm_min, tm.tm_sec);
}
38+
39+
/* Compress a file using gzip */
40+
static int gzip_file(const char *src, const char *dst)
41+
{
42+
FILE *in;
43+
gzFile gz;
44+
char buf[4096];
45+
size_t n;
46+
47+
in = fopen(src, "r");
48+
if (!in) {
49+
ERROR("Error, opening %s: %s", src, strerror(errno));
50+
return -1;
51+
}
52+
53+
gz = gzopen(dst, "wb");
54+
if (!gz) {
55+
ERROR("Error, opening %s: %s", dst, strerror(errno));
56+
fclose(in);
57+
return -1;
58+
}
59+
60+
while ((n = fread(buf, 1, sizeof(buf), in)) > 0) {
61+
if (gzwrite(gz, buf, n) != (int)n) {
62+
ERROR("Error, writing to %s", dst);
63+
gzclose(gz);
64+
fclose(in);
65+
unlink(dst);
66+
return -1;
67+
}
68+
}
69+
70+
gzclose(gz);
71+
fclose(in);
72+
return 0;
73+
}
74+
75+
/* Create timestamped snapshot and update operational.json */
76+
static int create_snapshot(const struct lyd_node *tree)
77+
{
78+
char timestamp_file[300];
79+
char timestamp_path[512];
80+
time_t now;
81+
int ret;
82+
83+
/* Write latest snapshot as uncompressed operational.json for easy access */
84+
ret = lyd_print_path(DUMP_FILE, tree, LYD_JSON, LYD_PRINT_WITHSIBLINGS);
85+
if (ret != LY_SUCCESS) {
86+
ERROR("Error, writing operational.json: %d", ret);
87+
return -1;
88+
}
89+
90+
/* Compress operational.json to timestamped archive */
91+
now = time(NULL);
92+
get_timestamp_filename(timestamp_file, sizeof(timestamp_file), now);
93+
snprintf(timestamp_path, sizeof(timestamp_path), "%s/%s",
94+
JOURNAL_DIR, timestamp_file);
95+
96+
if (gzip_file(DUMP_FILE, timestamp_path) != 0) {
97+
ERROR("Error, compressing snapshot to %s", timestamp_file);
98+
return -1;
99+
}
100+
101+
DEBUG("Created snapshot %s", timestamp_file);
102+
return 0;
103+
}
104+
105+
static void journal_timer_cb(struct ev_loop *, struct ev_timer *w, int)
106+
{
107+
struct journal_ctx *jctx = (struct journal_ctx *)w->data;
108+
struct timespec start, end;
109+
struct snapshot *snapshots = NULL;
110+
sr_conn_ctx_t *con;
111+
const struct ly_ctx *ctx;
112+
sr_data_t *sr_data = NULL;
113+
sr_error_t err;
114+
int snapshot_count = 0;
115+
long duration_ms;
116+
117+
clock_gettime(CLOCK_MONOTONIC, &start);
118+
DEBUG("Starting operational datastore dump");
119+
120+
con = sr_session_get_connection(jctx->sr_query_ses);
121+
if (!con) {
122+
ERROR("Error, getting sr connection for dump");
123+
return;
124+
}
125+
126+
ctx = sr_acquire_context(con);
127+
if (!ctx) {
128+
ERROR("Error, acquiring context for dump");
129+
return;
130+
}
131+
132+
/* Query ALL operational data via second session
133+
* This triggers our own operational callbacks running in main thread
134+
*/
135+
DEBUG("Calling sr_get_data on session %p", jctx->sr_query_ses);
136+
err = sr_get_data(jctx->sr_query_ses, "/*", 0, 0, 0, &sr_data);
137+
if (err != SR_ERR_OK) {
138+
ERROR("Error, getting operational data: %s", sr_strerror(err));
139+
sr_release_context(con);
140+
return;
141+
}
142+
DEBUG("sr_get_data succeeded, got data tree: %p", sr_data ? sr_data->tree : NULL);
143+
144+
/* Create timestamped snapshot */
145+
if (sr_data && sr_data->tree) {
146+
if (create_snapshot(sr_data->tree) != 0) {
147+
sr_release_data(sr_data);
148+
sr_release_context(con);
149+
return;
150+
}
151+
} else {
152+
DEBUG("No operational data to dump");
153+
}
154+
155+
sr_release_data(sr_data);
156+
sr_release_context(con);
157+
158+
/* Apply retention policy */
159+
if (journal_scan_snapshots(JOURNAL_DIR, &snapshots, &snapshot_count) == 0) {
160+
DEBUG("Applying retention policy to %d snapshots", snapshot_count);
161+
journal_apply_retention_policy(JOURNAL_DIR, snapshots, snapshot_count, time(NULL));
162+
free(snapshots);
163+
}
164+
165+
clock_gettime(CLOCK_MONOTONIC, &end);
166+
duration_ms = (end.tv_sec - start.tv_sec) * 1000 +
167+
(end.tv_nsec - start.tv_nsec) / 1000000;
168+
169+
INFO("Journal snapshot created and retention applied (took %ld ms)", duration_ms);
170+
}
171+
172+
static void *journal_thread_fn(void *arg)
173+
{
174+
struct journal_ctx *jctx = (struct journal_ctx *)arg;
175+
struct ev_timer journal_timer;
176+
177+
INFO("Journal thread started");
178+
179+
if (mkdir("/var/lib/statd", 0755) != 0 && errno != EEXIST) {
180+
ERROR("Error, creating directory /var/lib/statd: %s", strerror(errno));
181+
}
182+
183+
jctx->journal_loop = ev_loop_new(EVFLAG_AUTO);
184+
if (!jctx->journal_loop) {
185+
ERROR("Error, creating journal thread event loop");
186+
return NULL;
187+
}
188+
189+
/* Setup async watcher for stop signal */
190+
ev_async_init(&jctx->journal_stop, journal_stop_cb);
191+
ev_async_start(jctx->journal_loop, &jctx->journal_stop);
192+
193+
/* Setup timer for periodic dumps */
194+
ev_timer_init(&journal_timer, journal_timer_cb, DUMP_INTERVAL, DUMP_INTERVAL);
195+
journal_timer.data = jctx;
196+
ev_timer_start(jctx->journal_loop, &journal_timer);
197+
198+
DEBUG("Journal thread entering event loop");
199+
ev_run(jctx->journal_loop, 0);
200+
201+
ev_timer_stop(jctx->journal_loop, &journal_timer);
202+
ev_async_stop(jctx->journal_loop, &jctx->journal_stop);
203+
ev_loop_destroy(jctx->journal_loop);
204+
205+
INFO("Journal thread exiting");
206+
return NULL;
207+
}
208+
209+
int journal_start(struct journal_ctx *jctx, sr_session_ctx_t *sr_query_ses)
210+
{
211+
int err;
212+
213+
jctx->sr_query_ses = sr_query_ses;
214+
jctx->journal_thread_running = 1;
215+
216+
err = pthread_create(&jctx->journal_thread, NULL, journal_thread_fn, jctx);
217+
if (err) {
218+
ERROR("Error, creating journal thread: %s", strerror(err));
219+
return err;
220+
}
221+
222+
INFO("Periodic operational dump enabled (every %.0f seconds)", DUMP_INTERVAL);
223+
return 0;
224+
}
225+
226+
void journal_stop(struct journal_ctx *jctx)
227+
{
228+
/* Signal thread to exit immediately via async watcher */
229+
jctx->journal_thread_running = 0;
230+
ev_async_send(jctx->journal_loop, &jctx->journal_stop);
231+
pthread_join(jctx->journal_thread, NULL);
232+
}

src/statd/journal.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/* SPDX-License-Identifier: BSD-3-Clause */
2+
3+
#ifndef STATD_JOURNAL_H_
4+
#define STATD_JOURNAL_H_
5+
6+
#include <pthread.h>
7+
#include <sysrepo.h>
8+
#include <ev.h>
9+
#include <time.h>
10+
11+
/*
 * One journal snapshot, as exchanged between journal_scan_snapshots()
 * and journal_apply_retention_policy().
 */
struct snapshot {
	/* Snapshot file name.  NOTE(review): presumably relative to the
	 * scanned directory -- confirm against journal_retention.c */
	char filename[256];
	/* Time of the snapshot.  NOTE(review): presumably parsed from the
	 * file name -- confirm against journal_retention.c */
	time_t timestamp;
};
16+
17+
struct journal_ctx {
18+
sr_session_ctx_t *sr_query_ses; /* Consumer session for queries */
19+
struct ev_loop *journal_loop; /* Event loop for journal thread */
20+
pthread_t journal_thread; /* Thread for periodic dumps */
21+
struct ev_async journal_stop; /* Signal to stop journal thread */
22+
volatile int journal_thread_running; /* Flag to stop journal thread */
23+
};
24+
25+
/*
 * Start the journal thread, which periodically dumps data queried via
 * @sr_query_ses.  Returns 0 on success, non-zero on failure.
 */
int journal_start(struct journal_ctx *jctx, sr_session_ctx_t *sr_query_ses);

/* Signal the journal thread to stop and wait for it to exit. */
void journal_stop(struct journal_ctx *jctx);

/*
 * Retention helpers, implemented outside journal.c.
 *
 * NOTE(review): journal.c treats a 0 return from journal_scan_snapshots()
 * as success and afterwards free()s *snapshots itself, so the array is
 * presumably heap allocated and owned by the caller -- confirm against
 * the implementation.
 */
int journal_scan_snapshots(const char *dir, struct snapshot **snapshots, int *count);
void journal_apply_retention_policy(const char *dir, struct snapshot *snapshots, int count, time_t now);
30+
31+
#endif

0 commit comments

Comments
 (0)