@@ -100,18 +100,22 @@ static flb_sds_t cb_azb_msgpack_extract_log_key(void *out_context, const char *d
100100
101101 /* Convert value based on its type */
102102 if (rval -> type == FLB_RA_STRING ) {
103- out_buf = flb_sds_create_len (rval -> o .via .str .ptr , rval -> o .via .str .size );
103+ out_buf = flb_sds_create_size (rval -> o .via .str .size + 1 );
104+ if (out_buf ) {
105+ flb_sds_copy (out_buf , rval -> o .via .str .ptr , rval -> o .via .str .size );
106+ flb_sds_cat (out_buf , "\n" , 1 );
107+ }
104108 }
105109 else if (rval -> type == FLB_RA_FLOAT ) {
106110 out_buf = flb_sds_create_size (64 );
107111 if (out_buf ) {
108- flb_sds_printf (& out_buf , "%f" , rval -> val .f64 );
112+ flb_sds_printf (& out_buf , "%f\n " , rval -> val .f64 );
109113 }
110114 }
111115 else if (rval -> type == FLB_RA_INT ) {
112116 out_buf = flb_sds_create_size (64 );
113117 if (out_buf ) {
114- flb_sds_printf (& out_buf , "%" PRId64 , rval -> val .i64 );
118+ flb_sds_printf (& out_buf , "%" PRId64 "\n" , rval -> val .i64 );
115119 }
116120 }
117121 else {
@@ -131,11 +135,13 @@ static flb_sds_t cb_azb_msgpack_extract_log_key(void *out_context, const char *d
131135 flb_ra_key_value_destroy (rval );
132136 rval = NULL ;
133137
138+ /* Successfully found and processed log_key, exit loop */
134139 break ;
135140 }
136141 else if (ret == MSGPACK_UNPACK_CONTINUE ) {
137- /* Continue unpacking */
138- continue ;
142+ /* Buffer exhausted or truncated data, stop processing */
143+ flb_plg_debug (ctx -> ins , "msgpack unpack needs more data or data truncated" );
144+ break ;
139145 }
140146 else if (ret == MSGPACK_UNPACK_PARSE_ERROR ) {
141147 flb_errno ();
@@ -144,7 +150,7 @@ static flb_sds_t cb_azb_msgpack_extract_log_key(void *out_context, const char *d
144150 }
145151 else {
146152 flb_errno ();
147- flb_plg_error (ctx -> ins , "unexpected msgpack unpack return code" );
153+ flb_plg_error (ctx -> ins , "unexpected msgpack unpack return code %d" , ret );
148154 break ;
149155 }
150156 }
0 commit comments