Skip to content

Commit a30a976

Browse files
Code refactoring
Signed-off-by: Athish Pranav D <[email protected]>
1 parent c021e35 commit a30a976

File tree

3 files changed

+103
-4
lines changed

3 files changed

+103
-4
lines changed

include/fluent-bit/flb_msgpack_append_message.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3
2828

2929
#include <fluent-bit/flb_pack.h>
30+
#include <msgpack/unpack.h>
3031

3132
int flb_msgpack_append_message_to_record(char **result_buffer,
3233
size_t *result_size,
@@ -36,4 +37,12 @@ int flb_msgpack_append_message_to_record(char **result_buffer,
3637
char *message_buffer,
3738
size_t message_size,
3839
int message_type);
40+
41+
int flb_msgpack_append_map_to_record(char **result_buffer,
42+
size_t *result_size,
43+
flb_sds_t message_key_name,
44+
char *base_object_buffer,
45+
size_t base_object_size,
46+
char *map_data,
47+
size_t map_size);
3948
#endif

plugins/filter_parser/filter_parser.c

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@
2828
#include <fluent-bit/flb_kv.h>
2929
#include <fluent-bit/flb_log_event_decoder.h>
3030
#include <fluent-bit/flb_log_event_encoder.h>
31+
#include <fluent-bit/flb_msgpack_append_message.h>
3132
#include <msgpack.h>
3233

3334
#include <string.h>
3435
#include <fluent-bit.h>
35-
#include <time.h>
3636

3737
#include "filter_parser.h"
3838

@@ -189,6 +189,8 @@ static int cb_parser_filter(const void *data, size_t bytes,
189189
int key_len;
190190
const char *val_str;
191191
int val_len;
192+
char *parsed_buf;
193+
size_t parsed_size;
192194
char *out_buf;
193195
size_t out_size;
194196
struct flb_time parsed_time;
@@ -232,6 +234,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
232234
&log_decoder,
233235
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
234236
out_buf = NULL;
237+
parsed_buf = NULL;
235238
append_arr_i = 0;
236239

237240
flb_time_copy(&tm, &log_event.timestamp);
@@ -279,7 +282,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
279282
flb_time_zero(&parsed_time);
280283

281284
parse_ret = flb_parser_do(fp->parser, val_str, val_len,
282-
(void **) &out_buf, &out_size,
285+
(void **) &parsed_buf, &parsed_size,
283286
&parsed_time);
284287
if (parse_ret >= 0) {
285288
/*
@@ -323,12 +326,13 @@ static int cb_parser_filter(const void *data, size_t bytes,
323326
&log_encoder, log_event.metadata);
324327
}
325328

326-
if (out_buf != NULL) {
329+
if (parsed_buf != NULL) {
330+
327331
if (ctx->reserve_data) {
328332
char *new_buf = NULL;
329333
int new_size;
330334
int ret;
331-
ret = flb_msgpack_expand_map(out_buf, out_size,
335+
ret = flb_msgpack_expand_map(parsed_buf, parsed_size,
332336
append_arr, append_arr_len,
333337
&new_buf, &new_size);
334338
if (ret == -1) {
@@ -341,6 +345,32 @@ static int cb_parser_filter(const void *data, size_t bytes,
341345
return FLB_FILTER_NOTOUCH;
342346
}
343347

348+
out_buf = new_buf;
349+
out_size = new_size;
350+
}
351+
else {
352+
out_buf = strdup(parsed_buf);
353+
out_size = parsed_size;
354+
}
355+
if (ctx->hash_value_field) {
356+
char *new_buf = NULL;
357+
size_t new_size;
358+
int ret;
359+
flb_sds_t hash_key = flb_sds_create("parsed");
360+
ret = flb_msgpack_append_map_to_record(&new_buf, &new_size,
361+
hash_key,
362+
out_buf, out_size,
363+
parsed_buf,parsed_size);
364+
flb_sds_destroy(hash_key);
365+
if ( ret != FLB_MAP_EXPAND_SUCCESS){
366+
flb_plg_error(ctx->ins, "cannot append parsed entry to record");
367+
368+
flb_log_event_decoder_destroy(&log_decoder);
369+
flb_log_event_encoder_destroy(&log_encoder);
370+
flb_free(append_arr);
371+
372+
return FLB_FILTER_NOTOUCH;
373+
}
344374
flb_free(out_buf);
345375
out_buf = new_buf;
346376
out_size = new_size;
@@ -353,6 +383,7 @@ static int cb_parser_filter(const void *data, size_t bytes,
353383
}
354384

355385
flb_free(out_buf);
386+
flb_free(parsed_buf);
356387
ret = FLB_FILTER_MODIFIED;
357388
}
358389
else {

src/flb_msgpack_append_message.c

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,62 @@ int flb_msgpack_append_message_to_record(char **result_buffer,
8080

8181
return result;
8282
}
83+
84+
int flb_msgpack_append_map_to_record(char **result_buffer,
85+
size_t *result_size,
86+
flb_sds_t message_key_name,
87+
char *base_object_buffer,
88+
size_t base_object_size,
89+
char *map_data,
90+
size_t map_size)
91+
{
92+
msgpack_unpacked unpacker;
93+
msgpack_object_kv *new_map_entries[1];
94+
msgpack_object_kv message_entry;
95+
char *modified_data_buffer;
96+
int modified_data_size;
97+
size_t off = 0;
98+
int i;
99+
int result = FLB_MAP_NOT_MODIFIED;
100+
*result_buffer = NULL;
101+
*result_size = 0;
102+
103+
if (message_key_name == NULL || map_data == NULL){
104+
return result;
105+
}
106+
107+
new_map_entries[0] = &message_entry;
108+
109+
message_entry.key.type = MSGPACK_OBJECT_STR;
110+
message_entry.key.via.str.size = flb_sds_len(message_key_name);
111+
message_entry.key.via.str.ptr = message_key_name;
112+
113+
msgpack_unpacked_init(&unpacker);
114+
if ((i=msgpack_unpack_next(&unpacker, map_data, map_size, &off)) !=
115+
MSGPACK_UNPACK_SUCCESS ) {
116+
msgpack_unpacked_destroy(&unpacker);
117+
return FLB_MAP_EXPANSION_ERROR;
118+
}
119+
if (unpacker.data.type != MSGPACK_OBJECT_MAP) {
120+
msgpack_unpacked_destroy(&unpacker);
121+
return FLB_MAP_EXPANSION_ERROR;
122+
}
123+
124+
message_entry.val = unpacker.data;
125+
result = flb_msgpack_expand_map(base_object_buffer,
126+
base_object_size,
127+
new_map_entries, 1,
128+
&modified_data_buffer,
129+
&modified_data_size);
130+
if (result == 0) {
131+
result = FLB_MAP_EXPAND_SUCCESS;
132+
*result_buffer = modified_data_buffer;
133+
*result_size = modified_data_size;
134+
}
135+
else {
136+
result = FLB_MAP_EXPANSION_ERROR;
137+
}
138+
msgpack_unpacked_destroy(&unpacker);
139+
140+
return result;
141+
}

0 commit comments

Comments
 (0)