@@ -298,6 +298,13 @@ static void flb_loki_kv_exit(struct flb_loki *ctx)
298298 mk_list_foreach_safe (head , tmp , & ctx -> structured_metadata_list ) {
299299 kv = mk_list_entry (head , struct flb_loki_kv , _head );
300300
301+ /* unlink and destroy */
302+ mk_list_del (& kv -> _head );
303+ flb_loki_kv_destroy (kv );
304+ }
305+ mk_list_foreach_safe (head , tmp , & ctx -> structured_metadata_map_keys_list ) {
306+ kv = mk_list_entry (head , struct flb_loki_kv , _head );
307+
301308 /* unlink and destroy */
302309 mk_list_del (& kv -> _head );
303310 flb_loki_kv_destroy (kv );
@@ -416,6 +423,93 @@ static void pack_kv(struct flb_loki *ctx,
416423 }
417424}
418425
426+ /*
427+ * Similar to pack_kv above, except will only use msgpack_objects of type
428+ * MSGPACK_OBJECT_MAP, and will iterate over the keys adding each entry as a
429+ * separate item. Non-string map values are serialised to JSON, as Loki requires
430+ * all values to be strings.
431+ */
432+ static void pack_maps (struct flb_loki * ctx ,
433+ msgpack_packer * mp_pck ,
434+ char * tag , int tag_len ,
435+ msgpack_object * map ,
436+ struct flb_mp_map_header * mh ,
437+ struct mk_list * list )
438+ {
439+ struct mk_list * head ;
440+ struct flb_loki_kv * kv ;
441+
442+ msgpack_object * start_key ;
443+ msgpack_object * out_key ;
444+ msgpack_object * out_val ;
445+
446+ msgpack_object_map accessed_map ;
447+ uint32_t accessed_map_index ;
448+ msgpack_object_kv accessed_map_kv ;
449+
450+ char * accessed_map_val_json ;
451+
452+ mk_list_foreach (head , list ) {
453+ /* get the flb_loki_kv for this iteration of the loop */
454+ kv = mk_list_entry (head , struct flb_loki_kv , _head );
455+
456+ /* record accessor key/value pair */
457+ if (kv -> ra_key != NULL && kv -> ra_val == NULL ) {
458+
459+ /* try to get the value for the record accessor */
460+ if (flb_ra_get_kv_pair (kv -> ra_key , * map , & start_key , & out_key , & out_val )
461+ != -1 ) {
462+
463+ /*
464+ * we require the value to be a map, or it doesn't make sense as
465+ * this is adding a map's key / values
466+ */
467+ if (out_val -> type != MSGPACK_OBJECT_MAP || out_val -> via .map .size <= 0 ) {
468+ flb_plg_debug (ctx -> ins , "No valid map data found for key %s" ,
469+ kv -> ra_key -> pattern );
470+ }
471+ else {
472+ accessed_map = out_val -> via .map ;
473+
474+ /* for each entry in the accessed map... */
475+ for (accessed_map_index = 0 ; accessed_map_index < accessed_map .size ;
476+ accessed_map_index ++ ) {
477+
478+ /* get the entry */
479+ accessed_map_kv = accessed_map .ptr [accessed_map_index ];
480+
481+ /* Pack the key and value */
482+ flb_mp_map_header_append (mh );
483+
484+ pack_label_key (mp_pck , (char * ) accessed_map_kv .key .via .str .ptr ,
485+ accessed_map_kv .key .via .str .size );
486+
487+ /* If the value is a string, just pack it... */
488+ if (accessed_map_kv .val .type == MSGPACK_OBJECT_STR ) {
489+ msgpack_pack_str_with_body (mp_pck ,
490+ accessed_map_kv .val .via .str .ptr ,
491+ accessed_map_kv .val .via .str .size );
492+ }
493+ /*
494+ * ...otherwise convert value to JSON string, as Loki always
495+ * requires a string value
496+ */
497+ else {
498+ accessed_map_val_json = flb_msgpack_to_json_str (1024 ,
499+ & accessed_map_kv .val );
500+ if (accessed_map_val_json ) {
501+ msgpack_pack_str_with_body (mp_pck , accessed_map_val_json ,
502+ strlen (accessed_map_val_json ));
503+ flb_free (accessed_map_val_json );
504+ }
505+ }
506+ }
507+ }
508+ }
509+ }
510+ }
511+ }
512+
419513static flb_sds_t pack_structured_metadata (struct flb_loki * ctx ,
420514 msgpack_packer * mp_pck ,
421515 char * tag , int tag_len ,
@@ -424,7 +518,17 @@ static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
424518 struct flb_mp_map_header mh ;
425519 /* Initialize dynamic map header */
426520 flb_mp_map_header_init (& mh , mp_pck );
427- pack_kv (ctx , mp_pck , tag , tag_len , map , & mh , & ctx -> structured_metadata_list );
521+ if (ctx -> structured_metadata_map_keys ) {
522+ pack_maps (ctx , mp_pck , tag , tag_len , map , & mh ,
523+ & ctx -> structured_metadata_map_keys_list );
524+ }
525+ /*
526+ * explicit structured_metadata entries override
527+ * structured_metadata_map_keys entries
528+ * */
529+ if (ctx -> structured_metadata ) {
530+ pack_kv (ctx , mp_pck , tag , tag_len , map , & mh , & ctx -> structured_metadata_list );
531+ }
428532 flb_mp_map_header_end (& mh );
429533 return 0 ;
430534}
@@ -788,6 +892,7 @@ static int parse_labels(struct flb_loki *ctx)
788892
789893 flb_loki_kv_init (& ctx -> labels_list );
790894 flb_loki_kv_init (& ctx -> structured_metadata_list );
895+ flb_loki_kv_init (& ctx -> structured_metadata_map_keys_list );
791896
792897 if (ctx -> structured_metadata ) {
793898 ret = parse_kv (ctx , ctx -> structured_metadata , & ctx -> structured_metadata_list , & ra_used );
@@ -796,6 +901,28 @@ static int parse_labels(struct flb_loki *ctx)
796901 }
797902 }
798903
904+ /* Append structured metadata map keys set in the configuration */
905+ if (ctx -> structured_metadata_map_keys ) {
906+ mk_list_foreach (head , ctx -> structured_metadata_map_keys ) {
907+ entry = mk_list_entry (head , struct flb_slist_entry , _head );
908+ if (entry -> str [0 ] != '$' ) {
909+ flb_plg_error (ctx -> ins ,
910+ "invalid structured metadata map key, the name must start "
911+ "with '$'" );
912+ return -1 ;
913+ }
914+
915+ ret = flb_loki_kv_append (ctx , & ctx -> structured_metadata_map_keys_list ,
916+ entry -> str , NULL );
917+ if (ret == -1 ) {
918+ return -1 ;
919+ }
920+ else if (ret > 0 ) {
921+ ra_used ++ ;
922+ }
923+ }
924+ }
925+
799926 if (ctx -> labels ) {
800927 ret = parse_kv (ctx , ctx -> labels , & ctx -> labels_list , & ra_used );
801928 if (ret == -1 ) {
@@ -971,6 +1098,7 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins,
9711098 ctx -> ins = ins ;
9721099 flb_loki_kv_init (& ctx -> labels_list );
9731100 flb_loki_kv_init (& ctx -> structured_metadata_list );
1101+ flb_loki_kv_init (& ctx -> structured_metadata_map_keys_list );
9741102
9751103 /* Register context with plugin instance */
9761104 flb_output_set_context (ins , ctx );
@@ -1539,12 +1667,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
15391667 while ((ret = flb_log_event_decoder_next (
15401668 & log_decoder ,
15411669 & log_event )) == FLB_EVENT_DECODER_SUCCESS ) {
1542- msgpack_pack_array (& mp_pck , ctx -> structured_metadata ? 3 : 2 );
1670+ msgpack_pack_array (& mp_pck , ctx -> structured_metadata ||
1671+ ctx -> structured_metadata_map_keys ? 3 : 2 );
15431672
15441673 /* Append the timestamp */
15451674 pack_timestamp (& mp_pck , & log_event .timestamp );
15461675 pack_record (ctx , & mp_pck , log_event .body , dynamic_tenant_id );
1547- if (ctx -> structured_metadata ) {
1676+ if (ctx -> structured_metadata || ctx -> structured_metadata_map_keys ) {
15481677 pack_structured_metadata (ctx , & mp_pck , tag , tag_len , NULL );
15491678 }
15501679 }
@@ -1575,12 +1704,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
15751704 msgpack_pack_str_body (& mp_pck , "values" , 6 );
15761705 msgpack_pack_array (& mp_pck , 1 );
15771706
1578- msgpack_pack_array (& mp_pck , ctx -> structured_metadata ? 3 : 2 );
1707+ msgpack_pack_array (& mp_pck , ctx -> structured_metadata ||
1708+ ctx -> structured_metadata_map_keys ? 3 : 2 );
15791709
15801710 /* Append the timestamp */
15811711 pack_timestamp (& mp_pck , & log_event .timestamp );
15821712 pack_record (ctx , & mp_pck , log_event .body , dynamic_tenant_id );
1583- if (ctx -> structured_metadata ) {
1713+ if (ctx -> structured_metadata || ctx -> structured_metadata_map_keys ) {
15841714 pack_structured_metadata (ctx , & mp_pck , tag , tag_len , log_event .body );
15851715 }
15861716 }
@@ -1905,6 +2035,13 @@ static struct flb_config_map config_map[] = {
19052035 0 , FLB_TRUE , offsetof(struct flb_loki , structured_metadata ),
19062036 "optional structured metadata fields for API requests."
19072037 },
2038+
2039+ {
2040+ FLB_CONFIG_MAP_CLIST , "structured_metadata_map_keys" , NULL ,
2041+ 0 , FLB_TRUE , offsetof(struct flb_loki , structured_metadata_map_keys ),
2042+ "optional structured metadata fields, as derived dynamically from configured maps "
2043+ "keys, for API requests."
2044+ },
19082045
19092046 {
19102047 FLB_CONFIG_MAP_BOOL , "auto_kubernetes_labels" , "false" ,
0 commit comments