@@ -10,7 +10,7 @@ use std::sync::{Arc, Mutex, Weak};
1010use std:: task:: { Context , Poll , Waker } ;
1111
1212use dlopen2:: raw:: Library ;
13- use taos_query:: common:: { c_field_t, raw_data_t, RawData , SmlData } ;
13+ use taos_query:: common:: { c_field_t, raw_data_t, JsonMeta , RawData , SmlData } ;
1414use taos_query:: prelude:: { Code , Field , Precision , RawError } ;
1515use taos_query:: tmq:: Assignment ;
1616use taos_query:: RawBlock ;
@@ -1763,13 +1763,25 @@ impl RawRes {
17631763 }
17641764 }
17651765 #[ inline]
1766- pub ( crate ) fn tmq_get_json_meta ( & self ) -> String {
1766+ pub ( crate ) fn tmq_get_json_meta ( & self ) -> Result < JsonMeta , RawError > {
17671767 unsafe {
17681768 let meta = ( self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_get_json_meta ) ( self . as_ptr ( ) ) ;
1769- let meta_cstr = CStr :: from_ptr ( meta) . to_string_lossy ( ) . into_owned ( ) ;
1770- ( self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_free_json_meta ) ( meta) ;
1771- tracing:: debug!( json = meta_cstr, "Received TMQ json meta" ) ;
1772- meta_cstr
1769+ if meta. is_null ( ) {
1770+ return Err ( RawError :: from_string ( "tmq_get_json_meta returns null" ) ) ;
1771+ }
1772+
1773+ let meta_cstr = CStr :: from_ptr ( meta) ;
1774+ match serde_json:: from_slice ( meta_cstr. to_bytes ( ) ) {
1775+ Ok ( json_meta) => {
1776+ tracing:: trace!( json = %meta_cstr. to_string_lossy( ) , "Received TMQ json meta" ) ;
1777+ ( self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_free_json_meta ) ( meta) ;
1778+ Ok ( json_meta)
1779+ }
1780+ Err ( err) => {
1781+ ( self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_free_json_meta ) ( meta) ;
1782+ Err ( RawError :: from_string ( err. to_string ( ) ) )
1783+ }
1784+ }
17731785 }
17741786 }
17751787
0 commit comments