3131#include <fluent-bit/flb_plugin.h>
3232#include <fluent-bit/flb_notification.h>
3333#include <fluent-bit/flb_scheduler.h>
34+ #include <fluent-bit/flb_record_accessor.h>
35+ #include <fluent-bit/flb_ra_key.h>
3436
3537#include <msgpack.h>
3638
@@ -53,6 +55,113 @@ struct worker_info {
5355
5456FLB_TLS_DEFINE (struct worker_info , worker_info );
5557
58+ static flb_sds_t cb_azb_msgpack_extract_log_key (void * out_context , const char * data ,
59+ uint64_t bytes )
60+ {
61+ struct flb_azure_blob * ctx = out_context ;
62+ flb_sds_t out_buf = NULL ;
63+ msgpack_unpacked result ;
64+ msgpack_object root ;
65+ msgpack_object map ;
66+ struct flb_record_accessor * ra = NULL ;
67+ struct flb_ra_value * rval = NULL ;
68+ size_t off = 0 ;
69+
70+ ra = flb_ra_create (ctx -> log_key , FLB_FALSE );
71+ if (!ra ) {
72+ flb_errno ();
73+ flb_plg_error (ctx -> ins , "invalid record accessor pattern '%s'" , ctx -> log_key );
74+ return NULL ;
75+ }
76+
77+ /* Unpack the data */
78+ msgpack_unpacked_init (& result );
79+ while (1 ) {
80+ msgpack_unpack_return ret = msgpack_unpack_next (& result , data , bytes , & off );
81+ if (ret == MSGPACK_UNPACK_SUCCESS ) {
82+ root = result .data ;
83+ if (root .type != MSGPACK_OBJECT_ARRAY ) {
84+ continue ;
85+ }
86+
87+ if (root .via .array .size < 2 ) {
88+ flb_plg_debug (ctx -> ins , "msgpack array has insufficient elements" );
89+ continue ;
90+ }
91+
92+ map = root .via .array .ptr [1 ];
93+
94+ /* Get value using record accessor */
95+ rval = flb_ra_get_value_object (ra , map );
96+ if (!rval ) {
97+ flb_plg_error (ctx -> ins , "could not find field '%s'" , ctx -> log_key );
98+ continue ;
99+ }
100+
101+ /* Convert value based on its type */
102+ if (rval -> type == FLB_RA_STRING ) {
103+ out_buf = flb_sds_create_size (rval -> o .via .str .size + 1 );
104+ if (out_buf ) {
105+ flb_sds_copy (out_buf , rval -> o .via .str .ptr , rval -> o .via .str .size );
106+ flb_sds_cat (out_buf , "\n" , 1 );
107+ }
108+ }
109+ else if (rval -> type == FLB_RA_FLOAT ) {
110+ out_buf = flb_sds_create_size (64 );
111+ if (out_buf ) {
112+ flb_sds_printf (& out_buf , "%f\n" , rval -> val .f64 );
113+ }
114+ }
115+ else if (rval -> type == FLB_RA_INT ) {
116+ out_buf = flb_sds_create_size (64 );
117+ if (out_buf ) {
118+ flb_sds_printf (& out_buf , "%" PRId64 "\n" , rval -> val .i64 );
119+ }
120+ }
121+ else {
122+ flb_errno ();
123+ flb_plg_error (ctx -> ins , "cannot convert given value for field '%s'" , ctx -> log_key );
124+ flb_ra_key_value_destroy (rval );
125+ rval = NULL ;
126+ break ;
127+ }
128+
129+ /* Check if buffer allocation succeeded */
130+ if (!out_buf ) {
131+ flb_errno ();
132+ flb_plg_error (ctx -> ins , "could not allocate output buffer" );
133+ }
134+
135+ flb_ra_key_value_destroy (rval );
136+ rval = NULL ;
137+
138+ /* Successfully found and processed log_key, exit loop */
139+ break ;
140+ }
141+ else if (ret == MSGPACK_UNPACK_CONTINUE ) {
142+ /* Buffer exhausted or truncated data, stop processing */
143+ flb_plg_debug (ctx -> ins , "msgpack unpack needs more data or data truncated" );
144+ break ;
145+ }
146+ else if (ret == MSGPACK_UNPACK_PARSE_ERROR ) {
147+ flb_errno ();
148+ flb_plg_error (ctx -> ins , "msgpack parse error" );
149+ break ;
150+ }
151+ else {
152+ flb_errno ();
153+ flb_plg_error (ctx -> ins , "unexpected msgpack unpack return code %d" , ret );
154+ break ;
155+ }
156+ }
157+
158+ /* Clean up */
159+ msgpack_unpacked_destroy (& result );
160+ flb_ra_destroy (ra );
161+
162+ return out_buf ;
163+ }
164+
56165static int azure_blob_format (struct flb_config * config ,
57166 struct flb_input_instance * ins ,
58167 void * plugin_context ,
@@ -65,11 +174,16 @@ static int azure_blob_format(struct flb_config *config,
65174 flb_sds_t out_buf ;
66175 struct flb_azure_blob * ctx = plugin_context ;
67176
68- out_buf = flb_pack_msgpack_to_json_format (data , bytes ,
69- FLB_PACK_JSON_FORMAT_LINES ,
70- FLB_PACK_JSON_DATE_ISO8601 ,
71- ctx -> date_key ,
72- config -> json_escape_unicode );
177+ if (ctx -> log_key ) {
178+ out_buf = cb_azb_msgpack_extract_log_key (ctx , data , bytes );
179+ }
180+ else {
181+ out_buf = flb_pack_msgpack_to_json_format (data , bytes ,
182+ FLB_PACK_JSON_FORMAT_LINES ,
183+ FLB_PACK_JSON_DATE_ISO8601 ,
184+ ctx -> date_key ,
185+ config -> json_escape_unicode );
186+ }
73187 if (!out_buf ) {
74188 return -1 ;
75189 }
@@ -713,7 +827,7 @@ static int ensure_container(struct flb_azure_blob *ctx)
713827 ctx -> container_name );
714828 return FLB_FALSE ;
715829 }
716-
830+
717831 flb_plg_error (ctx -> ins , "get container request failed, status=%i" ,
718832 status );
719833
@@ -1780,6 +1894,14 @@ static struct flb_config_map config_map[] = {
17801894 "Set the block type: appendblob or blockblob"
17811895 },
17821896
1897+ {
1898+ FLB_CONFIG_MAP_STR , "log_key" , NULL ,
1899+ 0 , FLB_TRUE , offsetof(struct flb_azure_blob , log_key ),
1900+ "By default, the whole log record will be sent to blob storage. "
1901+ "If you specify a key name with this option, then only the value of "
1902+ "that key will be sent"
1903+ },
1904+
17831905 {
17841906 FLB_CONFIG_MAP_STR , "compress" , NULL ,
17851907 0 , FLB_FALSE , 0 ,
@@ -1939,7 +2061,7 @@ static struct flb_config_map config_map[] = {
19392061 "Whether to delete the buffered file early after successful blob creation. Default is false"
19402062 },
19412063
1942- {
2064+ {
19432065 FLB_CONFIG_MAP_INT , "blob_uri_length" , "64" ,
19442066 0 , FLB_TRUE , offsetof(struct flb_azure_blob , blob_uri_length ),
19452067 "Set the length of generated blob uri before ingesting to Azure Kusto. Default is 64"
0 commit comments