Skip to content

Commit d3bc3a6

Browse files
authored
refactor: Simplify dereferencing logic in MetaStore and update async methods for better clarity (#17938)
1 parent 937894f commit d3bc3a6

File tree

6 files changed

+35
-112
lines changed

6 files changed

+35
-112
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/store/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ databend-common-meta-kvapi = { workspace = true }
2020
databend-common-meta-semaphore = { workspace = true }
2121
databend-common-meta-types = { workspace = true }
2222
databend-meta = { workspace = true }
23+
futures = { workspace = true }
2324
log = { workspace = true }
2425
tempfile = { workspace = true }
2526
tokio = { workspace = true }

src/meta/store/src/lib.rs

Lines changed: 19 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,19 @@ pub(crate) mod local;
1717
use std::ops::Deref;
1818
use std::pin::Pin;
1919
use std::sync::Arc;
20-
use std::task::Context;
21-
use std::task::Poll;
2220
use std::time::Duration;
2321

2422
use databend_common_grpc::RpcClientConf;
2523
use databend_common_meta_client::errors::CreationError;
2624
use databend_common_meta_client::ClientHandle;
2725
use databend_common_meta_client::MetaGrpcClient;
28-
use databend_common_meta_kvapi::kvapi;
29-
use databend_common_meta_kvapi::kvapi::KVStream;
30-
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
3126
use databend_common_meta_semaphore::acquirer::Permit;
3227
use databend_common_meta_semaphore::errors::AcquireError;
3328
use databend_common_meta_semaphore::Semaphore;
3429
use databend_common_meta_types::protobuf::WatchRequest;
3530
use databend_common_meta_types::protobuf::WatchResponse;
3631
use databend_common_meta_types::MetaError;
37-
use databend_common_meta_types::TxnReply;
38-
use databend_common_meta_types::TxnRequest;
39-
use databend_common_meta_types::UpsertKV;
32+
use futures::stream::TryStreamExt;
4033
pub use local::LocalMetaService;
4134
use log::info;
4235
use tokio_stream::Stream;
@@ -56,6 +49,17 @@ pub enum MetaStore {
5649
R(Arc<ClientHandle>),
5750
}
5851

52+
impl Deref for MetaStore {
53+
type Target = Arc<ClientHandle>;
54+
55+
fn deref(&self) -> &Self::Target {
56+
match self {
57+
MetaStore::L(l) => l.deref(),
58+
MetaStore::R(r) => r,
59+
}
60+
}
61+
}
62+
5963
impl MetaStore {
6064
/// Create a local meta service for testing.
6165
///
@@ -79,24 +83,16 @@ impl MetaStore {
7983
}
8084
}
8185

82-
pub async fn get_local_addr(&self) -> std::result::Result<Option<String>, MetaError> {
83-
let client = match self {
84-
MetaStore::L(l) => l.deref().deref(),
85-
MetaStore::R(grpc_client) => grpc_client,
86-
};
87-
88-
let client_info = client.get_client_info().await?;
89-
Ok(Some(client_info.client_addr))
86+
pub async fn get_local_addr(&self) -> Result<String, MetaError> {
87+
let client_info = self.get_client_info().await?;
88+
Ok(client_info.client_addr)
9089
}
9190

9291
pub async fn watch(&self, request: WatchRequest) -> Result<WatchStream, MetaError> {
93-
let client = match self {
94-
MetaStore::L(l) => l.deref(),
95-
MetaStore::R(grpc_client) => grpc_client,
96-
};
92+
let client = self.deref();
9793

9894
let streaming = client.request(request).await?;
99-
Ok(Box::pin(WatchResponseStream::create(streaming)))
95+
Ok(Box::pin(streaming.map_err(MetaError::from)))
10096
}
10197

10298
pub async fn new_acquired(
@@ -106,11 +102,7 @@ impl MetaStore {
106102
id: impl ToString,
107103
lease: Duration,
108104
) -> Result<Permit, AcquireError> {
109-
let client = match self {
110-
MetaStore::L(l) => l.deref(),
111-
MetaStore::R(grpc_client) => grpc_client,
112-
};
113-
105+
let client = self.deref();
114106
Semaphore::new_acquired(client.clone(), prefix, capacity, id, lease).await
115107
}
116108

@@ -121,48 +113,11 @@ impl MetaStore {
121113
id: impl ToString,
122114
lease: Duration,
123115
) -> Result<Permit, AcquireError> {
124-
let client = match self {
125-
MetaStore::L(l) => l.deref(),
126-
MetaStore::R(grpc_client) => grpc_client,
127-
};
128-
116+
let client = self.deref();
129117
Semaphore::new_acquired_by_time(client.clone(), prefix, capacity, id, lease).await
130118
}
131119
}
132120

133-
#[async_trait::async_trait]
134-
impl kvapi::KVApi for MetaStore {
135-
type Error = MetaError;
136-
137-
async fn upsert_kv(&self, act: UpsertKV) -> Result<UpsertKVReply, Self::Error> {
138-
match self {
139-
MetaStore::L(x) => x.upsert_kv(act).await,
140-
MetaStore::R(x) => x.upsert_kv(act).await,
141-
}
142-
}
143-
144-
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
145-
match self {
146-
MetaStore::L(x) => x.get_kv_stream(keys).await,
147-
MetaStore::R(x) => x.get_kv_stream(keys).await,
148-
}
149-
}
150-
151-
async fn list_kv(&self, prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
152-
match self {
153-
MetaStore::L(x) => x.list_kv(prefix).await,
154-
MetaStore::R(x) => x.list_kv(prefix).await,
155-
}
156-
}
157-
158-
async fn transaction(&self, txn: TxnRequest) -> Result<TxnReply, Self::Error> {
159-
match self {
160-
MetaStore::L(x) => x.transaction(txn).await,
161-
MetaStore::R(x) => x.transaction(txn).await,
162-
}
163-
}
164-
}
165-
166121
impl MetaStoreProvider {
167122
pub fn new(rpc_conf: RpcClientConf) -> Self {
168123
MetaStoreProvider { rpc_conf }
@@ -188,37 +143,3 @@ impl MetaStoreProvider {
188143
}
189144
}
190145
}
191-
192-
pub struct WatchResponseStream<E, S>
193-
where
194-
E: Into<MetaError> + Send + 'static,
195-
S: Stream<Item = Result<WatchResponse, E>> + Send + Unpin + 'static,
196-
{
197-
inner: S,
198-
}
199-
200-
impl<E, S> WatchResponseStream<E, S>
201-
where
202-
E: Into<MetaError> + Send + 'static,
203-
S: Stream<Item = Result<WatchResponse, E>> + Send + Unpin + 'static,
204-
{
205-
pub fn create(inner: S) -> WatchResponseStream<E, S> {
206-
WatchResponseStream { inner }
207-
}
208-
}
209-
210-
impl<E, S> Stream for WatchResponseStream<E, S>
211-
where
212-
E: Into<MetaError> + Send + 'static,
213-
S: Stream<Item = Result<WatchResponse, E>> + Send + Unpin + 'static,
214-
{
215-
type Item = Result<WatchResponse, MetaError>;
216-
217-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
218-
Pin::new(&mut self.inner).poll_next(cx).map(|x| match x {
219-
None => None,
220-
Some(Ok(resp)) => Some(Ok(resp)),
221-
Some(Err(e)) => Some(Err(e.into())),
222-
})
223-
}
224-
}

