@@ -43,6 +43,7 @@ use databend_common_grpc::GrpcConnectionError;
4343use databend_common_grpc:: RpcClientConf ;
4444use databend_common_grpc:: RpcClientTlsConfig ;
4545use databend_common_meta_api:: reply:: reply_to_api_result;
46+ use databend_common_meta_kvapi:: kvapi:: ListKVReq ;
4647use databend_common_meta_types:: anyerror:: AnyError ;
4748use databend_common_meta_types:: protobuf as pb;
4849use databend_common_meta_types:: protobuf:: meta_service_client:: MetaServiceClient ;
@@ -54,6 +55,7 @@ use databend_common_meta_types::protobuf::HandshakeRequest;
5455use databend_common_meta_types:: protobuf:: MemberListReply ;
5556use databend_common_meta_types:: protobuf:: MemberListRequest ;
5657use databend_common_meta_types:: protobuf:: RaftRequest ;
58+ use databend_common_meta_types:: protobuf:: StreamItem ;
5759use databend_common_meta_types:: protobuf:: WatchRequest ;
5860use databend_common_meta_types:: protobuf:: WatchResponse ;
5961use databend_common_meta_types:: ConnectionError ;
@@ -91,6 +93,7 @@ use tonic::Request;
9193use tonic:: Status ;
9294
9395use crate :: endpoints:: Endpoints ;
96+ use crate :: errors:: CreationError ;
9497use crate :: established_client:: EstablishedClient ;
9598use crate :: from_digit_ver;
9699use crate :: grpc_action:: RequestFor ;
@@ -101,6 +104,7 @@ use crate::to_digit_ver;
101104use crate :: ClientWorkerRequest ;
102105use crate :: MetaGrpcReadReq ;
103106use crate :: MetaGrpcReq ;
107+ use crate :: Streamed ;
104108use crate :: METACLI_COMMIT_SEMVER ;
105109use crate :: MIN_METASRV_SEMVER ;
106110
@@ -294,6 +298,16 @@ impl Drop for ClientHandle {
294298}
295299
296300impl ClientHandle {
301+ pub async fn list ( & self , prefix : & str ) -> Result < BoxStream < StreamItem > , MetaError > {
302+ let strm = self
303+ . request ( Streamed ( ListKVReq {
304+ prefix : prefix. to_string ( ) ,
305+ } ) )
306+ . await ?;
307+
308+ Ok ( strm)
309+ }
310+
297311 /// Send a request to the internal worker task, which will be running in another runtime.
298312 #[ fastrace:: trace]
299313 #[ async_backtrace:: framed]
@@ -305,7 +319,10 @@ impl ClientHandle {
305319 <Result < Req :: Reply , E > as TryFrom < Response > >:: Error : std:: fmt:: Display ,
306320 E : From < MetaClientError > + Debug ,
307321 {
308- let rx = self . send_request_to_worker ( req) ?;
322+ let rx = self
323+ . send_request_to_worker ( req)
324+ . map_err ( MetaClientError :: from) ?;
325+
309326 UnlimitedFuture :: create ( async move {
310327 let _g = grpc_metrics:: client_request_inflight. counter_guard ( ) ;
311328 rx. await
@@ -326,7 +343,10 @@ impl ClientHandle {
326343 {
327344 let _g = grpc_metrics:: client_request_inflight. counter_guard ( ) ;
328345
329- let rx = self . send_request_to_worker ( req) ?;
346+ let rx = self
347+ . send_request_to_worker ( req)
348+ . map_err ( MetaClientError :: from) ?;
349+
330350 let recv_res = rx. blocking_recv ( ) ;
331351 Self :: parse_worker_result ( recv_res)
332352 }
@@ -335,7 +355,7 @@ impl ClientHandle {
335355 fn send_request_to_worker < Req > (
336356 & self ,
337357 req : Req ,
338- ) -> Result < oneshot:: Receiver < Response > , MetaClientError >
358+ ) -> Result < oneshot:: Receiver < Response > , ConnectionError >
339359 where
340360 Req : Into < message:: Request > ,
341361 {
@@ -365,7 +385,7 @@ impl ClientHandle {
365385 ) ) ;
366386
367387 error ! ( "{}" , err) ;
368- MetaClientError :: ClientRuntimeError ( err)
388+ ConnectionError :: new ( err, "Meta ClientHandle failed to send request to worker" )
369389 } ) ?;
370390
371391 Ok ( rx)
@@ -381,13 +401,14 @@ impl ClientHandle {
381401 E : From < MetaClientError > + Debug ,
382402 {
383403 let response = res. map_err ( |e| {
404+ let err = AnyError :: new ( & e) . add_context ( || "when recv resp from MetaGrpcClient worker" ) ;
384405 error ! (
385406 error : ? =( & e) ;
386407 "Meta ClientHandle recv response from meta client worker failed"
387408 ) ;
388- MetaClientError :: ClientRuntimeError (
389- AnyError :: new ( & e ) . add_context ( || "when recv resp from MetaGrpcClient worker") ,
390- )
409+ let conn_err =
410+ ConnectionError :: new ( err , "Meta ClientHandle failed to receive from worker") ;
411+ MetaClientError :: from ( conn_err )
391412 } ) ?;
392413
393414 let res: Result < Reply , E > = response
@@ -462,7 +483,7 @@ impl MetaGrpcClient {
462483 ///
463484 /// The worker is a singleton and the returned handle is cheap to clone.
464485 /// When all handles are dropped the worker will quit, then the runtime will be destroyed.
465- pub fn try_new ( conf : & RpcClientConf ) -> Result < Arc < ClientHandle > , MetaClientError > {
486+ pub fn try_new ( conf : & RpcClientConf ) -> Result < Arc < ClientHandle > , CreationError > {
466487 Self :: try_create (
467488 conf. get_endpoints ( ) ,
468489 & conf. username ,
@@ -481,7 +502,7 @@ impl MetaGrpcClient {
481502 timeout : Option < Duration > ,
482503 auto_sync_interval : Option < Duration > ,
483504 tls_config : Option < RpcClientTlsConfig > ,
484- ) -> Result < Arc < ClientHandle > , MetaClientError > {
505+ ) -> Result < Arc < ClientHandle > , CreationError > {
485506 Self :: endpoints_non_empty ( & endpoints_str) ?;
486507
487508 let endpoints = Arc :: new ( Mutex :: new ( Endpoints :: new ( endpoints_str. clone ( ) ) ) ) ;
@@ -494,9 +515,7 @@ impl MetaGrpcClient {
494515 Some ( format ! ( "meta-client-rt-{}" , endpoints_str. join( "," ) ) ) ,
495516 )
496517 . map_err ( |e| {
497- MetaClientError :: ClientRuntimeError (
498- AnyError :: new ( & e) . add_context ( || "when creating meta-client" ) ,
499- )
518+ CreationError :: new_runtime_error ( e. to_string ( ) ) . context ( "when creating meta-client" )
500519 } ) ?;
501520 let rt = Arc :: new ( rt) ;
502521
@@ -768,11 +787,9 @@ impl MetaGrpcClient {
768787 ) )
769788 }
770789
771- pub fn endpoints_non_empty ( endpoints : & [ String ] ) -> Result < ( ) , MetaClientError > {
790+ pub fn endpoints_non_empty ( endpoints : & [ String ] ) -> Result < ( ) , CreationError > {
772791 if endpoints. is_empty ( ) {
773- return Err ( MetaClientError :: ConfigError ( AnyError :: error (
774- "endpoints is empty" ,
775- ) ) ) ;
792+ return Err ( CreationError :: new_config_error ( "endpoints is empty" ) ) ;
776793 }
777794 Ok ( ( ) )
778795 }
@@ -784,8 +801,8 @@ impl MetaGrpcClient {
784801
785802 #[ fastrace:: trace]
786803 #[ async_backtrace:: framed]
787- pub async fn set_endpoints ( & self , endpoints : Vec < String > ) -> Result < ( ) , MetaError > {
788- Self :: endpoints_non_empty ( & endpoints) ? ;
804+ pub fn set_endpoints ( & self , endpoints : Vec < String > ) {
805+ debug_assert ! ( ! endpoints. is_empty ( ) ) ;
789806
790807 // Older meta nodes may not store endpoint information and need to be filtered out.
791808 let distinct_cnt = endpoints. iter ( ) . filter ( |n| !( * n) . is_empty ( ) ) . count ( ) ;
@@ -798,12 +815,11 @@ impl MetaGrpcClient {
798815 endpoints. len( ) ,
799816 endpoints
800817 ) ;
801- return Ok ( ( ) ) ;
818+ return ;
802819 }
803820
804821 let mut eps = self . endpoints . lock ( ) ;
805822 eps. replace_nodes ( endpoints) ;
806- Ok ( ( ) )
807823 }
808824
809825 #[ fastrace:: trace]
@@ -833,7 +849,12 @@ impl MetaGrpcClient {
833849 let result: Vec < String > = endpoints?. data ;
834850 debug ! ( "received meta endpoints: {:?}" , result) ;
835851
836- self . set_endpoints ( result) . await ?;
852+ if result. is_empty ( ) {
853+ error ! ( "Can not update local endpoints, the returned result is empty" ) ;
854+ } else {
855+ self . set_endpoints ( result) ;
856+ }
857+
837858 Ok ( ( ) )
838859 }
839860
0 commit comments