Skip to content

Commit 503cdbd

Browse files
cosmo0920edsiper
andcommitted
ml: group: stream: Prepare to handle limitations of multiline
Signed-off-by: Hiroshi Hatake <[email protected]> Co-authored-by: Eduardo Silva <[email protected]>
1 parent 76d0d4f commit 503cdbd

File tree

4 files changed

+58
-0
lines changed

4 files changed

+58
-0
lines changed

include/fluent-bit/multiline/flb_ml.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@
5252
/* Default multiline buffer size: 4Kb */
5353
#define FLB_ML_BUF_SIZE 1024*4
5454

55+
/* Default limit for concatenated multiline messages: 2MB */
56+
#define FLB_ML_BUFFER_LIMIT_DEFAULT (1024 * 1024 * 2)
57+
58+
/* Return codes */
59+
#define FLB_MULTILINE_OK 0
60+
#define FLB_MULTILINE_TRUNCATED 1
61+
5562
/* Maximum number of groups per stream */
5663
#define FLB_ML_MAX_GROUPS 6
5764

@@ -107,6 +114,9 @@ struct flb_ml_stream_group {
107114
msgpack_packer mp_pck; /* temporary msgpack packer */
108115
struct flb_time mp_time; /* multiline time parsed from first line */
109116

117+
/* parent stream reference */
118+
struct flb_ml_stream *stream;
119+
110120
struct mk_list _head;
111121
};
112122

@@ -275,6 +285,9 @@ struct flb_ml {
275285
struct flb_log_event_encoder log_event_encoder;
276286
struct flb_log_event_decoder log_event_decoder;
277287
struct flb_config *config; /* Fluent Bit context */
288+
289+
/* Limit for concatenated multiline messages */
290+
size_t buffer_limit;
278291
};
279292

280293
struct flb_ml *flb_ml_create(struct flb_config *ctx, char *name);

include/fluent-bit/multiline/flb_ml_group.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,12 @@ struct flb_ml_group *flb_ml_group_create(struct flb_ml *ml);
2828
void flb_ml_group_destroy(struct flb_ml_group *group);
2929
int flb_ml_group_add_parser(struct flb_ml *ctx, struct flb_ml_parser_ins *p);
3030

31+
/*
32+
* Append data to a multiline stream group respecting the configured
33+
* buffer limit. The length of the appended data might be reduced if
34+
* the limit is reached.
35+
*/
36+
int flb_ml_group_cat(struct flb_ml_stream_group *group,
37+
const char *data, size_t len);
38+
3139
#endif

src/multiline/flb_ml_group.c

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <fluent-bit/flb_info.h>
2121
#include <fluent-bit/flb_mem.h>
2222
#include <fluent-bit/flb_log.h>
23+
#include <fluent-bit/flb_sds.h>
2324
#include <fluent-bit/multiline/flb_ml.h>
2425
#include <fluent-bit/multiline/flb_ml_parser.h>
2526

@@ -84,3 +85,36 @@ void flb_ml_group_destroy(struct flb_ml_group *group)
8485
mk_list_del(&group->_head);
8586
flb_free(group);
8687
}
88+
89+
int flb_ml_group_cat(struct flb_ml_stream_group *group,
90+
const char *data, size_t len)
91+
{
92+
size_t avail;
93+
size_t limit;
94+
int ret;
95+
int status = FLB_MULTILINE_OK;
96+
97+
limit = group->stream->ml->buffer_limit;
98+
if (limit > 0) {
99+
if (flb_sds_len(group->buf) >= limit) {
100+
return FLB_MULTILINE_TRUNCATED;
101+
}
102+
103+
avail = limit - flb_sds_len(group->buf);
104+
if (len > avail) {
105+
len = avail;
106+
status = FLB_MULTILINE_TRUNCATED;
107+
}
108+
}
109+
110+
if (len == 0) {
111+
return status;
112+
}
113+
114+
ret = flb_sds_cat_safe(&group->buf, data, len);
115+
if (ret == -1) {
116+
return -1;
117+
}
118+
119+
return status;
120+
}

src/multiline/flb_ml_stream.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ static struct flb_ml_stream_group *stream_group_create(struct flb_ml_stream *mst
7979
msgpack_sbuffer_init(&group->mp_sbuf);
8080
msgpack_packer_init(&group->mp_pck, &group->mp_sbuf, msgpack_sbuffer_write);
8181

82+
/* parent stream reference */
83+
group->stream = mst;
84+
8285
mk_list_add(&group->_head, &mst->groups);
8386

8487
return group;

0 commit comments

Comments
 (0)