30
30
#include <fluent-bit/flb_log_event_decoder.h>
31
31
#include <fluent-bit/flb_plugin.h>
32
32
#include <fluent-bit/flb_notification.h>
33
+ #include <fluent-bit/flb_record_accessor.h>
34
+ #include <fluent-bit/flb_ra_key.h>
33
35
34
36
#include <msgpack.h>
35
37
@@ -54,153 +56,100 @@ FLB_TLS_DEFINE(struct worker_info, worker_info);
54
56
static flb_sds_t cb_azb_msgpack_extract_log_key (void * out_context , const char * data ,
55
57
uint64_t bytes )
56
58
{
57
- int i ;
58
- int records = 0 ;
59
- int map_size ;
60
- int check = FLB_FALSE ;
61
- int found = FLB_FALSE ;
62
- int log_key_missing = 0 ;
63
- int ret ;
64
- int alloc_error = 0 ;
65
59
struct flb_azure_blob * ctx = out_context ;
66
- char * val_buf ;
67
- char * key_str = NULL ;
68
- size_t key_str_size = 0 ;
69
- size_t msgpack_size = bytes + bytes / 4 ;
70
- size_t val_offset = 0 ;
71
- flb_sds_t out_buf ;
60
+ flb_sds_t out_buf = NULL ;
61
+ msgpack_unpacked result ;
62
+ msgpack_object root ;
72
63
msgpack_object map ;
73
- msgpack_object key ;
74
- msgpack_object val ;
75
- struct flb_log_event_decoder log_decoder ;
76
- struct flb_log_event log_event ;
64
+ struct flb_record_accessor * ra = NULL ;
65
+ struct flb_ra_value * rval = NULL ;
66
+ size_t off = 0 ;
77
67
78
- /* Iterate the original buffer and perform adjustments */
79
- records = flb_mp_count (data , bytes );
80
- if (records <= 0 ) {
81
- return NULL ;
82
- }
83
-
84
- /* Allocate buffer to store log_key contents */
85
- val_buf = flb_calloc (1 , msgpack_size );
86
- if (val_buf == NULL ) {
87
- flb_plg_error (ctx -> ins , "Could not allocate enough "
88
- "memory to read record" );
68
+ ra = flb_ra_create (ctx -> log_key , FLB_FALSE );
69
+ if (!ra ) {
70
+ flb_plg_error (ctx -> ins , "invalid record accessor pattern '%s'" , ctx -> log_key );
89
71
flb_errno ();
90
72
return NULL ;
91
73
}
92
74
93
- ret = flb_log_event_decoder_init (& log_decoder , (char * ) data , bytes );
94
-
95
- if (ret != FLB_EVENT_DECODER_SUCCESS ) {
96
- flb_plg_error (ctx -> ins ,
97
- "Log event decoder initialization error : %d" , ret );
98
-
99
- flb_free (val_buf );
100
-
101
- return NULL ;
102
- }
103
-
104
-
105
- while (!alloc_error &&
106
- (ret = flb_log_event_decoder_next (
107
- & log_decoder ,
108
- & log_event )) == FLB_EVENT_DECODER_SUCCESS ) {
109
-
110
- /* Get the record/map */
111
- map = * log_event .body ;
112
-
113
- if (map .type != MSGPACK_OBJECT_MAP ) {
114
- continue ;
115
- }
75
+ /* Unpack the data */
76
+ msgpack_unpacked_init (& result );
77
+ while (1 ) {
78
+ msgpack_unpack_return ret = msgpack_unpack_next (& result , data , bytes , & off );
79
+ if (ret == MSGPACK_UNPACK_SUCCESS ) {
80
+ root = result .data ;
81
+ if (root .type != MSGPACK_OBJECT_ARRAY ) {
82
+ continue ;
83
+ }
116
84
117
- map_size = map .via .map .size ;
85
+ if (root .via .array .size < 2 ) {
86
+ flb_plg_debug (ctx -> ins , "msgpack array has insufficient elements" );
87
+ continue ;
88
+ }
118
89
119
- /* Reset variables for found log_key and correct type */
120
- found = FLB_FALSE ;
121
- check = FLB_FALSE ;
90
+ map = root .via .array .ptr [1 ];
122
91
123
- /* Extract log_key from record and append to output buffer */
124
- for (i = 0 ; i < map_size ; i ++ ) {
125
- key = map .via .map .ptr [i ].key ;
126
- val = map .via .map .ptr [i ].val ;
92
+ /* Get value using record accessor */
93
+ rval = flb_ra_get_value_object (ra , map );
94
+ if (!rval ) {
95
+ flb_plg_error (ctx -> ins , "could not find field '%s'" , ctx -> log_key );
96
+ continue ;
97
+ }
127
98
128
- if (key .type == MSGPACK_OBJECT_BIN ) {
129
- key_str = (char * ) key .via .bin .ptr ;
130
- key_str_size = key .via .bin .size ;
131
- check = FLB_TRUE ;
99
+ /* Convert value based on its type */
100
+ if (rval -> type == FLB_RA_STRING ) {
101
+ out_buf = flb_sds_create_len (rval -> o .via .str .ptr , rval -> o .via .str .size );
132
102
}
133
- if (key .type == MSGPACK_OBJECT_STR ) {
134
- key_str = (char * ) key .via .str .ptr ;
135
- key_str_size = key .via .str .size ;
136
- check = FLB_TRUE ;
103
+ else if (rval -> type == FLB_RA_FLOAT ) {
104
+ out_buf = flb_sds_create_size (64 );
105
+ if (out_buf ) {
106
+ flb_sds_printf (& out_buf , "%f" , rval -> val .f64 );
107
+ }
137
108
}
138
-
139
- if (check == FLB_TRUE ) {
140
- if (strncmp (ctx -> log_key , key_str , key_str_size ) == 0 ) {
141
- found = FLB_TRUE ;
142
-
143
- /*
144
- * Copy contents of value into buffer. Necessary to copy
145
- * strings because flb_msgpack_to_json does not handle nested
146
- * JSON gracefully and double escapes them.
147
- */
148
- if (val .type == MSGPACK_OBJECT_BIN ) {
149
- memcpy (val_buf + val_offset , val .via .bin .ptr , val .via .bin .size );
150
- val_offset += val .via .bin .size ;
151
- val_buf [val_offset ] = '\n' ;
152
- val_offset ++ ;
153
- }
154
- else if (val .type == MSGPACK_OBJECT_STR ) {
155
- memcpy (val_buf + val_offset , val .via .str .ptr , val .via .str .size );
156
- val_offset += val .via .str .size ;
157
- val_buf [val_offset ] = '\n' ;
158
- val_offset ++ ;
159
- }
160
- else {
161
- ret = flb_msgpack_to_json (val_buf + val_offset ,
162
- msgpack_size - val_offset , & val );
163
- if (ret < 0 ) {
164
- break ;
165
- }
166
- val_offset += ret ;
167
- val_buf [val_offset ] = '\n' ;
168
- val_offset ++ ;
169
- }
170
- /* Exit early once log_key has been found for current record */
171
- break ;
109
+ else if (rval -> type == FLB_RA_INT ) {
110
+ out_buf = flb_sds_create_size (64 );
111
+ if (out_buf ) {
112
+ flb_sds_printf (& out_buf , "%" PRId64 , rval -> val .i64 );
172
113
}
173
114
}
174
- }
175
-
176
- /* If log_key was not found in the current record, mark log key as missing */
177
- if ( found == FLB_FALSE ) {
178
- log_key_missing ++ ;
179
- }
180
- }
115
+ else {
116
+ flb_errno ();
117
+ flb_plg_error ( ctx -> ins , "cannot convert given value for field '%s'" , ctx -> log_key );
118
+ flb_ra_key_value_destroy ( rval );
119
+ rval = NULL ;
120
+ break ;
121
+ }
181
122
182
- /* Throw error once per chunk if at least one log key was not found */
183
- if ( log_key_missing > 0 ) {
184
- flb_plg_error ( ctx -> ins , "Could not find log_key '%s' in %d records" ,
185
- ctx -> log_key , log_key_missing );
186
- }
123
+ /* Check if buffer allocation succeeded */
124
+ if (! out_buf ) {
125
+ flb_errno ();
126
+ flb_plg_error ( ctx -> ins , "could not allocate output buffer" );
127
+ }
187
128
188
- flb_log_event_decoder_destroy (& log_decoder );
129
+ flb_ra_key_value_destroy (rval );
130
+ rval = NULL ;
189
131
190
- /* If nothing was read, destroy buffer */
191
- if (val_offset == 0 ) {
192
- flb_free (val_buf );
193
- return NULL ;
132
+ break ;
133
+ }
134
+ else if (ret == MSGPACK_UNPACK_CONTINUE ) {
135
+ /* Continue unpacking */
136
+ continue ;
137
+ }
138
+ else if (ret == MSGPACK_UNPACK_PARSE_ERROR ) {
139
+ flb_errno ();
140
+ flb_plg_error (ctx -> ins , "msgpack parse error" );
141
+ break ;
142
+ }
143
+ else {
144
+ flb_errno ();
145
+ flb_plg_error (ctx -> ins , "unexpected msgpack unpack return code" );
146
+ break ;
147
+ }
194
148
}
195
- val_buf [val_offset ] = '\0' ;
196
149
197
- /* Create output buffer to store contents */
198
- out_buf = flb_sds_create (val_buf );
199
- if (out_buf == NULL ) {
200
- flb_plg_error (ctx -> ins , "Error creating buffer to store log_key contents." );
201
- flb_errno ();
202
- }
203
- flb_free (val_buf );
150
+ /* Clean up */
151
+ msgpack_unpacked_destroy (& result );
152
+ flb_ra_destroy (ra );
204
153
205
154
return out_buf ;
206
155
}
0 commit comments