Skip to content

Commit 9b6e32d

Browse files
Add additional entry in the record for parsed result
Signed-off-by: Athish Pranav D <[email protected]>
1 parent d02a383 commit 9b6e32d

File tree

3 files changed

+103
-5
lines changed

3 files changed

+103
-5
lines changed

include/fluent-bit/flb_msgpack_append_message.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3
2828

2929
#include <fluent-bit/flb_pack.h>
30+
#include <msgpack/unpack.h>
3031

3132
int flb_msgpack_append_message_to_record(char **result_buffer,
3233
size_t *result_size,
@@ -36,4 +37,12 @@ int flb_msgpack_append_message_to_record(char **result_buffer,
3637
char *message_buffer,
3738
size_t message_size,
3839
int message_type);
40+
41+
int flb_msgpack_append_map_to_record(char **result_buffer,
42+
size_t *result_size,
43+
flb_sds_t message_key_name,
44+
char *base_object_buffer,
45+
size_t base_object_size,
46+
char *map_data,
47+
size_t map_size);
3948
#endif

plugins/filter_parser/filter_parser.c

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@
2828
#include <fluent-bit/flb_kv.h>
2929
#include <fluent-bit/flb_log_event_decoder.h>
3030
#include <fluent-bit/flb_log_event_encoder.h>
31+
#include <fluent-bit/flb_msgpack_append_message.h>
3132
#include <msgpack.h>
3233

3334
#include <string.h>
3435
#include <fluent-bit.h>
35-
#include <time.h>
3636

3737
#include "filter_parser.h"
3838

@@ -186,6 +186,8 @@ static int cb_parser_filter(const void *data, size_t bytes,
186186
int key_len;
187187
const char *val_str;
188188
int val_len;
189+
char *parsed_buf;
190+
size_t parsed_size;
189191
char *out_buf;
190192
size_t out_size;
191193
struct flb_time parsed_time;
@@ -229,6 +231,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
229231
&log_decoder,
230232
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
231233
out_buf = NULL;
234+
parsed_buf = NULL;
232235
append_arr_i = 0;
233236

234237
flb_time_copy(&tm, &log_event.timestamp);
@@ -276,7 +279,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
276279
flb_time_zero(&parsed_time);
277280

278281
parse_ret = flb_parser_do(fp->parser, val_str, val_len,
279-
(void **) &out_buf, &out_size,
282+
(void **) &parsed_buf, &parsed_size,
280283
&parsed_time);
281284
if (parse_ret >= 0) {
282285
/*
@@ -320,13 +323,13 @@ static int cb_parser_filter(const void *data, size_t bytes,
320323
&log_encoder, log_event.metadata);
321324
}
322325

323-
if (out_buf != NULL) {
326+
if (parsed_buf != NULL) {
327+
324328
if (ctx->reserve_data) {
325329
char *new_buf = NULL;
326330
int new_size;
327331
int ret;
328-
329-
ret = flb_msgpack_expand_map(out_buf, out_size,
332+
ret = flb_msgpack_expand_map(parsed_buf, parsed_size,
330333
append_arr, append_arr_len,
331334
&new_buf, &new_size);
332335
if (ret == -1) {
@@ -339,6 +342,32 @@ static int cb_parser_filter(const void *data, size_t bytes,
339342
return FLB_FILTER_NOTOUCH;
340343
}
341344

345+
out_buf = new_buf;
346+
out_size = new_size;
347+
}
348+
else {
349+
out_buf = strdup(parsed_buf);
350+
out_size = parsed_size;
351+
}
352+
if (ctx->hash_value_field) {
353+
char *new_buf = NULL;
354+
size_t new_size;
355+
int ret;
356+
flb_sds_t hash_key = flb_sds_create("parsed");
357+
ret = flb_msgpack_append_map_to_record(&new_buf, &new_size,
358+
hash_key,
359+
out_buf, out_size,
360+
parsed_buf,parsed_size);
361+
flb_sds_destroy(hash_key);
362+
if ( ret != FLB_MAP_EXPAND_SUCCESS){
363+
flb_plg_error(ctx->ins, "cannot append parsed entry to record");
364+
365+
flb_log_event_decoder_destroy(&log_decoder);
366+
flb_log_event_encoder_destroy(&log_encoder);
367+
flb_free(append_arr);
368+
369+
return FLB_FILTER_NOTOUCH;
370+
}
342371
flb_free(out_buf);
343372
out_buf = new_buf;
344373
out_size = new_size;
@@ -351,6 +380,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
351380
}
352381

353382
flb_free(out_buf);
383+
flb_free(parsed_buf);
354384
ret = FLB_FILTER_MODIFIED;
355385
}
356386
else {

src/flb_msgpack_append_message.c

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,62 @@ int flb_msgpack_append_message_to_record(char **result_buffer,
8080

8181
return result;
8282
}
83+
84+
int flb_msgpack_append_map_to_record(char **result_buffer,
85+
size_t *result_size,
86+
flb_sds_t message_key_name,
87+
char *base_object_buffer,
88+
size_t base_object_size,
89+
char *map_data,
90+
size_t map_size)
91+
{
92+
msgpack_unpacked unpacker;
93+
msgpack_object_kv *new_map_entries[1];
94+
msgpack_object_kv message_entry;
95+
char *modified_data_buffer;
96+
int modified_data_size;
97+
size_t off = 0;
98+
int i;
99+
int result = FLB_MAP_NOT_MODIFIED;
100+
*result_buffer = NULL;
101+
*result_size = 0;
102+
103+
if (message_key_name == NULL || map_data == NULL){
104+
return result;
105+
}
106+
107+
new_map_entries[0] = &message_entry;
108+
109+
message_entry.key.type = MSGPACK_OBJECT_STR;
110+
message_entry.key.via.str.size = flb_sds_len(message_key_name);
111+
message_entry.key.via.str.ptr = message_key_name;
112+
113+
msgpack_unpacked_init(&unpacker);
114+
if ((i=msgpack_unpack_next(&unpacker, map_data, map_size, &off)) !=
115+
MSGPACK_UNPACK_SUCCESS ) {
116+
msgpack_unpacked_destroy(&unpacker);
117+
return FLB_MAP_EXPANSION_ERROR;
118+
}
119+
if (unpacker.data.type != MSGPACK_OBJECT_MAP) {
120+
msgpack_unpacked_destroy(&unpacker);
121+
return FLB_MAP_EXPANSION_ERROR;
122+
}
123+
124+
message_entry.val = unpacker.data;
125+
result = flb_msgpack_expand_map(base_object_buffer,
126+
base_object_size,
127+
new_map_entries, 1,
128+
&modified_data_buffer,
129+
&modified_data_size);
130+
if (result == 0) {
131+
result = FLB_MAP_EXPAND_SUCCESS;
132+
*result_buffer = modified_data_buffer;
133+
*result_size = modified_data_size;
134+
}
135+
else {
136+
result = FLB_MAP_EXPANSION_ERROR;
137+
}
138+
msgpack_unpacked_destroy(&unpacker);
139+
140+
return result;
141+
}

0 commit comments

Comments
 (0)