Skip to content

Commit 2a09dd7

Browse files
committed
filter_parser: support record accessor key
1 parent 0ae7e15 commit 2a09dd7

File tree

2 files changed

+63
-20
lines changed

2 files changed

+63
-20
lines changed

plugins/filter_parser/filter_parser.c

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ static int configure(struct filter_parser_ctx *ctx,
102102
struct flb_kv *kv;
103103

104104
ctx->key_name = NULL;
105+
ctx->ra_key = NULL;
105106
ctx->reserve_data = FLB_FALSE;
106107
ctx->preserve_key = FLB_FALSE;
107108
mk_list_init(&ctx->parsers);
@@ -118,6 +119,15 @@ static int configure(struct filter_parser_ctx *ctx,
118119
}
119120
ctx->key_name_len = flb_sds_len(ctx->key_name);
120121

122+
if (ctx->key_name && ctx->key_name[0] == '$') {
123+
ctx->ra_key = flb_ra_create(ctx->key_name, FLB_TRUE);
124+
if (!ctx->ra_key) {
125+
flb_plg_error(ctx->ins, "invalid record accessor pattern '%s'",
126+
ctx->key_name);
127+
return -1;
128+
}
129+
}
130+
121131
/* Read all Parsers */
122132
mk_list_foreach(head, &f_ins->properties) {
123133
kv = mk_list_entry(head, struct flb_kv, _head);
@@ -246,20 +256,11 @@ static int cb_parser_filter(const void *data, size_t bytes,
246256
}
247257
}
248258

249-
/* Process the target key */
250-
for (i = 0; i < map_num; i++) {
251-
kv = &obj->via.map.ptr[i];
252-
if (msgpackobj2char(&kv->key, &key_str, &key_len) < 0) {
253-
continue;
254-
}
259+
if (ctx->ra_key) {
260+
struct flb_ra_value *rval;
255261

256-
if (key_len == ctx->key_name_len &&
257-
!strncmp(key_str, ctx->key_name, key_len)) {
258-
if (msgpackobj2char(&kv->val, &val_str, &val_len) < 0) {
259-
continue;
260-
}
261-
262-
/* Lookup parser */
262+
rval = flb_ra_get_value_object(ctx->ra_key, *obj);
263+
if (rval && msgpackobj2char(&rval->o, &val_str, &val_len) == 0) {
263264
mk_list_foreach(head, &ctx->parsers) {
264265
fp = mk_list_entry(head, struct filter_parser, _head);
265266
flb_time_zero(&parsed_time);
@@ -271,17 +272,53 @@ static int cb_parser_filter(const void *data, size_t bytes,
271272
if (flb_time_to_nanosec(&parsed_time) != 0L) {
272273
flb_time_copy(&tm, &parsed_time);
273274
}
275+
break;
276+
}
277+
}
278+
}
274279

275-
if (append_arr != NULL) {
276-
if (!ctx->preserve_key) {
277-
append_arr[i] = NULL;
280+
if (rval) {
281+
flb_ra_key_value_destroy(rval);
282+
}
283+
}
284+
else {
285+
/* Process the target key */
286+
for (i = 0; i < map_num; i++) {
287+
kv = &obj->via.map.ptr[i];
288+
if (msgpackobj2char(&kv->key, &key_str, &key_len) < 0) {
289+
continue;
290+
}
291+
292+
if (key_len == ctx->key_name_len &&
293+
!strncmp(key_str, ctx->key_name, key_len)) {
294+
if (msgpackobj2char(&kv->val, &val_str, &val_len) < 0) {
295+
continue;
296+
}
297+
298+
/* Lookup parser */
299+
mk_list_foreach(head, &ctx->parsers) {
300+
fp = mk_list_entry(head, struct filter_parser, _head);
301+
flb_time_zero(&parsed_time);
302+
303+
parse_ret = flb_parser_do(fp->parser, val_str, val_len,
304+
(void **) &out_buf, &out_size,
305+
&parsed_time);
306+
if (parse_ret >= 0) {
307+
if (flb_time_to_nanosec(&parsed_time) != 0L) {
308+
flb_time_copy(&tm, &parsed_time);
278309
}
279-
else if (!ctx->reserve_data) {
280-
/* Store only the key being preserved */
281-
append_arr[0] = kv;
310+
311+
if (append_arr != NULL) {
312+
if (!ctx->preserve_key) {
313+
append_arr[i] = NULL;
314+
}
315+
else if (!ctx->reserve_data) {
316+
/* Store only the key being preserved */
317+
append_arr[0] = kv;
318+
}
282319
}
320+
break;
283321
}
284-
break;
285322
}
286323
}
287324
}
@@ -413,6 +450,9 @@ static int cb_parser_exit(void *data, struct flb_config *config)
413450
}
414451

415452
delete_parsers(ctx);
453+
if (ctx->ra_key) {
454+
flb_ra_destroy(ctx->ra_key);
455+
}
416456
flb_free(ctx);
417457
return 0;
418458
}

plugins/filter_parser/filter_parser.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
#include <fluent-bit/flb_filter.h>
2525
#include <fluent-bit/flb_parser.h>
2626
#include <fluent-bit/flb_sds.h>
27+
#include <fluent-bit/flb_record_accessor.h>
28+
#include <fluent-bit/flb_ra_key.h>
2729

2830
struct filter_parser {
2931
struct flb_parser *parser;
@@ -33,6 +35,7 @@ struct filter_parser {
3335
struct filter_parser_ctx {
3436
flb_sds_t key_name;
3537
int key_name_len;
38+
struct flb_record_accessor *ra_key;
3639
int reserve_data;
3740
int preserve_key;
3841
struct mk_list parsers;

0 commit comments

Comments
 (0)