3131#include <fluent-bit/flb_plugin.h>
3232#include <fluent-bit/flb_notification.h>
3333#include <fluent-bit/flb_scheduler.h>
34+ #include <fluent-bit/flb_record_accessor.h>
35+ #include <fluent-bit/flb_ra_key.h>
3436
3537#include <msgpack.h>
3638
@@ -53,6 +55,113 @@ struct worker_info {
5355
5456FLB_TLS_DEFINE (struct worker_info , worker_info );
5557
58+ static flb_sds_t cb_azb_msgpack_extract_log_key (void * out_context , const char * data ,
59+ uint64_t bytes )
60+ {
61+ struct flb_azure_blob * ctx = out_context ;
62+ flb_sds_t out_buf = NULL ;
63+ msgpack_unpacked result ;
64+ msgpack_object root ;
65+ msgpack_object map ;
66+ struct flb_record_accessor * ra = NULL ;
67+ struct flb_ra_value * rval = NULL ;
68+ size_t off = 0 ;
69+
70+ ra = flb_ra_create (ctx -> log_key , FLB_FALSE );
71+ if (!ra ) {
72+ flb_plg_error (ctx -> ins , "invalid record accessor pattern '%s'" , ctx -> log_key );
73+ flb_errno ();
74+ return NULL ;
75+ }
76+
77+ /* Unpack the data */
78+ msgpack_unpacked_init (& result );
79+ while (1 ) {
80+ msgpack_unpack_return ret = msgpack_unpack_next (& result , data , bytes , & off );
81+ if (ret == MSGPACK_UNPACK_SUCCESS ) {
82+ root = result .data ;
83+ if (root .type != MSGPACK_OBJECT_ARRAY ) {
84+ continue ;
85+ }
86+
87+ if (root .via .array .size < 2 ) {
88+ flb_plg_debug (ctx -> ins , "msgpack array has insufficient elements" );
89+ continue ;
90+ }
91+
92+ map = root .via .array .ptr [1 ];
93+
94+ /* Get value using record accessor */
95+ rval = flb_ra_get_value_object (ra , map );
96+ if (!rval ) {
97+ flb_plg_error (ctx -> ins , "could not find field '%s'" , ctx -> log_key );
98+ continue ;
99+ }
100+
101+ /* Convert value based on its type */
102+ if (rval -> type == FLB_RA_STRING ) {
103+ out_buf = flb_sds_create_size (rval -> o .via .str .size + 1 );
104+ if (out_buf ) {
105+ flb_sds_copy (out_buf , rval -> o .via .str .ptr , rval -> o .via .str .size );
106+ flb_sds_cat (out_buf , "\n" , 1 );
107+ }
108+ }
109+ else if (rval -> type == FLB_RA_FLOAT ) {
110+ out_buf = flb_sds_create_size (64 );
111+ if (out_buf ) {
112+ flb_sds_printf (& out_buf , "%f\n" , rval -> val .f64 );
113+ }
114+ }
115+ else if (rval -> type == FLB_RA_INT ) {
116+ out_buf = flb_sds_create_size (64 );
117+ if (out_buf ) {
118+ flb_sds_printf (& out_buf , "%" PRId64 "\n" , rval -> val .i64 );
119+ }
120+ }
121+ else {
122+ flb_errno ();
123+ flb_plg_error (ctx -> ins , "cannot convert given value for field '%s'" , ctx -> log_key );
124+ flb_ra_key_value_destroy (rval );
125+ rval = NULL ;
126+ break ;
127+ }
128+
129+ /* Check if buffer allocation succeeded */
130+ if (!out_buf ) {
131+ flb_errno ();
132+ flb_plg_error (ctx -> ins , "could not allocate output buffer" );
133+ }
134+
135+ flb_ra_key_value_destroy (rval );
136+ rval = NULL ;
137+
138+ /* Successfully found and processed log_key, exit loop */
139+ break ;
140+ }
141+ else if (ret == MSGPACK_UNPACK_CONTINUE ) {
142+ /* Buffer exhausted or truncated data, stop processing */
143+ flb_plg_debug (ctx -> ins , "msgpack unpack needs more data or data truncated" );
144+ break ;
145+ }
146+ else if (ret == MSGPACK_UNPACK_PARSE_ERROR ) {
147+ flb_errno ();
148+ flb_plg_error (ctx -> ins , "msgpack parse error" );
149+ break ;
150+ }
151+ else {
152+ flb_errno ();
153+ flb_plg_error (ctx -> ins , "unexpected msgpack unpack return code %d" , ret );
154+ break ;
155+ }
156+ }
157+
158+ /* Clean up */
159+ msgpack_unpacked_destroy (& result );
160+ flb_ra_destroy (ra );
161+
162+ return out_buf ;
163+ }
164+
56165static int azure_blob_format (struct flb_config * config ,
57166 struct flb_input_instance * ins ,
58167 void * plugin_context ,
@@ -65,10 +174,15 @@ static int azure_blob_format(struct flb_config *config,
65174 flb_sds_t out_buf ;
66175 struct flb_azure_blob * ctx = plugin_context ;
67176
68- out_buf = flb_pack_msgpack_to_json_format (data , bytes ,
69- FLB_PACK_JSON_FORMAT_LINES ,
70- FLB_PACK_JSON_DATE_ISO8601 ,
71- ctx -> date_key );
177+ if (ctx -> log_key ) {
178+ out_buf = cb_azb_msgpack_extract_log_key (ctx , data , bytes );
179+ }
180+ else {
181+ out_buf = flb_pack_msgpack_to_json_format (data , bytes ,
182+ FLB_PACK_JSON_FORMAT_LINES ,
183+ FLB_PACK_JSON_DATE_ISO8601 ,
184+ ctx -> date_key );
185+ }
72186 if (!out_buf ) {
73187 return -1 ;
74188 }
@@ -712,7 +826,7 @@ static int ensure_container(struct flb_azure_blob *ctx)
712826 ctx -> container_name );
713827 return FLB_FALSE ;
714828 }
715-
829+
716830 flb_plg_error (ctx -> ins , "get container request failed, status=%i" ,
717831 status );
718832
@@ -1779,6 +1893,14 @@ static struct flb_config_map config_map[] = {
17791893 "Set the block type: appendblob or blockblob"
17801894 },
17811895
1896+ {
1897+ FLB_CONFIG_MAP_STR , "log_key" , NULL ,
1898+ 0 , FLB_TRUE , offsetof(struct flb_azure_blob , log_key ),
1899+ "By default, the whole log record will be sent to blob storage. "
1900+ "If you specify a key name with this option, then only the value of "
1901+ "that key will be sent"
1902+ },
1903+
17821904 {
17831905 FLB_CONFIG_MAP_STR , "compress" , NULL ,
17841906 0 , FLB_FALSE , 0 ,
@@ -1938,7 +2060,7 @@ static struct flb_config_map config_map[] = {
19382060 "Whether to delete the buffered file early after successful blob creation. Default is false"
19392061 },
19402062
1941- {
2063+ {
19422064 FLB_CONFIG_MAP_INT , "blob_uri_length" , "64" ,
19432065 0 , FLB_TRUE , offsetof(struct flb_azure_blob , blob_uri_length ),
19442066 "Set the length of generated blob uri before ingesting to Azure Kusto. Default is 64"
0 commit comments