2525#include <fluent-bit/flb_oauth2.h>
2626#include <fluent-bit/flb_regex.h>
2727#include <fluent-bit/flb_pthread.h>
28+ #include <fluent-bit/flb_kv.h>
2829
2930#include <msgpack.h>
3031
@@ -911,12 +912,12 @@ static int process_local_resource_id(struct flb_stackdriver *ctx,
911912}
912913
913914/*
914- * parse_labels
915+ * get_payload_labels
915916 * - Iterate throught the original payload (obj) and find out the entry that matches
916917 * the labels_key
917918 * - Used to convert all labels under labels_key to root-level `labels` field
918919 */
919- static msgpack_object * parse_labels (struct flb_stackdriver * ctx , msgpack_object * obj )
920+ static msgpack_object * get_payload_labels (struct flb_stackdriver * ctx , msgpack_object * obj )
920921{
921922 int i ;
922923 int len ;
@@ -940,6 +941,50 @@ static msgpack_object *parse_labels(struct flb_stackdriver *ctx, msgpack_object
940941 return NULL ;
941942}
942943
944+ static void pack_labels (struct flb_stackdriver * ctx ,
945+ msgpack_packer * mp_pck ,
946+ msgpack_object * payload_labels_ptr )
947+ {
948+ int i ;
949+ int ret ;
950+ int labels_size = 0 ;
951+ char * val ;
952+ struct mk_list * head ;
953+ struct flb_kv * list_kv ;
954+ msgpack_object_kv * obj_kv = NULL ;
955+
956+ /* Determine size of labels map */
957+ labels_size = mk_list_size (& ctx -> config_labels );
958+ if (payload_labels_ptr != NULL &&
959+ payload_labels_ptr -> type == MSGPACK_OBJECT_MAP ) {
960+ labels_size += payload_labels_ptr -> via .map .size ;
961+ }
962+
963+ msgpack_pack_map (mp_pck , labels_size );
964+
965+ /* pack labels from the payload */
966+ if (payload_labels_ptr != NULL &&
967+ payload_labels_ptr -> type == MSGPACK_OBJECT_MAP ) {
968+
969+ for (i = 0 ; i < payload_labels_ptr -> via .map .size ; i ++ ) {
970+ obj_kv = & payload_labels_ptr -> via .map .ptr [i ];
971+ msgpack_pack_object (mp_pck , obj_kv -> key );
972+ msgpack_pack_object (mp_pck , obj_kv -> val );
973+ }
974+ }
975+
976+ /* pack labels set in configuration */
977+ /* in msgpack duplicate keys are overriden by the last set */
978+ /* static label keys override payload labels */
979+ mk_list_foreach (head , & ctx -> config_labels ){
980+ list_kv = mk_list_entry (head , struct flb_kv , _head );
981+ msgpack_pack_str (mp_pck , flb_sds_len (list_kv -> key ));
982+ msgpack_pack_str_body (mp_pck , list_kv -> key , flb_sds_len (list_kv -> key ));
983+ msgpack_pack_str (mp_pck , flb_sds_len (list_kv -> val ));
984+ msgpack_pack_str_body (mp_pck , list_kv -> val , flb_sds_len (list_kv -> val ));
985+ }
986+ }
987+
943988static void cb_results (const char * name , const char * value ,
944989 size_t vlen , void * data )
945990{
@@ -1496,6 +1541,10 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
14961541 /* Count number of records */
14971542 array_size = total_records ;
14981543
1544+ /* Parameters for labels */
1545+ msgpack_object * payload_labels_ptr ;
1546+ int labels_size = 0 ;
1547+
14991548 /*
15001549 * Search each entry and validate insertId.
15011550 * Reject the entry if insertId is invalid.
@@ -1975,17 +2024,26 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
19752024 entry_size += 1 ;
19762025 }
19772026
1978- /* Extract labels */
1979- labels_ptr = parse_labels (ctx , obj );
1980- if (labels_ptr != NULL ) {
1981- if (labels_ptr -> type != MSGPACK_OBJECT_MAP ) {
1982- flb_plg_error (ctx -> ins , "the type of labels should be map" );
1983- flb_sds_destroy (operation_id );
1984- flb_sds_destroy (operation_producer );
1985- msgpack_unpacked_destroy (& result );
1986- msgpack_sbuffer_destroy (& mp_sbuf );
1987- return NULL ;
1988- }
2027+ /* Extract payload labels */
2028+ payload_labels_ptr = get_payload_labels (ctx , obj );
2029+ if (payload_labels_ptr != NULL &&
2030+ payload_labels_ptr -> type != MSGPACK_OBJECT_MAP ) {
2031+ flb_plg_error (ctx -> ins , "the type of payload labels should be map" );
2032+ flb_sds_destroy (operation_id );
2033+ flb_sds_destroy (operation_producer );
2034+ msgpack_unpacked_destroy (& result );
2035+ msgpack_sbuffer_destroy (& mp_sbuf );
2036+ return -1 ;
2037+ }
2038+
2039+ /* Number of parsed labels */
2040+ labels_size = mk_list_size (& ctx -> config_labels );
2041+ if (payload_labels_ptr != NULL &&
2042+ payload_labels_ptr -> type == MSGPACK_OBJECT_MAP ) {
2043+ labels_size += payload_labels_ptr -> via .map .size ;
2044+ }
2045+
2046+ if (labels_size > 0 ) {
19892047 entry_size += 1 ;
19902048 }
19912049
@@ -2043,10 +2101,10 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
20432101 }
20442102
20452103 /* labels */
2046- if (labels_ptr != NULL ) {
2104+ if (labels_size > 0 ) {
20472105 msgpack_pack_str (& mp_pck , 6 );
20482106 msgpack_pack_str_body (& mp_pck , "labels" , 6 );
2049- msgpack_pack_object ( & mp_pck , * labels_ptr );
2107+ pack_labels ( ctx , & mp_pck , payload_labels_ptr );
20502108 }
20512109
20522110 /* Clean up id and producer if operation extracted */
@@ -2453,6 +2511,11 @@ static struct flb_config_map config_map[] = {
24532511 0 , FLB_TRUE , offsetof(struct flb_stackdriver , task_id ),
24542512 "Set the resource task id"
24552513 },
2514+ {
2515+ FLB_CONFIG_MAP_CLIST , "labels" , NULL ,
2516+ 0 , FLB_TRUE , offsetof(struct flb_stackdriver , labels ),
2517+ "Set the labels"
2518+ },
24562519 {
24572520 FLB_CONFIG_MAP_STR , "labels_key" , DEFAULT_LABELS_KEY ,
24582521 0 , FLB_TRUE , offsetof(struct flb_stackdriver , labels_key ),
0 commit comments