Skip to content

Commit 4132292

Browse files
committed
ml: group: stream: Add multiline_truncated: true metadata for truncated records
Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent b71f25e commit 4132292

File tree

4 files changed

+13
-0
lines changed

4 files changed

+13
-0
lines changed

include/fluent-bit/multiline/flb_ml.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ struct flb_ml_stream_group {
113113
msgpack_sbuffer mp_sbuf; /* temporary msgpack buffer */
114114
msgpack_packer mp_pck; /* temporary msgpack packer */
115115
struct flb_time mp_time; /* multiline time parsed from first line */
116+
int truncated; /* was the buffer truncated? */
116117

117118
/* parent stream reference */
118119
struct flb_ml_stream *stream;

src/multiline/flb_ml.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1419,6 +1419,14 @@ int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser,
14191419
FLB_TRUE);
14201420
}
14211421

1422+
/* If the buffer was truncated, append the marker to the metadata */
1423+
if (ret == FLB_EVENT_ENCODER_SUCCESS && group->truncated) {
1424+
ret = flb_log_event_encoder_append_metadata_values(
1425+
&mst->ml->log_event_encoder,
1426+
FLB_LOG_EVENT_CSTRING_VALUE("multiline_truncated"),
1427+
FLB_LOG_EVENT_BOOLEAN_VALUE(FLB_TRUE));
1428+
}
1429+
14221430
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
14231431
ret = flb_log_event_encoder_set_body_from_raw_msgpack(
14241432
&mst->ml->log_event_encoder,
@@ -1457,6 +1465,7 @@ int flb_ml_flush_stream_group(struct flb_ml_parser *ml_parser,
14571465

14581466
msgpack_sbuffer_destroy(&mp_sbuf);
14591467
flb_sds_len_set(group->buf, 0);
1468+
group->truncated = FLB_FALSE;
14601469

14611470
/* Update last flush time */
14621471
group->last_flush = time_ms_now();

src/multiline/flb_ml_group.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,14 @@ int flb_ml_group_cat(struct flb_ml_stream_group *group,
9797
limit = group->stream->ml->buffer_limit;
9898
if (limit > 0) {
9999
if (flb_sds_len(group->buf) >= limit) {
100+
group->truncated = FLB_TRUE;
100101
return FLB_MULTILINE_TRUNCATED;
101102
}
102103

103104
avail = limit - flb_sds_len(group->buf);
104105
if (len > avail) {
105106
len = avail;
107+
group->truncated = FLB_TRUE;
106108
status = FLB_MULTILINE_TRUNCATED;
107109
}
108110
}

src/multiline/flb_ml_stream.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ static struct flb_ml_stream_group *stream_group_create(struct flb_ml_stream *mst
7979
msgpack_sbuffer_init(&group->mp_sbuf);
8080
msgpack_packer_init(&group->mp_pck, &group->mp_sbuf, msgpack_sbuffer_write);
8181

82+
group->truncated = FLB_FALSE;
8283
/* parent stream reference */
8384
group->stream = mst;
8485

0 commit comments

Comments
 (0)