Skip to content

Commit c62b165

Browse files
authored
ra: add append API, flb_ra_append_kv_pair (#5390)
* ra: add test case to update root key * ra_key: add case to modify root key/val * ra: add missing NULL checking * ra: add append APIs * tests: internal: record_accessor: add test for append APIs Signed-off-by: Takahiro Yamashita <[email protected]>
1 parent 785d275 commit c62b165

File tree

5 files changed

+722
-8
lines changed

5 files changed

+722
-8
lines changed

include/fluent-bit/flb_ra_key.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ int flb_ra_key_strcmp(flb_sds_t ckey, msgpack_object map,
6565
int flb_ra_key_regex_match(flb_sds_t ckey, msgpack_object map,
6666
struct mk_list *subkeys, struct flb_regex *regex,
6767
struct flb_regex_search *result);
68+
int flb_ra_key_value_append(struct flb_ra_parser *rp, msgpack_object obj,
69+
msgpack_object *in_val, msgpack_packer *mp_pck);
6870
int flb_ra_key_value_update(struct flb_ra_parser *rp, msgpack_object obj,
6971
msgpack_object *in_key, msgpack_object *in_val,
7072
msgpack_packer *mp_pck);

include/fluent-bit/flb_record_accessor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ int flb_ra_get_kv_pair(struct flb_record_accessor *ra, msgpack_object map,
5151

5252
struct flb_ra_value *flb_ra_get_value_object(struct flb_record_accessor *ra,
5353
msgpack_object map);
54+
int flb_ra_append_kv_pair(struct flb_record_accessor *ra, msgpack_object map,
55+
void **out_map, size_t *out_size, msgpack_object *in_val);
5456
int flb_ra_update_kv_pair(struct flb_record_accessor *ra, msgpack_object map,
5557
void **out_map, size_t *out_size,
5658
msgpack_object *in_key, msgpack_object *in_val);

src/flb_ra_key.c

Lines changed: 233 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -405,14 +405,14 @@ static int update_subkey_array(msgpack_object *obj, struct mk_list *subkeys,
405405

406406
/* check the current msgpack object is an array */
407407
if (obj->type != MSGPACK_OBJECT_ARRAY) {
408-
flb_error("%s: object is not map", __FUNCTION__);
408+
flb_error("%s: object is not array", __FUNCTION__);
409409
return -1;
410410
}
411411
size = obj->via.array.size;
412412
/* Index limit and ensure no overflow */
413413
if (entry->array_id == INT_MAX ||
414414
size < entry->array_id + 1) {
415-
flb_error("%s: out of index", __FUNCTION__);
415+
flb_trace("%s: out of index", __FUNCTION__);
416416
return -1;
417417
}
418418

@@ -431,7 +431,7 @@ static int update_subkey_array(msgpack_object *obj, struct mk_list *subkeys,
431431
}
432432

433433
if (subkeys->next == NULL) {
434-
flb_error("%s: end of subkey", __FUNCTION__);
434+
flb_trace("%s: end of subkey", __FUNCTION__);
435435
return -1;
436436
}
437437
ret = update_subkey(&obj->via.array.ptr[i], subkeys->next,
@@ -459,14 +459,14 @@ static int update_subkey_map(msgpack_object *obj, struct mk_list *subkeys,
459459
entry = mk_list_entry_first(subkeys, struct flb_ra_subentry, _head);
460460
/* check the current msgpack object is a map */
461461
if (obj->type != MSGPACK_OBJECT_MAP) {
462-
flb_error("%s: object is not map", __FUNCTION__);
462+
flb_trace("%s: object is not map", __FUNCTION__);
463463
return -1;
464464
}
465465
size = obj->via.map.size;
466466

467467
ret_id = ra_key_val_id(entry->str, *obj);
468468
if (ret_id < 0) {
469-
flb_error("%s: not found", __FUNCTION__);
469+
flb_trace("%s: not found", __FUNCTION__);
470470
return -1;
471471
}
472472

@@ -494,7 +494,7 @@ static int update_subkey_map(msgpack_object *obj, struct mk_list *subkeys,
494494
continue;
495495
}
496496
if (subkeys->next == NULL) {
497-
flb_error("%s: end of subkey", __FUNCTION__);
497+
flb_trace("%s: end of subkey", __FUNCTION__);
498498
return -1;
499499
}
500500
msgpack_pack_object(mp_pck, obj->via.map.ptr[i].key);
@@ -547,6 +547,32 @@ int flb_ra_key_value_update(struct flb_ra_parser *rp, msgpack_object map,
547547
map_size = map.via.map.size;
548548

549549
msgpack_pack_map(mp_pck, map_size);
550+
if (levels == 0) {
551+
/* no subkeys */
552+
for (i=0; i<map_size; i++) {
553+
if (i != kv_id) {
554+
/* pack original key/val */
555+
msgpack_pack_object(mp_pck, map.via.map.ptr[i].key);
556+
msgpack_pack_object(mp_pck, map.via.map.ptr[i].val);
557+
continue;
558+
}
559+
560+
/* update key/val */
561+
if (in_key != NULL) {
562+
msgpack_pack_object(mp_pck, *in_key);
563+
}
564+
else {
565+
msgpack_pack_object(mp_pck, map.via.map.ptr[i].key);
566+
}
567+
if (in_val != NULL) {
568+
msgpack_pack_object(mp_pck, *in_val);
569+
}
570+
else {
571+
msgpack_pack_object(mp_pck, map.via.map.ptr[i].val);
572+
}
573+
}
574+
return 0;
575+
}
550576

551577
for (i=0; i<map_size; i++) {
552578
msgpack_pack_object(mp_pck, map.via.map.ptr[i].key);
@@ -565,6 +591,207 @@ int flb_ra_key_value_update(struct flb_ra_parser *rp, msgpack_object map,
565591
return 0;
566592
}
567593

594+
static int append_subkey(msgpack_object *obj, struct mk_list *subkeys,
595+
int levels, int *matched,
596+
msgpack_object *in_val,
597+
msgpack_packer *mp_pck);
598+
599+
600+
static int append_subkey_array(msgpack_object *obj, struct mk_list *subkeys,
601+
int levels, int *matched,
602+
msgpack_object *in_val,
603+
msgpack_packer *mp_pck)
604+
{
605+
struct flb_ra_subentry *entry;
606+
int i;
607+
int ret;
608+
int size;
609+
610+
/* check the current msgpack object is an array */
611+
if (obj->type != MSGPACK_OBJECT_ARRAY) {
612+
flb_trace("%s: object is not array", __FUNCTION__);
613+
return -1;
614+
}
615+
size = obj->via.array.size;
616+
entry = mk_list_entry_first(subkeys, struct flb_ra_subentry, _head);
617+
618+
if (levels == *matched) {
619+
/* append val */
620+
msgpack_pack_array(mp_pck, size+1);
621+
for (i=0; i<size; i++) {
622+
msgpack_pack_object(mp_pck, obj->via.array.ptr[i]);
623+
}
624+
msgpack_pack_object(mp_pck, *in_val);
625+
626+
*matched = -1;
627+
return 0;
628+
}
629+
630+
/* Index limit and ensure no overflow */
631+
if (entry->array_id == INT_MAX ||
632+
size < entry->array_id + 1) {
633+
flb_trace("%s: out of index", __FUNCTION__);
634+
return -1;
635+
}
636+
637+
msgpack_pack_array(mp_pck, size);
638+
for (i=0; i<size; i++) {
639+
if (i != entry->array_id) {
640+
msgpack_pack_object(mp_pck, obj->via.array.ptr[i]);
641+
continue;
642+
}
643+
if (*matched >= 0) {
644+
*matched += 1;
645+
}
646+
if (subkeys->next == NULL) {
647+
flb_trace("%s: end of subkey", __FUNCTION__);
648+
return -1;
649+
}
650+
ret = append_subkey(&obj->via.array.ptr[i], subkeys->next,
651+
levels, matched,
652+
in_val, mp_pck);
653+
if (ret < 0) {
654+
return -1;
655+
}
656+
}
657+
return 0;
658+
}
659+
660+
static int append_subkey_map(msgpack_object *obj, struct mk_list *subkeys,
661+
int levels, int *matched,
662+
msgpack_object *in_val,
663+
msgpack_packer *mp_pck)
664+
{
665+
struct flb_ra_subentry *entry;
666+
int i;
667+
int ret_id;
668+
int size;
669+
int ret;
670+
671+
/* check the current msgpack object is a map */
672+
if (obj->type != MSGPACK_OBJECT_MAP) {
673+
flb_trace("%s: object is not map", __FUNCTION__);
674+
return -1;
675+
}
676+
size = obj->via.map.size;
677+
entry = mk_list_entry_first(subkeys, struct flb_ra_subentry, _head);
678+
679+
if (levels == *matched) {
680+
/* append val */
681+
msgpack_pack_map(mp_pck, size+1);
682+
for (i=0; i<size; i++) {
683+
msgpack_pack_object(mp_pck, obj->via.map.ptr[i].key);
684+
msgpack_pack_object(mp_pck, obj->via.map.ptr[i].val);
685+
}
686+
msgpack_pack_str(mp_pck, flb_sds_len(entry->str));
687+
msgpack_pack_str_body(mp_pck, entry->str, flb_sds_len(entry->str));
688+
msgpack_pack_object(mp_pck, *in_val);
689+
690+
*matched = -1;
691+
return 0;
692+
}
693+
694+
695+
ret_id = ra_key_val_id(entry->str, *obj);
696+
if (ret_id < 0) {
697+
flb_trace("%s: not found", __FUNCTION__);
698+
return -1;
699+
}
700+
701+
msgpack_pack_map(mp_pck, size);
702+
for (i=0; i<size; i++) {
703+
if (i != ret_id) {
704+
msgpack_pack_object(mp_pck, obj->via.map.ptr[i].key);
705+
msgpack_pack_object(mp_pck, obj->via.map.ptr[i].val);
706+
continue;
707+
}
708+
709+
if (*matched >= 0) {
710+
*matched += 1;
711+
}
712+
if (subkeys->next == NULL) {
713+
flb_trace("%s: end of subkey", __FUNCTION__);
714+
return -1;
715+
}
716+
msgpack_pack_object(mp_pck, obj->via.map.ptr[i].key);
717+
ret = append_subkey(&(obj->via.map.ptr[i].val), subkeys->next,
718+
levels, matched,
719+
in_val, mp_pck);
720+
if (ret < 0) {
721+
return -1;
722+
}
723+
}
724+
return 0;
725+
}
726+
727+
static int append_subkey(msgpack_object *obj, struct mk_list *subkeys,
728+
int levels, int *matched,
729+
msgpack_object *in_val,
730+
msgpack_packer *mp_pck)
731+
{
732+
struct flb_ra_subentry *entry;
733+
734+
entry = mk_list_entry_first(subkeys, struct flb_ra_subentry, _head);
735+
736+
if (entry->type == FLB_RA_PARSER_ARRAY_ID) {
737+
return append_subkey_array(obj, subkeys,
738+
levels, matched,
739+
in_val, mp_pck);
740+
}
741+
return append_subkey_map(obj, subkeys, levels, matched, in_val, mp_pck);
742+
}
743+
744+
int flb_ra_key_value_append(struct flb_ra_parser *rp, msgpack_object map,
745+
msgpack_object *in_val, msgpack_packer *mp_pck)
746+
{
747+
int ref_level;
748+
int map_size;
749+
int i;
750+
int kv_id;
751+
int ret;
752+
int matched = 0;
753+
754+
map_size = map.via.map.size;
755+
756+
/* Decrement since the last key doesn't exist */
757+
ref_level = mk_list_size(rp->key->subkeys) - 1;
758+
if (ref_level < 0) {
759+
/* no subkeys */
760+
msgpack_pack_map(mp_pck, map_size+1);
761+
for (i=0; i<map_size; i++) {
762+
msgpack_pack_object(mp_pck, map.via.map.ptr[i].key);
763+
msgpack_pack_object(mp_pck, map.via.map.ptr[i].val);
764+
}
765+
msgpack_pack_str(mp_pck, flb_sds_len(rp->key->name));
766+
msgpack_pack_str_body(mp_pck, rp->key->name, flb_sds_len(rp->key->name));
767+
msgpack_pack_object(mp_pck, *in_val);
768+
return 0;
769+
}
770+
771+
/* Get the key position in the map */
772+
kv_id = ra_key_val_id(rp->key->name, map);
773+
if (kv_id == -1) {
774+
return -1;
775+
}
776+
777+
msgpack_pack_map(mp_pck, map_size);
778+
for (i=0; i<map_size; i++) {
779+
msgpack_pack_object(mp_pck, map.via.map.ptr[i].key);
780+
if (i != kv_id) {
781+
msgpack_pack_object(mp_pck, map.via.map.ptr[i].val);
782+
continue;
783+
}
784+
ret = append_subkey(&(map.via.map.ptr[i].val), rp->key->subkeys,
785+
ref_level, &matched,
786+
in_val, mp_pck);
787+
if (ret < 0) {
788+
return -1;
789+
}
790+
}
791+
792+
return 0;
793+
}
794+
568795
void flb_ra_key_value_destroy(struct flb_ra_value *v)
569796
{
570797
if (v->type == FLB_RA_STRING) {

src/flb_record_accessor.c

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ int flb_ra_update_kv_pair(struct flb_record_accessor *ra, msgpack_object map,
679679
flb_error("%s: no inputs", __FUNCTION__);
680680
return -1;
681681
}
682-
else if (ra == NULL || out_map == NULL) {
682+
else if (ra == NULL || out_map == NULL || out_size == NULL) {
683683
/* invalid input */
684684
flb_error("%s: invalid input", __FUNCTION__);
685685
return -1;
@@ -708,3 +708,66 @@ int flb_ra_update_kv_pair(struct flb_record_accessor *ra, msgpack_object map,
708708

709709
return 0;
710710
}
711+
712+
/**
713+
* Add key and/or value of the map using record accessor.
714+
* If key already exists, the API fails.
715+
*
716+
* @param ra the record accessor to specify key.
717+
* @param map the original map.
718+
* @param in_val the pointer to add val.
719+
* @param out_map the updated map. If the API fails, out_map will be NULL.
720+
*
721+
* @return result of the API. 0:success, -1:fail
722+
*/
723+
724+
int flb_ra_append_kv_pair(struct flb_record_accessor *ra, msgpack_object map,
725+
void **out_map, size_t *out_size,
726+
msgpack_object *in_val)
727+
{
728+
struct flb_ra_parser *rp;
729+
msgpack_packer mp_pck;
730+
msgpack_sbuffer mp_sbuf;
731+
int ret;
732+
733+
msgpack_object *s_key = NULL;
734+
msgpack_object *o_key = NULL;
735+
msgpack_object *o_val = NULL;
736+
737+
if (in_val == NULL) {
738+
/* no key and value. nothing to do */
739+
flb_error("%s: no value", __FUNCTION__);
740+
return -1;
741+
}
742+
else if (ra == NULL || out_map == NULL|| out_size == NULL) {
743+
/* invalid input */
744+
flb_error("%s: invalid input", __FUNCTION__);
745+
return -1;
746+
}
747+
748+
flb_ra_get_kv_pair(ra, map, &s_key, &o_key, &o_val);
749+
if (o_key != NULL && o_val != NULL) {
750+
/* key and value already exist */
751+
flb_error("%s: already exist", __FUNCTION__);
752+
return -1;
753+
}
754+
755+
rp = get_ra_parser(ra);
756+
if (rp == NULL || rp->key == NULL) {
757+
return -1;
758+
}
759+
760+
msgpack_sbuffer_init(&mp_sbuf);
761+
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
762+
763+
ret = flb_ra_key_value_append(rp, map, in_val, &mp_pck);
764+
if (ret < 0) {
765+
msgpack_sbuffer_destroy(&mp_sbuf);
766+
return -1;
767+
}
768+
769+
*out_map = mp_sbuf.data;
770+
*out_size = mp_sbuf.size;
771+
772+
return 0;
773+
}

0 commit comments

Comments
 (0)