2020#include <fluent-bit/flb_output_plugin.h>
2121#include <fluent-bit/flb_kv.h>
2222#include <fluent-bit/flb_pack.h>
23+ #include <fluent-bit/flb_mp.h>
2324#include <fluent-bit/flb_log_event_decoder.h>
2425#include <fluent-bit/flb_log_event_encoder.h>
2526
2627#include "vivo.h"
2728#include "vivo_http.h"
2829#include "vivo_stream.h"
2930
30- static flb_sds_t format_logs (struct flb_event_chunk * event_chunk , struct flb_config * config )
31+ static flb_sds_t format_logs (struct flb_input_instance * src_ins ,
32+ struct flb_event_chunk * event_chunk , struct flb_config * config )
3133{
32- struct flb_log_event_decoder log_decoder ;
33- struct flb_log_event log_event ;
34+ int len ;
3435 int result ;
35- int i ;
36+ char * name ;
3637 flb_sds_t out_js ;
3738 flb_sds_t out_buf = NULL ;
3839 msgpack_sbuffer tmp_sbuf ;
3940 msgpack_packer tmp_pck ;
41+ struct flb_log_event log_event ;
42+ struct flb_log_event_decoder log_decoder ;
43+ struct flb_mp_map_header mh ;
4044
4145 result = flb_log_event_decoder_init (& log_decoder ,
4246 (char * ) event_chunk -> data ,
@@ -56,90 +60,118 @@ static flb_sds_t format_logs(struct flb_event_chunk *event_chunk, struct flb_con
5660 msgpack_sbuffer_init (& tmp_sbuf );
5761 msgpack_packer_init (& tmp_pck , & tmp_sbuf , msgpack_sbuffer_write );
5862
63+ /*
64+ * Here is an example of the packaging done for Logs
65+ *
66+ * {
67+ * "source_type": "forward",
68+ * "source_name": "forward.0",
69+ * "tag": "dummy.0",
70+ * "records": [
71+ * {
72+ * "timestamp": 1759591426808913765,
73+ * "metadata": {
74+ * "level": "info"
75+ * },
76+ * "message": "dummy"
77+ * },
78+ * {
79+ * "timestamp": 1759591426908563348,
80+ * "metadata": {
81+ * "level": "debug",
82+ * "service": "auth"
83+ * },
84+ * "message": "dummy"
85+ * }
86+ * ]
87+ * }
88+ */
89+
90+ msgpack_pack_map (& tmp_pck , 4 );
91+
92+ /* source_type: internal type of the plugin */
93+ name = src_ins -> p -> name ;
94+ len = strlen (name );
95+
96+ msgpack_pack_str (& tmp_pck , 11 );
97+ msgpack_pack_str_body (& tmp_pck , "source_type" , 11 );
98+ msgpack_pack_str (& tmp_pck , len );
99+ msgpack_pack_str_body (& tmp_pck , name , len );
100+
101+ /* source_name: internal name or alias set by the user */
102+ name = (char * ) flb_input_name (src_ins );
103+ len = strlen (name );
104+ msgpack_pack_str (& tmp_pck , 11 );
105+ msgpack_pack_str_body (& tmp_pck , "source_name" , 11 );
106+ msgpack_pack_str (& tmp_pck , len );
107+ msgpack_pack_str_body (& tmp_pck , name , len );
108+
109+ /* tag */
110+ msgpack_pack_str (& tmp_pck , 3 );
111+ msgpack_pack_str_body (& tmp_pck , "tag" , 3 );
112+ msgpack_pack_str (& tmp_pck , flb_sds_len (event_chunk -> tag ));
113+ msgpack_pack_str_body (& tmp_pck , event_chunk -> tag , flb_sds_len (event_chunk -> tag ));
114+
115+ /* records */
116+ msgpack_pack_str (& tmp_pck , 7 );
117+ msgpack_pack_str_body (& tmp_pck , "records" , 7 );
118+
119+ flb_mp_array_header_init (& mh , & tmp_pck );
120+
59121 while ((result = flb_log_event_decoder_next (
60122 & log_decoder ,
61123 & log_event )) == FLB_EVENT_DECODER_SUCCESS ) {
124+
125+ flb_mp_array_header_append (& mh );
126+
62127 /*
63- * If the caller specified FLB_PACK_JSON_DATE_FLUENT, we format the data
64- * by using the following structure:
65- *
66- * [[TIMESTAMP, {"_tag": "...", ...MORE_METADATA}], {RECORD CONTENT}]
128+ * [[TIMESTAMP, {"....": "...", ...MORE_METADATA}], {RECORD CONTENT}]
67129 */
68130 msgpack_pack_array (& tmp_pck , 2 );
69131 msgpack_pack_array (& tmp_pck , 2 );
70132 msgpack_pack_uint64 (& tmp_pck , flb_time_to_nanosec (& log_event .timestamp ));
71133
72- /* add tag only */
73- msgpack_pack_map (& tmp_pck , 1 + log_event .metadata -> via .map .size );
74-
75- msgpack_pack_str (& tmp_pck , 4 );
76- msgpack_pack_str_body (& tmp_pck , "_tag" , 4 );
77-
78- msgpack_pack_str (& tmp_pck , flb_sds_len (event_chunk -> tag ));
79- msgpack_pack_str_body (& tmp_pck , event_chunk -> tag , flb_sds_len (event_chunk -> tag ));
80-
81- /* Append remaining keys/values */
82- for (i = 0 ;
83- i < log_event .metadata -> via .map .size ;
84- i ++ ) {
85- msgpack_pack_object (& tmp_pck ,
86- log_event .metadata -> via .map .ptr [i ].key );
87- msgpack_pack_object (& tmp_pck ,
88- log_event .metadata -> via .map .ptr [i ].val );
89- }
134+ /* pack metadata */
135+ msgpack_pack_object (& tmp_pck , * log_event .metadata );
90136
91137 /* pack the remaining content */
92- msgpack_pack_map (& tmp_pck , log_event .body -> via .map .size );
93-
94- /* Append remaining keys/values */
95- for (i = 0 ;
96- i < log_event .body -> via .map .size ;
97- i ++ ) {
98- msgpack_pack_object (& tmp_pck ,
99- log_event .body -> via .map .ptr [i ].key );
100- msgpack_pack_object (& tmp_pck ,
101- log_event .body -> via .map .ptr [i ].val );
102- }
103-
104- /* Concatenate by using break lines */
105- out_js = flb_msgpack_raw_to_json_sds (tmp_sbuf .data , tmp_sbuf .size ,
106- config -> json_escape_unicode );
107- if (!out_js ) {
108- flb_sds_destroy (out_buf );
109- msgpack_sbuffer_destroy (& tmp_sbuf );
110- flb_log_event_decoder_destroy (& log_decoder );
111- return NULL ;
112- }
113-
114- /*
115- * One map record has been converted, now append it to the
116- * outgoing out_buf sds variable.
117- */
118- flb_sds_cat_safe (& out_buf , out_js , flb_sds_len (out_js ));
119- flb_sds_cat_safe (& out_buf , "\n" , 1 );
120-
121- flb_sds_destroy (out_js );
122- msgpack_sbuffer_clear (& tmp_sbuf );
138+ msgpack_pack_object (& tmp_pck , * log_event .body );
123139 }
124140
141+ flb_mp_array_header_end (& mh );
142+
125143 /* Release the unpacker */
126144 flb_log_event_decoder_destroy (& log_decoder );
127145
146+ /* Convert the complete msgpack structure to JSON */
147+ out_js = flb_msgpack_raw_to_json_sds (tmp_sbuf .data , tmp_sbuf .size ,
148+ config -> json_escape_unicode );
149+
128150 msgpack_sbuffer_destroy (& tmp_sbuf );
129151
130- return out_buf ;
152+ /* append a newline */
153+ flb_sds_cat_safe (& out_js , "\n" , 1 );
154+
155+ if (!out_js ) {
156+ flb_sds_destroy (out_buf );
157+ return NULL ;
158+ }
159+
160+ /* Replace out_buf with the complete JSON */
161+ flb_sds_destroy (out_buf );
162+ return out_js ;
131163}
132164
133165static int logs_event_chunk_append (struct vivo_exporter * ctx ,
166+ struct flb_input_instance * src_ins ,
134167 struct flb_event_chunk * event_chunk ,
135168 struct flb_config * config )
136169{
137170 size_t len ;
138171 flb_sds_t json ;
139172 struct vivo_stream_entry * entry ;
140173
141-
142- json = format_logs (event_chunk , config );
174+ json = format_logs (src_ins , event_chunk , config );
143175 if (!json ) {
144176 flb_plg_error (ctx -> ins , "cannot convert logs chunk to JSON" );
145177 return -1 ;
@@ -207,6 +239,7 @@ static int cb_vivo_init(struct flb_output_instance *ins,
207239 return -1 ;
208240 }
209241 ctx -> ins = ins ;
242+ ctx -> config = config ;
210243
211244 ret = flb_output_config_map_set (ins , (void * ) ctx );
212245 if (ret == -1 ) {
@@ -272,7 +305,7 @@ static void cb_vivo_flush(struct flb_event_chunk *event_chunk,
272305 }
273306#endif
274307 if (event_chunk -> type == FLB_EVENT_TYPE_LOGS ) {
275- ret = logs_event_chunk_append (ctx , event_chunk , config );
308+ ret = logs_event_chunk_append (ctx , ins , event_chunk , config );
276309 }
277310 else if (event_chunk -> type == FLB_EVENT_TYPE_TRACES ) {
278311 ret = metrics_traces_event_chunk_append (ctx , ctx -> stream_traces , event_chunk , config );
0 commit comments