src/query/management/src/warehouse/warehouse_api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ pub trait WarehouseApi: Sync + Send {
126126
cluster: &str,
127127
) -> Result<Vec<NodeInfo>>;
128128

129-
async fn get_local_addr(&self) -> Result<Option<String>>;
129+
async fn get_local_addr(&self) -> Result<String>;
130130

131131
async fn list_online_nodes(&self) -> Result<Vec<NodeInfo>>;
132132

src/query/management/src/warehouse/warehouse_mgr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2121,7 +2121,7 @@ impl WarehouseApi for WarehouseMgr {
21212121

21222122
#[async_backtrace::framed]
21232123
#[fastrace::trace]
2124-
async fn get_local_addr(&self) -> Result<Option<String>> {
2124+
async fn get_local_addr(&self) -> Result<String> {
21252125
Ok(self.metastore.get_local_addr().await?)
21262126
}
21272127

src/query/service/src/clusters/cluster.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -512,18 +512,18 @@ impl ClusterDiscovery {
512512
if let Ok(socket_addr) = SocketAddr::from_str(lookup_ip) {
513513
let ip_addr = socket_addr.ip();
514514
if ip_addr.is_loopback() || ip_addr.is_unspecified() {
515-
if let Some(local_addr) = self.warehouse_manager.get_local_addr().await? {
516-
let local_socket_addr = SocketAddr::from_str(&local_addr)?;
517-
let new_addr = format!("{}:{}", local_socket_addr.ip(), socket_addr.port());
518-
warn!(
519-
"Detected loopback or unspecified address as {} endpoint. \
520-
We rewrite it(\"{}\" -> \"{}\") for advertising to other nodes. \
521-
If there are proxies between nodes, you can specify endpoint with --{}.",
522-
typ, lookup_ip, new_addr, typ
523-
);
524-
525-
*lookup_ip = new_addr;
526-
}
515+
let local_addr = self.warehouse_manager.get_local_addr().await?;
516+
517+
let local_socket_addr = SocketAddr::from_str(&local_addr)?;
518+
let new_addr = format!("{}:{}", local_socket_addr.ip(), socket_addr.port());
519+
warn!(
520+
"Detected loopback or unspecified address as {} endpoint. \
521+
We rewrite it(\"{}\" -> \"{}\") for advertising to other nodes. \
522+
If there are proxies between nodes, you can specify endpoint with --{}.",
523+
typ, lookup_ip, new_addr, typ
524+
);
525+
526+
*lookup_ip = new_addr;
527527
}
528528
}
529529
}

0 commit comments

Comments
 (0)