@@ -10,7 +10,7 @@ use std::sync::{Arc, Mutex, Weak};
1010use std:: task:: { Context , Poll , Waker } ;
1111
1212use dlopen2:: raw:: Library ;
13- use taos_query:: common:: { c_field_t, raw_data_t, RawData , SmlData } ;
13+ use taos_query:: common:: { c_field_t, raw_data_t, JsonMeta , RawData , SmlData } ;
1414use taos_query:: prelude:: { Code , Field , Precision , RawError } ;
1515use taos_query:: tmq:: Assignment ;
1616use taos_query:: RawBlock ;
@@ -1763,13 +1763,25 @@ impl RawRes {
17631763 }
17641764 }
17651765 #[ inline]
1766- pub ( crate ) fn tmq_get_json_meta ( & self ) -> String {
1766+ pub ( crate ) fn tmq_get_json_meta ( & self ) -> Result < JsonMeta , RawError > {
17671767 unsafe {
17681768 let meta = ( self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_get_json_meta ) ( self . as_ptr ( ) ) ;
1769- let meta_cstr = CStr :: from_ptr ( meta) . to_string_lossy ( ) . into_owned ( ) ;
1770- ( self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_free_json_meta ) ( meta) ;
1771- tracing:: debug!( json = meta_cstr, "Received TMQ json meta" ) ;
1772- meta_cstr
1769+ if meta. is_null ( ) {
1770+ return Err ( RawError :: from_string ( "tmq_get_json_meta returns null" ) ) ;
1771+ }
1772+
1773+ let meta_cstr = CStr :: from_ptr ( meta) ;
1774+ match serde_json:: from_slice ( meta_cstr. to_bytes ( ) ) {
1775+ Ok ( json_meta) => {
1776+ tracing:: trace!( json = %meta_cstr. to_string_lossy( ) , "Received TMQ json meta" ) ;
1777+ ( self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_free_json_meta ) ( meta) ;
1778+ Ok ( json_meta)
1779+ }
1780+ Err ( err) => {
1781+ ( self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_free_json_meta ) ( meta) ;
1782+ Err ( RawError :: from_string ( err. to_string ( ) ) )
1783+ }
1784+ }
17731785 }
17741786 }
17751787
@@ -1781,16 +1793,16 @@ impl RawRes {
17811793 raw_type : 0 ,
17821794 } ;
17831795 unsafe {
1784- let _code = ( self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_get_raw ) ( self . as_ptr ( ) , & mut meta as _ ) ;
1785- if _code == 0 {
1796+ let code = ( self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_get_raw ) ( self . as_ptr ( ) , & mut meta as _ ) ;
1797+ if code == 0 {
17861798 return Ok ( RawData :: new (
17871799 meta,
17881800 self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_free_raw ,
17891801 ) ) ;
17901802 } else {
1791- let c_str = ( self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_err2str ) ( tmq_resp_err_t ( _code ) ) ;
1803+ let c_str = ( self . c . tmq . as_ref ( ) . unwrap ( ) . tmq_err2str ) ( tmq_resp_err_t ( code ) ) ;
17921804 let err = CStr :: from_ptr ( c_str) . to_string_lossy ( ) . into_owned ( ) ;
1793- Err ( RawError :: new_with_context ( _code , err, "tmq_get_raw error" ) )
1805+ Err ( RawError :: new_with_context ( code , err, "tmq_get_raw error" ) )
17941806 }
17951807 }
17961808 }
0 commit comments