Skip to content

Commit e973f22

Browse files
committed
storage: new metrics interface
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 700e9e1 commit e973f22

File tree

2 files changed

+265
-1
lines changed

2 files changed

+265
-1
lines changed

include/fluent-bit/flb_storage.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@
2828
#define FLB_STORAGE_BL_MEM_LIMIT "5M"
2929
#define FLB_STORAGE_MAX_CHUNKS_UP 128
3030

31+
struct flb_storage_metrics {
32+
int fd;
33+
};
34+
3135
/*
3236
* The storage structure helps to associate the contexts between
3337
* input instances and the chunkio context and further streams.
3438
*
3539
* Each input instance have a stream associated.
3640
*/
37-
3841
struct flb_storage_input {
3942
int type; /* CIO_STORE_FS | CIO_STORE_MEM */
4043
struct cio_stream *stream;
@@ -47,4 +50,6 @@ int flb_storage_input_create(struct cio_ctx *cio,
4750
void flb_storage_destroy(struct flb_config *ctx);
4851
void flb_storage_input_destroy(struct flb_input_instance *in);
4952

53+
struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx);
54+
5055
#endif

src/flb_storage.c

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,260 @@
2222
#include <fluent-bit/flb_input.h>
2323
#include <fluent-bit/flb_log.h>
2424
#include <fluent-bit/flb_storage.h>
25+
#include <fluent-bit/flb_scheduler.h>
26+
#include <fluent-bit/flb_utils.h>
27+
#include <fluent-bit/flb_http_server.h>
28+
29+
static void metrics_append_general(msgpack_packer *mp_pck,
30+
struct flb_config *ctx,
31+
struct flb_storage_metrics *sm)
32+
{
33+
struct cio_stats storage_st;
34+
35+
/* Retrieve general stats from the storage layer */
36+
cio_stats_get(ctx->cio, &storage_st);
37+
38+
msgpack_pack_str(mp_pck, 13);
39+
msgpack_pack_str_body(mp_pck, "storage_layer", 13);
40+
msgpack_pack_map(mp_pck, 1);
41+
42+
/* Chunks */
43+
msgpack_pack_str(mp_pck, 6);
44+
msgpack_pack_str_body(mp_pck, "chunks", 6);
45+
msgpack_pack_map(mp_pck, 5);
46+
47+
/* chunks['total_chunks'] */
48+
msgpack_pack_str(mp_pck, 12);
49+
msgpack_pack_str_body(mp_pck, "total_chunks", 12);
50+
msgpack_pack_uint64(mp_pck, storage_st.chunks_total);
51+
52+
/* chunks['mem_chunks'] */
53+
msgpack_pack_str(mp_pck, 10);
54+
msgpack_pack_str_body(mp_pck, "mem_chunks", 10);
55+
msgpack_pack_uint64(mp_pck, storage_st.chunks_mem);
56+
57+
/* chunks['fs_chunks'] */
58+
msgpack_pack_str(mp_pck, 9);
59+
msgpack_pack_str_body(mp_pck, "fs_chunks", 9);
60+
msgpack_pack_uint64(mp_pck, storage_st.chunks_fs);
61+
62+
/* chunks['fs_up_chunks'] */
63+
msgpack_pack_str(mp_pck, 12);
64+
msgpack_pack_str_body(mp_pck, "fs_chunks_up", 12);
65+
msgpack_pack_uint64(mp_pck, storage_st.chunks_fs_up);
66+
67+
/* chunks['fs_down_chunks'] */
68+
msgpack_pack_str(mp_pck, 14);
69+
msgpack_pack_str_body(mp_pck, "fs_chunks_down", 14);
70+
msgpack_pack_uint64(mp_pck, storage_st.chunks_fs_down);
71+
}
72+
73+
static void metrics_append_input(msgpack_packer *mp_pck,
74+
struct flb_config *ctx,
75+
struct flb_storage_metrics *sm)
76+
{
77+
int len;
78+
int ret;
79+
const char *tmp;
80+
char buf[32];
81+
ssize_t size;
82+
83+
/* chunks */
84+
int up;
85+
int down;
86+
int busy;
87+
int busy_size_err;
88+
ssize_t busy_size;
89+
struct mk_list *head;
90+
struct mk_list *h_chunks;
91+
struct flb_input_instance *i;
92+
struct flb_input_chunk *ic;
93+
94+
msgpack_pack_str(mp_pck, 12);
95+
msgpack_pack_str_body(mp_pck, "input_chunks", 12);
96+
msgpack_pack_map(mp_pck, mk_list_size(&ctx->inputs));
97+
98+
/* Input Plugins Ingestion */
99+
mk_list_foreach(head, &ctx->inputs) {
100+
i = mk_list_entry(head, struct flb_input_instance, _head);
101+
102+
tmp = flb_input_name(i);
103+
len = strlen(tmp);
104+
105+
msgpack_pack_str(mp_pck, len);
106+
msgpack_pack_str_body(mp_pck, tmp, len);
107+
108+
/* Map for 'status' and 'chunks' */
109+
msgpack_pack_map(mp_pck, 2);
110+
111+
/*
112+
* Status
113+
* ======
114+
*/
115+
msgpack_pack_str(mp_pck, 6);
116+
msgpack_pack_str_body(mp_pck, "status", 6);
117+
118+
/* 'status' map has 2 keys: overlimit and chunks */
119+
msgpack_pack_map(mp_pck, 3);
120+
121+
/* status['overlimit'] */
122+
msgpack_pack_str(mp_pck, 9);
123+
msgpack_pack_str_body(mp_pck, "overlimit", 9);
124+
125+
ret = FLB_FALSE;
126+
if (i->mem_buf_limit > 0) {
127+
if (i->mem_chunks_size >= i->mem_buf_limit) {
128+
ret = FLB_TRUE;
129+
}
130+
}
131+
if (ret == FLB_TRUE) {
132+
msgpack_pack_true(mp_pck);
133+
}
134+
else {
135+
msgpack_pack_false(mp_pck);
136+
}
137+
138+
/* status['mem_size'] */
139+
msgpack_pack_str(mp_pck, 8);
140+
msgpack_pack_str_body(mp_pck, "mem_size", 8);
141+
142+
/* Current memory size used based on last ingestion */
143+
flb_utils_bytes_to_human_readable_size(i->mem_chunks_size,
144+
buf, sizeof(buf) - 1);
145+
len = strlen(buf);
146+
msgpack_pack_str(mp_pck, len);
147+
msgpack_pack_str_body(mp_pck, buf, len);
148+
149+
/* status['mem_limit'] */
150+
msgpack_pack_str(mp_pck, 9);
151+
msgpack_pack_str_body(mp_pck, "mem_limit", 9);
152+
153+
flb_utils_bytes_to_human_readable_size(i->mem_buf_limit,
154+
buf, sizeof(buf) - 1);
155+
len = strlen(buf);
156+
msgpack_pack_str(mp_pck, len);
157+
msgpack_pack_str_body(mp_pck, buf, len);
158+
159+
/*
160+
* Chunks
161+
* ======
162+
*/
163+
msgpack_pack_str(mp_pck, 6);
164+
msgpack_pack_str_body(mp_pck, "chunks", 6);
165+
166+
/* 'chunks' has 3 keys: total, up, down, busy and busy_size */
167+
msgpack_pack_map(mp_pck, 5);
168+
169+
/* chunks['total_chunks'] */
170+
msgpack_pack_str(mp_pck, 5);
171+
msgpack_pack_str_body(mp_pck, "total", 5);
172+
msgpack_pack_uint64(mp_pck, mk_list_size(&i->chunks));
173+
174+
/*
175+
* chunks Details: chunks marked as 'busy' are 'locked' since they are in
176+
* a 'flush' state. No more data can be appended to a busy chunk.
177+
*/
178+
busy = 0;
179+
busy_size = 0;
180+
busy_size_err = 0;
181+
182+
/* up/down */
183+
up = 0;
184+
down = 0;
185+
186+
/* Iterate chunks for the input instance in question */
187+
mk_list_foreach(h_chunks, &i->chunks) {
188+
ic = mk_list_entry(h_chunks, struct flb_input_chunk, _head);
189+
if (ic->busy == FLB_TRUE) {
190+
busy++;
191+
size = cio_chunk_get_content_size(ic->chunk);
192+
if (size >= 0) {
193+
busy_size += size;
194+
}
195+
else {
196+
busy_size_err++;
197+
}
198+
}
199+
200+
if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
201+
up++;
202+
}
203+
else {
204+
down++;
205+
}
206+
207+
}
208+
209+
/* chunks['up'] */
210+
msgpack_pack_str(mp_pck, 2);
211+
msgpack_pack_str_body(mp_pck, "up", 2);
212+
msgpack_pack_uint64(mp_pck, up);
213+
214+
/* chunks['down'] */
215+
msgpack_pack_str(mp_pck, 4);
216+
msgpack_pack_str_body(mp_pck, "down", 4);
217+
msgpack_pack_uint64(mp_pck, down);
218+
219+
/* chunks['busy'] */
220+
msgpack_pack_str(mp_pck, 4);
221+
msgpack_pack_str_body(mp_pck, "busy", 4);
222+
msgpack_pack_uint64(mp_pck, busy);
223+
224+
/* chunks['busy_size'] */
225+
msgpack_pack_str(mp_pck, 9);
226+
msgpack_pack_str_body(mp_pck, "busy_size", 9);
227+
228+
flb_utils_bytes_to_human_readable_size(busy_size, buf, sizeof(buf) - 1);
229+
len = strlen(buf);
230+
msgpack_pack_str(mp_pck, len);
231+
msgpack_pack_str_body(mp_pck, buf, len);
232+
}
233+
}
234+
235+
static void cb_storage_metrics_collect(struct flb_config *ctx, void *data)
236+
{
237+
msgpack_sbuffer mp_sbuf;
238+
msgpack_packer mp_pck;
239+
240+
/* Prepare new outgoing buffer */
241+
msgpack_sbuffer_init(&mp_sbuf);
242+
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
243+
244+
/* Pack main map and append relevant data */
245+
msgpack_pack_map(&mp_pck, 2);
246+
metrics_append_general(&mp_pck, ctx, data);
247+
metrics_append_input(&mp_pck, ctx, data);
248+
249+
#ifdef FLB_HAVE_HTTP_SERVER
250+
if (ctx->http_server == FLB_TRUE) {
251+
flb_hs_push_storage_metrics(ctx->http_ctx, mp_sbuf.data, mp_sbuf.size);
252+
}
253+
#endif
254+
msgpack_sbuffer_destroy(&mp_sbuf);
255+
}
256+
257+
struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx)
258+
{
259+
int ret;
260+
struct flb_storage_metrics *sm;
261+
262+
sm = flb_malloc(sizeof(struct flb_storage_metrics));
263+
if (!sm) {
264+
flb_errno();
265+
return NULL;
266+
}
267+
268+
ret = flb_sched_timer_cb_create(ctx, FLB_SCHED_TIMER_CB_PERM, 5000,
269+
cb_storage_metrics_collect,
270+
ctx->storage_metrics_ctx);
271+
if (ret == -1) {
272+
flb_error("[storage metrics] cannot create timer to collect metrics");
273+
flb_free(sm);
274+
return NULL;
275+
}
276+
277+
return sm;
278+
}
25279

26280
static int sort_chunk_cmp(const void *a_arg, const void *b_arg)
27281
{
@@ -322,6 +576,11 @@ void flb_storage_destroy(struct flb_config *ctx)
322576
return;
323577
}
324578

579+
if (ctx->storage_metrics == FLB_TRUE &&
580+
ctx->storage_metrics_ctx != NULL) {
581+
flb_free(ctx->storage_metrics_ctx);
582+
}
583+
325584
cio_destroy(cio);
326585

327586
/* Delete references from input instances */

0 commit comments

Comments
 (0)