Skip to content

Commit 4f707b2

Browse files
authored
feat(meta-client): add watch_with_initialization() method (#17944)
This method requires the databend-meta server to respond a `is_initialization` flag in each watch response for the client to distinguish between initialization events and key-value change events. This feature is required by the cache mod.
1 parent fb33ae5 commit 4f707b2

File tree

14 files changed

+109
-15
lines changed

14 files changed

+109
-15
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/app/src/data_id.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::tenant_key::resource::TenantResource;
3131
/// e.g. TableId, DatabaseId as a value.
3232
///
3333
/// `DataId<R>` can be dereferenced to `u64`.
34+
/// `DataId<R>` will take place of `Id<T>` in future.
3435
///
3536
/// When an id is used as a key in a key-value store,
3637
/// it is serialized in another way to keep order.

src/meta/client/src/client_handle.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use databend_common_meta_kvapi::kvapi::ListKVReq;
2929
use databend_common_meta_types::protobuf::ClientInfo;
3030
use databend_common_meta_types::protobuf::ClusterStatus;
3131
use databend_common_meta_types::protobuf::StreamItem;
32+
use databend_common_meta_types::protobuf::WatchRequest;
33+
use databend_common_meta_types::protobuf::WatchResponse;
3234
use databend_common_meta_types::ConnectionError;
3335
use databend_common_meta_types::MetaClientError;
3436
use databend_common_meta_types::MetaError;
@@ -45,6 +47,7 @@ use crate::grpc_metrics;
4547
use crate::message;
4648
use crate::message::Response;
4749
use crate::ClientWorkerRequest;
50+
use crate::InitFlag;
4851
use crate::RequestFor;
4952
use crate::Streamed;
5053

@@ -104,6 +107,22 @@ impl ClientHandle {
104107
Ok(strm)
105108
}
106109

110+
/// Watch without requiring the initialization support
111+
pub async fn watch(
112+
&self,
113+
watch: WatchRequest,
114+
) -> Result<tonic::codec::Streaming<WatchResponse>, MetaClientError> {
115+
self.request(watch).await
116+
}
117+
118+
/// Watch with requiring the initialization support
119+
pub async fn watch_with_initialization(
120+
&self,
121+
watch: WatchRequest,
122+
) -> Result<tonic::codec::Streaming<WatchResponse>, MetaClientError> {
123+
self.request((watch, InitFlag)).await
124+
}
125+
107126
/// Send a request to the internal worker task, which will be running in another runtime.
108127
#[fastrace::trace]
109128
#[async_backtrace::framed]

src/meta/client/src/established_client.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use databend_common_meta_types::protobuf::StreamItem;
2828
use databend_common_meta_types::protobuf::WatchRequest;
2929
use databend_common_meta_types::protobuf::WatchResponse;
3030
use databend_common_meta_types::GrpcHelper;
31+
use databend_common_meta_types::MetaHandshakeError;
3132
use databend_common_meta_types::TxnReply;
3233
use databend_common_meta_types::TxnRequest;
3334
use log::error;
@@ -44,6 +45,7 @@ use crate::endpoints::Endpoints;
4445
use crate::grpc_client::AuthInterceptor;
4546
use crate::grpc_client::RealClient;
4647
use crate::required::Features;
48+
use crate::FeatureSpec;
4749

4850
/// Update the client state according to the result of an RPC.
4951
trait HandleRPCResult<T> {
@@ -148,11 +150,15 @@ impl EstablishedClient {
148150
self.features.contains_key(feature)
149151
}
150152

151-
pub fn ensure_feature(&self, feature: &str) -> Result<(), Status> {
153+
pub fn ensure_feature_spec(&self, spec: &FeatureSpec) -> Result<(), MetaHandshakeError> {
154+
self.ensure_feature(spec.0)
155+
}
156+
157+
pub fn ensure_feature(&self, feature: &str) -> Result<(), MetaHandshakeError> {
152158
if self.has_feature(feature) {
153159
Ok(())
154160
} else {
155-
Err(Status::failed_precondition(format!(
161+
Err(MetaHandshakeError::new(format!(
156162
"Feature {} is not supported by the server; server:{{version: {}, features: {:?}}}",
157163
feature, self.server_protocol_version, self.features
158164
)))

src/meta/client/src/grpc_action.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::message::GetClusterStatus;
4444
use crate::message::GetEndpoints;
4545
use crate::message::MakeEstablishedClient;
4646
use crate::message::Streamed;
47+
use crate::InitFlag;
4748

4849
/// Bind a request type to its corresponding response type.
4950
pub trait RequestFor: Clone + fmt::Debug {
@@ -185,6 +186,10 @@ impl RequestFor for WatchRequest {
185186
type Reply = tonic::codec::Streaming<WatchResponse>;
186187
}
187188

189+
impl RequestFor for (WatchRequest, InitFlag) {
190+
type Reply = tonic::codec::Streaming<WatchResponse>;
191+
}
192+
188193
impl RequestFor for ExportReq {
189194
type Reply = tonic::codec::Streaming<WatchResponse>;
190195
}

src/meta/client/src/grpc_client.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ use crate::grpc_action::RequestFor;
8888
use crate::grpc_metrics;
8989
use crate::message;
9090
use crate::message::Response;
91+
use crate::required::features;
9192
use crate::required::std;
9293
use crate::required::supported_features;
9394
use crate::required::Features;
@@ -365,6 +366,10 @@ impl MetaGrpcClient {
365366
let resp = self.watch(r).await;
366367
Response::Watch(resp)
367368
}
369+
message::Request::WatchWithInitialization((r, _)) => {
370+
let resp = self.watch_with_initialization(r).await;
371+
Response::WatchWithInitialization(resp)
372+
}
368373
message::Request::Export(r) => {
369374
let resp = self.export(r).await;
370375
Response::Export(resp)
@@ -726,6 +731,27 @@ impl MetaGrpcClient {
726731
Ok(res.into_inner())
727732
}
728733

734+
/// Create a watching stream that receives KV change events.
735+
///
736+
/// This method is similar to `watch`, but it also sends all existing key-values with `is_initialization=true`
737+
/// before starting the watch stream.
738+
#[fastrace::trace]
739+
#[async_backtrace::framed]
740+
pub(crate) async fn watch_with_initialization(
741+
&self,
742+
watch_request: WatchRequest,
743+
) -> Result<tonic::codec::Streaming<WatchResponse>, MetaClientError> {
744+
debug!("{}: handle watch request: {:?}", self, watch_request);
745+
746+
let mut client = self.get_established_client().await?;
747+
client.ensure_feature_spec(&features::WATCH)?;
748+
client.ensure_feature_spec(&features::WATCH_INITIAL_FLUSH)?;
749+
client.ensure_feature_spec(&features::WATCH_INIT_FLAG)?;
750+
751+
let res = client.watch(watch_request).await?;
752+
Ok(res.into_inner())
753+
}
754+
729755
/// Export all data in json from metasrv.
730756
#[fastrace::trace]
731757
#[async_backtrace::framed]

src/meta/client/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub use grpc_action::MetaGrpcReq;
3939
pub use grpc_action::RequestFor;
4040
pub use grpc_client::MetaGrpcClient;
4141
pub use message::ClientWorkerRequest;
42+
pub use message::InitFlag;
4243
pub use message::Streamed;
4344
pub use required::FeatureSpec;
4445
pub use required::VersionTuple;

src/meta/client/src/message.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ impl<T> Streamed<T> {
7171
}
7272
}
7373

74+
/// A marker type to indicate that the watch response should return a flag event indicating the end of initialization.
75+
#[derive(Debug, Clone)]
76+
pub struct InitFlag;
77+
7478
/// Meta-client handle-to-worker request body
7579
#[derive(Debug, Clone, derive_more::From)]
7680
pub enum Request {
@@ -89,6 +93,17 @@ pub enum Request {
8993
/// Watch KV changes, expecting a Stream that reports KV change events
9094
Watch(WatchRequest),
9195

96+
/// Watch KV changes with optional initialization flush of all existing values.
97+
///
98+
/// When `initial_flush` is set to true in the WatchRequest:
99+
/// - First sends all existing key-values with `is_initialization=true`
100+
/// - Then sends a special event with no data to mark the end of initialization
101+
/// - Finally streams real-time changes as they occur
102+
///
103+
/// This provides a reliable way to initialize client-side caches and then
104+
/// keep them synchronized with server state.
105+
WatchWithInitialization((WatchRequest, InitFlag)),
106+
92107
/// Export all data
93108
Export(ExportReq),
94109

@@ -113,6 +128,7 @@ impl Request {
113128
Request::Upsert(_) => "Upsert",
114129
Request::Txn(_) => "Txn",
115130
Request::Watch(_) => "Watch",
131+
Request::WatchWithInitialization(_) => "WatchWithInitialization",
116132
Request::Export(_) => "Export",
117133
Request::MakeEstablishedClient(_) => "MakeClient",
118134
Request::GetEndpoints(_) => "GetEndpoints",
@@ -130,6 +146,7 @@ pub enum Response {
130146
Upsert(Result<UpsertKVReply, MetaError>),
131147
Txn(Result<TxnReply, MetaError>),
132148
Watch(Result<tonic::codec::Streaming<WatchResponse>, MetaClientError>),
149+
WatchWithInitialization(Result<tonic::codec::Streaming<WatchResponse>, MetaClientError>),
133150
Export(Result<tonic::codec::Streaming<ExportedChunk>, MetaError>),
134151
MakeEstablishedClient(Result<EstablishedClient, MetaClientError>),
135152
GetEndpoints(Result<Vec<String>, MetaError>),
@@ -155,6 +172,9 @@ impl fmt::Debug for Response {
155172
Response::Watch(x) => {
156173
write!(f, "Watch({:?})", x)
157174
}
175+
Response::WatchWithInitialization(x) => {
176+
write!(f, "WatchWithInitialization({:?})", x)
177+
}
158178
Response::Export(x) => {
159179
write!(f, "Export({:?})", x)
160180
}
@@ -197,6 +217,10 @@ impl Response {
197217
.as_ref()
198218
.err()
199219
.map(|x| x as &(dyn std::error::Error + 'static)),
220+
Response::WatchWithInitialization(res) => res
221+
.as_ref()
222+
.err()
223+
.map(|x| x as &(dyn std::error::Error + 'static)),
200224
Response::Export(res) => res
201225
.as_ref()
202226
.err()

src/meta/client/src/required.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ pub mod features {
3434
pub const EXPORT_V1: FeatureSpec = ("export_v1", (1, 2, 315));
3535
pub const WATCH: FeatureSpec = ("watch", (1, 2, 259));
3636
pub const WATCH_INITIAL_FLUSH: FeatureSpec = ("watch/initial_flush", (1, 2, 677));
37+
/// WatchResponse contains a flag to indicate whether the event is initialization event or change event.
38+
pub const WATCH_INIT_FLAG: FeatureSpec = ("watch/init_flag", (1, 2, 736));
3739
pub const MEMBER_LIST: FeatureSpec = ("member_list", (1, 2, 259));
3840
pub const GET_CLUSTER_STATUS: FeatureSpec = ("get_cluster_status", (1, 2, 259));
3941
pub const GET_CLIENT_INFO: FeatureSpec = ("get_client_info", (1, 2, 259));
@@ -50,6 +52,7 @@ pub fn all() -> &'static [FeatureSpec] {
5052
features::EXPORT_V1,
5153
features::WATCH,
5254
features::WATCH_INITIAL_FLUSH,
55+
features::WATCH_INIT_FLAG,
5356
features::MEMBER_LIST,
5457
features::GET_CLUSTER_STATUS,
5558
features::GET_CLIENT_INFO,
@@ -81,6 +84,8 @@ pub fn std() -> &'static [FeatureSpec] {
8184
// features::EXPORT_V1,
8285
features::WATCH,
8386
features::WATCH_INITIAL_FLUSH,
87+
// MIN_METASRV_VER does not include this feature, thus it is optional
88+
// features::WATCH_INIT_FLAG,
8489
features::MEMBER_LIST,
8590
features::GET_CLUSTER_STATUS,
8691
features::GET_CLIENT_INFO,
@@ -110,6 +115,7 @@ pub fn read_write_watch() -> &'static [FeatureSpec] {
110115
features::TRANSACTION,
111116
features::WATCH,
112117
features::WATCH_INITIAL_FLUSH,
118+
features::WATCH_INIT_FLAG,
113119
];
114120

115121
REQUIRES

src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ async fn test_watch_initialization_flush() -> anyhow::Result<()> {
365365
filter_type: FilterType::All.into(),
366366
initial_flush: true,
367367
};
368-
client.request(watch).await?
368+
client.watch_with_initialization(watch).await?
369369
};
370370

371371
let cache = Arc::new(Mutex::new(BTreeMap::new()));

0 commit comments

Comments
 (0)