Skip to content

Commit 53a8f20

Browse files
Zhang Yanpodrmingdrmer
authored andcommitted
feat: add server-side KvTransaction gRPC API (client gated by feature flag)
The existing `Transaction` endpoint stores protobuf `TxnRequest` directly in the raft log, coupling wire format to persistence. Any proto schema change risks breaking raft log deserialization. The new `KvTransaction` endpoint introduces a three-layer architecture: protobuf transport types for the wire, Rust-native serde types (`kv_transaction::Transaction`) for raft log storage, and conversions between them. This lets the proto schema evolve freely without affecting stored data. The new API uses a cleaner branch-based model where each branch has an optional predicate and a list of operations, evaluated in order. This replaces the legacy dual execution paths, stringly-typed `execution_path`, and misleading `success` field of `TxnRequest`. The server now exposes the `KvTransaction` RPC endpoint, but the client does not use it by default — `ClientHandle::transaction_v2()` is gated behind the `transaction-v2` feature flag and only enabled in tests. Production client code continues to use the old `Transaction` API. The new endpoint will be adopted on the client side in a future change once the server has been deployed widely enough. The applier is rewritten to operate on `kv_transaction::*` types. The old `Cmd::Transaction(TxnRequest)` path converts to `kv_transaction::Transaction` internally and delegates to the same code. Both paths return `AppliedState::KvTxnReply(pb::KvTransactionReply)`. Changes: - Add `kv_transaction` module with `Transaction`, `Branch`, `Predicate`, `Condition`, `Operation`, `Operand`, `CompareOperator` storage types - Add `Cmd::KvTransaction(Transaction)` variant - Replace `AppliedState::TxnReply` with `AppliedState::KvTxnReply` - Add `KvTransaction` RPC using redirect pattern (not forward), returning leader endpoint in gRPC metadata for client-side redirect - Gate `ClientHandle::transaction_v2()` behind `transaction-v2` feature flag - Register `Feature::KvTransaction` at version 260217.0.0
1 parent 8d685cc commit 53a8f20

File tree

31 files changed

+2446
-245
lines changed

31 files changed

+2446
-245
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ byteorder = "1.5.0"
5555
chrono = { version = "0.4.40", features = ["serde"] }
5656
deepsize = "0.2.0"
5757
derive_more = { version = "2.1.1", features = ["full"] }
58-
display-more = "0.2.1"
58+
display-more = "0.2.5"
5959
env_logger = "0.11"
6060
fastrace = { version = "0.7.14", features = ["enable"] }
6161
feature-set = "0.1.1"

crates/client/client/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ thiserror = { workspace = true }
3838
tokio = { workspace = true }
3939
tonic = { workspace = true }
4040

41+
[features]
42+
transaction-v2 = []
43+
4144
[dev-dependencies]
4245
anyhow = { workspace = true }
4346
databend-meta-version = { workspace = true }

crates/client/client/src/client_handle.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,14 @@ impl<RT: SpawnApi> ClientHandle<RT> {
162162
self.request(txn).await.map_err(MetaError::from)
163163
}
164164

