Skip to content

Commit 425e7b2

Browse files
authored
fix(meta-service): watcher metrics should be updated when sender is created and dropped (#18182)
* refactor(meta-service): remove direct `KVApi` impl on `MetaNode` This commit extracts the implementation of `KVApi` to a standalone struct. * fix(meta-service): watcher metrics should be updated when sender is created and dropped But not when it is added to or removed from the `Dispatcher`. Because a sender can be removed more than once, since internally the dispatcher is a span-map, it stores multiple instance of a sender. - See: databendlabs/watcher#7 * M src/meta/service/src/meta_service/meta_node_kv_api_impl.rs
1 parent dd87862 commit 425e7b2

File tree

9 files changed

+44
-30
lines changed

9 files changed

+44
-30
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ url = "2.5.4"
532532
uuid = { version = "1.10.0", features = ["std", "serde", "v4", "v7"] }
533533
volo-thrift = "0.10"
534534
walkdir = "2.3.2"
535-
watcher = { version = "0.4.0" }
535+
watcher = { version = "0.4.1" }
536536
wiremock = "0.6"
537537
wkt = "0.11.1"
538538
xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"] }
@@ -658,5 +658,5 @@ sub-cache = { git = "https://github.com/databendlabs/sub-cache", tag = "v0.2.1"
658658
tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" }
659659
tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370", package = "tantivy-common" }
660660
tantivy-jieba = { git = "https://github.com/datafuse-extras/tantivy-jieba", rev = "0e300e9" }
661-
watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.4.0" }
661+
watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.4.1" }
662662
xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", tag = "databend-alpha.4" }

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ impl MetaServiceImpl {
145145
let reply = match &req {
146146
MetaGrpcReq::UpsertKV(a) => {
147147
let res = m
148+
.kv_api()
148149
.upsert_kv(a.clone())
149150
.log_elapsed_info(format!("UpsertKV: {:?}", a))
150151
.await;

src/meta/service/src/meta_service/meta_node.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ use crate::meta_service::errors::grpc_error_to_network_err;
9393
use crate::meta_service::forwarder::MetaForwarder;
9494
use crate::meta_service::meta_leader::MetaLeader;
9595
use crate::meta_service::meta_node_kv_api_impl::MetaKVApi;
96+
use crate::meta_service::meta_node_kv_api_impl::MetaKVApiOwned;
9697
use crate::meta_service::meta_node_status::MetaNodeStatus;
9798
use crate::meta_service::watcher::DispatcherHandle;
9899
use crate::meta_service::watcher::WatchTypes;
@@ -1199,6 +1200,10 @@ impl MetaNode {
11991200
pub fn kv_api(&self) -> MetaKVApi {
12001201
MetaKVApi::new(self)
12011202
}
1203+
1204+
pub fn kv_api_owned(self: &Arc<Self>) -> MetaKVApiOwned {
1205+
MetaKVApiOwned::new(self.clone())
1206+
}
12021207
}
12031208

12041209
pub(crate) fn event_filter_from_filter_type(filter_type: FilterType) -> EventFilter {

src/meta/service/src/meta_service/meta_node_kv_api_impl.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
1517
use async_trait::async_trait;
1618
use databend_common_meta_client::MetaGrpcReadReq;
1719
use databend_common_meta_kvapi::kvapi;
@@ -117,31 +119,34 @@ impl<'a> kvapi::KVApi for MetaKVApi<'a> {
117119
}
118120
}
119121

120-
/// Impl kvapi::KVApi for MetaNode.
121-
///
122-
/// Write through raft-log.
123-
/// Read through local state machine, which may not be consistent.
124-
/// E.g. Read is not guaranteed to see a write.
122+
/// A wrapper of MetaNode that implements kvapi::KVApi.
123+
pub struct MetaKVApiOwned {
124+
inner: Arc<MetaNode>,
125+
}
126+
127+
impl MetaKVApiOwned {
128+
pub fn new(inner: Arc<MetaNode>) -> Self {
129+
Self { inner }
130+
}
131+
}
132+
125133
#[async_trait]
126-
impl kvapi::KVApi for MetaNode {
134+
impl kvapi::KVApi for MetaKVApiOwned {
127135
type Error = MetaAPIError;
128136

129137
async fn upsert_kv(&self, act: UpsertKV) -> Result<UpsertKVReply, Self::Error> {
130-
self.kv_api().upsert_kv(act).await
138+
self.inner.kv_api().upsert_kv(act).await
131139
}
132140

133-
#[fastrace::trace]
134141
async fn get_kv_stream(&self, keys: &[String]) -> Result<KVStream<Self::Error>, Self::Error> {
135-
self.kv_api().get_kv_stream(keys).await
142+
self.inner.kv_api().get_kv_stream(keys).await
136143
}
137144

138-
#[fastrace::trace]
139145
async fn list_kv(&self, prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
140-
self.kv_api().list_kv(prefix).await
146+
self.inner.kv_api().list_kv(prefix).await
141147
}
142148

143-
#[fastrace::trace]
144149
async fn transaction(&self, txn: TxnRequest) -> Result<TxnReply, Self::Error> {
145-
self.kv_api().transaction(txn).await
150+
self.inner.kv_api().transaction(txn).await
146151
}
147152
}

src/meta/service/src/meta_service/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ mod errors;
1616
mod forwarder;
1717
mod meta_node_kv_api_impl;
1818

19+
pub use meta_node_kv_api_impl::MetaKVApi;
20+
pub use meta_node_kv_api_impl::MetaKVApiOwned;
21+
1922
pub(crate) mod snapshot_receiver_v1;
2023

2124
pub mod meta_leader;

src/meta/service/tests/it/meta_node/meta_node_kv_api.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::sync::Mutex;
1717

1818
use async_trait::async_trait;
1919
use databend_common_meta_kvapi::kvapi;
20-
use databend_meta::meta_service::MetaNode;
20+
use databend_meta::meta_service::MetaKVApiOwned;
2121
use maplit::btreeset;
2222
use test_harness::test;
2323

@@ -32,8 +32,8 @@ struct MetaNodeUnitTestBuilder {
3232
}
3333

3434
#[async_trait]
35-
impl kvapi::ApiBuilder<Arc<MetaNode>> for MetaNodeUnitTestBuilder {
36-
async fn build(&self) -> Arc<MetaNode> {
35+
impl kvapi::ApiBuilder<MetaKVApiOwned> for MetaNodeUnitTestBuilder {
36+
async fn build(&self) -> MetaKVApiOwned {
3737
let (_id, tc) = start_meta_node_leader().await.unwrap();
3838

3939
let meta_node = tc.meta_node();
@@ -43,20 +43,20 @@ impl kvapi::ApiBuilder<Arc<MetaNode>> for MetaNodeUnitTestBuilder {
4343
tcs.push(tc);
4444
}
4545

46-
meta_node
46+
meta_node.kv_api_owned()
4747
}
4848

49-
async fn build_cluster(&self) -> Vec<Arc<MetaNode>> {
49+
async fn build_cluster(&self) -> Vec<MetaKVApiOwned> {
5050
let (_log_index, tcs) = start_meta_node_cluster(btreeset! {0,1,2}, btreeset! {3,4})
5151
.await
5252
.unwrap();
5353

5454
let cluster = vec![
55-
tcs[0].meta_node(),
56-
tcs[1].meta_node(),
57-
tcs[2].meta_node(),
58-
tcs[3].meta_node(),
59-
tcs[4].meta_node(),
55+
tcs[0].meta_node().kv_api_owned(),
56+
tcs[1].meta_node().kv_api_owned(),
57+
tcs[2].meta_node().kv_api_owned(),
58+
tcs[3].meta_node().kv_api_owned(),
59+
tcs[4].meta_node().kv_api_owned(),
6060
];
6161

6262
{

src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ async fn test_meta_node_replicate_kv_with_expire() -> anyhow::Result<()> {
6868

6969
info!("--- get kv with expire now+3");
7070
let seq = {
71-
let resp = leader.get_kv(key).await?;
71+
let resp = leader.kv_api().get_kv(key).await?;
7272
let seq_v = resp.unwrap();
7373
assert_eq!(Some(KVMeta::new_expire(now_sec + 3)), seq_v.meta);
7474
seq_v.seq
@@ -85,7 +85,7 @@ async fn test_meta_node_replicate_kv_with_expire() -> anyhow::Result<()> {
8585

8686
info!("--- get updated kv with new expire now+1000, assert the updated value");
8787
{
88-
let resp = leader.get_kv(key).await?;
88+
let resp = leader.kv_api().get_kv(key).await?;
8989
let seq_v = resp.unwrap();
9090
let want = (now_sec + 1000) * 1000;
9191
let expire_ms = seq_v.meta.unwrap().get_expire_at_ms().unwrap();

src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,7 @@ async fn assert_get_kv(
849849
value: &str,
850850
) -> anyhow::Result<()> {
851851
for (i, mn) in meta_nodes.iter().enumerate() {
852-
let got = mn.get_kv(key).await?;
852+
let got = mn.kv_api().get_kv(key).await?;
853853
assert_eq!(
854854
value.to_string().into_bytes(),
855855
got.unwrap().data,

0 commit comments

Comments
 (0)