165+
#[cfg(any(test, feature = "transaction-v2"))]
166+
pub async fn transaction_v2(
167+
&self,
168+
txn: databend_meta_types::protobuf::KvTransactionRequest,
169+
) -> Result<databend_meta_types::protobuf::KvTransactionReply, MetaError> {
170+
self.request(txn).await.map_err(MetaError::from)
171+
}
172+
165173
pub async fn get_kv(&self, key: &str) -> Result<GetKVReply, MetaError> {
166174
let mut res = self.mget_kv(&[key.to_string()]).await?;
167175
Ok(res.pop().flatten())

crates/client/client/src/established_client.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,17 @@ impl EstablishedClient {
306306
self.client.transaction(request).await.update_client(self)
307307
}
308308

309+
#[async_backtrace::framed]
310+
pub async fn kv_transaction(
311+
&mut self,
312+
request: impl tonic::IntoRequest<pb::KvTransactionRequest>,
313+
) -> Result<Response<pb::KvTransactionReply>, Status> {
314+
self.client
315+
.kv_transaction(request)
316+
.await
317+
.update_client(self)
318+
}
319+
309320
#[async_backtrace::framed]
310321
pub async fn member_list(
311322
&mut self,

crates/client/client/src/grpc_action.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ use databend_meta_types::TxnRequest;
7979
use databend_meta_types::UpsertKV;
8080
use databend_meta_types::protobuf::ClientInfo;
8181
use databend_meta_types::protobuf::ClusterStatus;
82+
use databend_meta_types::protobuf::KvTransactionReply;
83+
use databend_meta_types::protobuf::KvTransactionRequest;
8284
use databend_meta_types::protobuf::MemberListReply;
8385
use databend_meta_types::protobuf::RaftRequest;
8486
use databend_meta_types::protobuf::StreamItem;
@@ -256,6 +258,10 @@ impl RequestFor for TxnRequest {
256258
type Reply = TxnReply;
257259
}
258260

261+
impl RequestFor for KvTransactionRequest {
262+
type Reply = KvTransactionReply;
263+
}
264+
259265
impl RequestFor for GetClusterStatus {
260266
type Reply = ClusterStatus;
261267
}

crates/client/client/src/grpc_client.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,10 @@ impl<RT: RuntimeApi> MetaGrpcClient<RT> {
349349
let resp = self.transaction(r).await;
350350
Response::Txn(resp)
351351
}
352+
message::Request::KvTransaction(r) => {
353+
let resp = self.transaction_v2(r).await;
354+
Response::KvTransaction(resp)
355+
}
352356
message::Request::Watch(r) => {
353357
let resp = self.watch(r).await;
354358
Response::Watch(resp)
@@ -859,6 +863,43 @@ impl<RT: RuntimeApi> MetaGrpcClient<RT> {
859863
Err(net_err.into())
860864
}
861865

866+
#[fastrace::trace]
867+
#[async_backtrace::framed]
868+
pub(crate) async fn transaction_v2(
869+
&self,
870+
txn: pb::KvTransactionRequest,
871+
) -> Result<pb::KvTransactionReply, MetaClientError> {
872+
debug!("{self}::transaction_v2 request: {txn:?}");
873+
874+
let mut rpc_handler = RpcHandler::new(self);
875+
876+
for _i in 0..RPC_RETRIES {
877+
let req = RT::prepare_request(Request::new(txn.clone()));
878+
879+
let established = rpc_handler.new_established_client().await?;
880+
881+
let result = established
882+
.kv_transaction(req)
883+
.inspect_elapsed_over(threshold(), info_spent("transaction_v2"))
884+
.await;
885+
886+
let retryable = rpc_handler.process_response_result(&txn, result)?;
887+
888+
let response = match retryable {
889+
ResponseAction::Success(resp) => resp,
890+
ResponseAction::ShouldRetry => {
891+
continue;
892+
}
893+
};
894+
895+
let reply = response.into_inner();
896+
return Ok(reply);
897+
}
898+
899+
let net_err = rpc_handler.create_network_error();
900+
Err(net_err.into())
901+
}
902+
862903
fn get_current_endpoint(&self) -> Option<String> {
863904
let es = self.endpoints.lock();
864905
es.current().map(|x| x.to_string())

crates/client/client/src/message.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use databend_meta_types::TxnRequest;
2222
use databend_meta_types::protobuf::ClientInfo;
2323
use databend_meta_types::protobuf::ClusterStatus;
2424
use databend_meta_types::protobuf::ExportedChunk;
25+
use databend_meta_types::protobuf::KvTransactionReply;
26+
use databend_meta_types::protobuf::KvTransactionRequest;
2527
use databend_meta_types::protobuf::MemberListReply;
2628
use databend_meta_types::protobuf::StreamItem;
2729
use databend_meta_types::protobuf::WatchRequest;
@@ -89,6 +91,9 @@ pub enum Request {
8991
/// Run a transaction on remote
9092
Txn(TxnRequest),
9193

94+
/// Run a kv_transaction on remote (new API)
95+
KvTransaction(KvTransactionRequest),
96+
9297
/// Watch KV changes, expecting a Stream that reports KV change events
9398
Watch(WatchRequest),
9499

@@ -129,6 +134,7 @@ impl Request {
129134
Request::StreamedGetMany(_) => "StreamGetMany",
130135
Request::StreamList(_) => "StreamList",
131136
Request::Txn(_) => "Txn",
137+
Request::KvTransaction(_) => "KvTransaction",
132138
Request::Watch(_) => "Watch",
133139
Request::WatchWithInitialization(_) => "WatchWithInitialization",
134140
Request::Export(_) => "Export",
@@ -153,6 +159,7 @@ pub enum Response {
153159
),
154160
StreamList(Result<BoxStream<StreamItem>, MetaError>),
155161
Txn(Result<TxnReply, MetaClientError>),
162+
KvTransaction(Result<KvTransactionReply, MetaClientError>),
156163
Watch(Result<tonic::codec::Streaming<WatchResponse>, MetaClientError>),
157164
WatchWithInitialization(Result<tonic::codec::Streaming<WatchResponse>, MetaClientError>),
158165
Export(Result<tonic::codec::Streaming<ExportedChunk>, MetaClientError>),
@@ -178,6 +185,9 @@ impl fmt::Debug for Response {
178185
Response::Txn(x) => {
179186
write!(f, "Txn({:?})", x)
180187
}
188+
Response::KvTransaction(x) => {
189+
write!(f, "KvTransaction({:?})", x)
190+
}
181191
Response::Watch(x) => {
182192
write!(f, "Watch({:?})", x)
183193
}
@@ -221,6 +231,7 @@ impl Response {
221231
Response::StreamedGetMany(res) => to_err(res),
222232
Response::StreamList(res) => to_err(res),
223233
Response::Txn(res) => to_err(res),
234+
Response::KvTransaction(res) => to_err(res),
224235
Response::Watch(res) => to_err(res),
225236
Response::WatchWithInitialization(res) => to_err(res),
226237
Response::Export(res) => to_err(res),

crates/client/client/tests/it/grpc_server.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::time::Duration;
1919
use databend_meta_client::MIN_SERVER_VERSION;
2020
use databend_meta_runtime_api::SpawnApi;
2121
use databend_meta_runtime_api::TokioRuntime;
22+
use databend_meta_types::protobuf as pb;
2223
use databend_meta_types::protobuf::ClientInfo;
2324
use databend_meta_types::protobuf::ClusterStatus;
2425
use databend_meta_types::protobuf::Empty;
@@ -144,6 +145,13 @@ impl MetaService for GrpcServiceForTestImpl {
144145
unimplemented!()
145146
}
146147

148+
async fn kv_transaction(
149+
&self,
150+
_request: Request<pb::KvTransactionRequest>,
151+
) -> Result<Response<pb::KvTransactionReply>, Status> {
152+
unimplemented!()
153+
}
154+
147155
async fn member_list(
148156
&self,
149157
_request: Request<MemberListRequest>,

crates/common/types/build.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,14 @@ fn build_proto() {
130130
"TxnReply",
131131
"#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]",
132132
)
133+
.type_attribute(
134+
"KvTransactionRequest",
135+
"#[derive(Eq, deepsize::DeepSizeOf)]",
136+
)
137+
.type_attribute(
138+
"KvTransactionReply",
139+
"#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]",
140+
)
133141
.type_attribute(
134142
"WatchRequest",
135143
"#[derive(Eq, deepsize::DeepSizeOf)]",

crates/common/types/proto/meta.proto

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,31 @@ message TxnReply {
261261
string execution_path = 4;
262262
}
263263

264+
// A transaction request with cleaner branch-based structure.
265+
//
266+
// Replaces the legacy `TxnRequest` which couples the wire format to the raft log
267+
// storage format and carries historical complexity (dual execution paths,
268+
// stringly-typed `execution_path`, misleading `success` field).
269+
//
270+
// Each branch is a ConditionalOperation: optional predicate + operations.
271+
// Branches are evaluated in order; the first matching branch is executed.
272+
// A branch with no predicate is unconditional (use as "else" at the end).
273+
//
274+
// Since: 2025-02-17, version 260217.0.0
275+
message KvTransactionRequest {
276+
repeated ConditionalOperation branches = 1;
277+
}
278+
279+
// Response from KvTransaction.
280+
// Since: 2025-02-17, version 260217.0.0
281+
message KvTransactionReply {
282+
// Index of the branch that was executed, or absent if no branch matched.
283+
optional uint32 executed_branch = 1;
284+
285+
// Responses from the operations in the executed branch.
286+
repeated TxnOpResponse responses = 2;
287+
}
288+
264289
message ClusterStatus {
265290
uint64 id = 1;
266291
string binary_version = 2;
@@ -523,6 +548,10 @@ service MetaService {
523548

524549
rpc Transaction(TxnRequest) returns (TxnReply);
525550

551+
// Branch-based transaction API, replacing the legacy Transaction RPC.
552+
// Since: 2025-02-17, version 260217.0.0
553+
rpc KvTransaction(KvTransactionRequest) returns (KvTransactionReply);
554+
526555
// Get MetaSrv member list endpoints
527556
rpc MemberList(MemberListRequest) returns (MemberListReply);
528557

0 commit comments

Comments
 (0)