Skip to content

Commit 0c2f9b7

Browse files
authored
feat(meta-service): return current seq-value in txn-put response (#18136)
* feat(meta-service): return current seq-value in txn-put response TxnPutResponse now returns both the value before and after the put operation. Each returned value is a `SeqV` struct containing the sequence number and value in bytes. This is a backward-compatible change as protobuf-based clients will ignore the new field. * chore: normalize values before compare for kv-api tests
1 parent 508dbf3 commit 0c2f9b7

File tree

6 files changed

+135
-51
lines changed

6 files changed

+135
-51
lines changed

โ€Žsrc/meta/client/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,13 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
144144
/// - 2025-05-08: since 1.2.736
145145
/// ๐Ÿ–ฅ server: add `WatchResponse::is_initialization`,
146146
///
147-
/// - 2025-06-09: since TODO: update when merged
147+
/// - 2025-06-09: since 1.2.755
148148
/// ๐Ÿ–ฅ server: remove `TxnReply::error`
149149
///
150+
/// - 2025-06-11: since TODO: update when merge
151+
/// ๐Ÿ–ฅ server: add `TxnPutResponse::current`
152+
///
153+
///
150154
/// Server feature set:
151155
/// ```yaml
152156
/// server_features:

โ€Žsrc/meta/kvapi/src/kvapi/test_suite.rs

Lines changed: 113 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -491,8 +491,12 @@ impl kvapi::TestSuite {
491491
let responses = &reply.responses;
492492
assert_eq!(responses.len(), expected.len());
493493

494+
let responses = normalize_txn_response(responses.clone());
495+
let expected = normalize_txn_response(expected.to_vec());
496+
494497
for i in 0..responses.len() {
495498
let resp = &responses[i];
499+
496500
let expect_resp = &expected[i];
497501

498502
assert_eq!(resp, expect_resp, "{}-th response", i);
@@ -521,6 +525,7 @@ impl kvapi::TestSuite {
521525
response: Some(txn_op_response::Response::Put(TxnPutResponse {
522526
key: txn_key.clone(),
523527
prev_value: None,
528+
current: Some(pb::SeqV::new(1, b"new_v1".to_vec())),
524529
})),
525530
}];
526531

@@ -659,6 +664,7 @@ impl kvapi::TestSuite {
659664
response: Some(txn_op_response::Response::Put(TxnPutResponse {
660665
key: txn_key.clone(),
661666
prev_value: Some(pb::SeqV::from(SeqV::new(1, val1.clone()))),
667+
current: Some(pb::SeqV::new(2, b("new_v1"))),
662668
})),
663669
}];
664670

@@ -766,14 +772,16 @@ impl kvapi::TestSuite {
766772
TxnOpResponse {
767773
response: Some(txn_op_response::Response::Put(TxnPutResponse {
768774
key: txn_key1.clone(),
769-
prev_value: Some(pb::SeqV::from(SeqV::new(4, val1.clone()))),
775+
prev_value: Some(pb::SeqV::new(4, val1.clone())),
776+
current: Some(pb::SeqV::new(6, val1_new.clone())),
770777
})),
771778
},
772779
// change k2
773780
TxnOpResponse {
774781
response: Some(txn_op_response::Response::Put(TxnPutResponse {
775782
key: txn_key2.clone(),
776-
prev_value: Some(pb::SeqV::from(SeqV::new(5, val2.clone()))),
783+
prev_value: Some(pb::SeqV::new(5, val2.clone())),
784+
current: Some(pb::SeqV::new(7, b("new_v2").clone())),
777785
})),
778786
},
779787
// get k1
@@ -813,35 +821,6 @@ impl kvapi::TestSuite {
813821

814822
// 4th case: get one key by value and set key transaction
815823
{
816-
/// Convert Some(KvMeta{ expire: None }) to None to simplify the comparison
817-
fn norm(vs: Vec<TxnOpResponse>) -> Vec<TxnOpResponse> {
818-
vs.into_iter()
819-
.map(|mut v| {
820-
//
821-
match &mut v.response {
822-
Some(Response::Get(TxnGetResponse {
823-
value: Some(pb::SeqV { meta, .. }),
824-
..
825-
})) => {
826-
if *meta == Some(pb::KvMeta { expire_at: None }) {
827-
*meta = None;
828-
}
829-
}
830-
Some(Response::Put(TxnPutResponse {
831-
prev_value: Some(pb::SeqV { meta, .. }),
832-
..
833-
})) => {
834-
if *meta == Some(pb::KvMeta { expire_at: None }) {
835-
*meta = None;
836-
}
837-
}
838-
_ => {}
839-
}
840-
v
841-
})
842-
.collect()
843-
}
844-
845824
let k1 = "txn_4_K1";
846825

847826
kv.upsert_kv(UpsertKV::update(k1, b"v1")).await?;
@@ -872,12 +851,19 @@ impl kvapi::TestSuite {
872851
let resp = kv.transaction(txn).await?;
873852

874853
let expected: Vec<TxnOpResponse> = vec![
875-
TxnOpResponse::put(k1, Some(pb::SeqV::new(8, b("v1")))),
854+
TxnOpResponse::put(
855+
k1,
856+
Some(pb::SeqV::new(8, b("v1"))),
857+
Some(pb::SeqV::new(9, b("v2"))),
858+
),
876859
TxnOpResponse::get(k1, Some(SeqV::new(9, b("v2")))),
877860
];
878861

879862
assert_eq!(resp.success, true);
880-
assert_eq!(norm(resp.responses), norm(expected));
863+
assert_eq!(
864+
normalize_txn_response(resp.responses),
865+
normalize_txn_response(expected)
866+
);
881867

882868
// Test less than value: success = false
883869

@@ -893,7 +879,10 @@ impl kvapi::TestSuite {
893879
vec![TxnOpResponse::get(k1, Some(SeqV::new(9, b("v2"))))];
894880

895881
assert_eq!(resp.success, false);
896-
assert_eq!(norm(resp.responses), norm(expected));
882+
assert_eq!(
883+
normalize_txn_response(resp.responses),
884+
normalize_txn_response(expected)
885+
);
897886

898887
// Test less than value: success = true
899888

@@ -906,12 +895,19 @@ impl kvapi::TestSuite {
906895
let resp = kv.transaction(txn).await?;
907896

908897
let expected: Vec<TxnOpResponse> = vec![
909-
TxnOpResponse::put(k1, Some(pb::SeqV::new(9, b("v2")))),
898+
TxnOpResponse::put(
899+
k1,
900+
Some(pb::SeqV::new(9, b("v2"))),
901+
Some(pb::SeqV::new(10, b("v3"))),
902+
),
910903
TxnOpResponse::get(k1, Some(SeqV::new(10, b("v3")))),
911904
];
912905

913906
assert_eq!(resp.success, true);
914-
assert_eq!(norm(resp.responses), norm(expected));
907+
assert_eq!(
908+
normalize_txn_response(resp.responses),
909+
normalize_txn_response(expected)
910+
);
915911

916912
// Test less equal value: success = false
917913

@@ -927,7 +923,10 @@ impl kvapi::TestSuite {
927923
vec![TxnOpResponse::get(k1, Some(SeqV::new(10, b("v3"))))];
928924

929925
assert_eq!(resp.success, false);
930-
assert_eq!(norm(resp.responses), norm(expected));
926+
assert_eq!(
927+
normalize_txn_response(resp.responses),
928+
normalize_txn_response(expected)
929+
);
931930

932931
// Test less equal value: success = true
933932

@@ -940,12 +939,19 @@ impl kvapi::TestSuite {
940939
let resp = kv.transaction(txn).await?;
941940

942941
let expected: Vec<TxnOpResponse> = vec![
943-
TxnOpResponse::put(k1, Some(pb::SeqV::new(10, b("v3")))),
942+
TxnOpResponse::put(
943+
k1,
944+
Some(pb::SeqV::new(10, b("v3"))),
945+
Some(pb::SeqV::new(11, b("v4"))),
946+
),
944947
TxnOpResponse::get(k1, Some(SeqV::new(11, b("v4")))),
945948
];
946949

947950
assert_eq!(resp.success, true);
948-
assert_eq!(norm(resp.responses), norm(expected));
951+
assert_eq!(
952+
normalize_txn_response(resp.responses),
953+
normalize_txn_response(expected)
954+
);
949955

950956
// Test greater than value: success = false
951957

@@ -961,7 +967,10 @@ impl kvapi::TestSuite {
961967
vec![TxnOpResponse::get(k1, Some(SeqV::new(11, b("v4"))))];
962968

963969
assert_eq!(resp.success, false);
964-
assert_eq!(norm(resp.responses), norm(expected));
970+
assert_eq!(
971+
normalize_txn_response(resp.responses),
972+
normalize_txn_response(expected)
973+
);
965974

966975
// Test greater than value: success = true
967976

@@ -974,12 +983,19 @@ impl kvapi::TestSuite {
974983
let resp = kv.transaction(txn).await?;
975984

976985
let expected: Vec<TxnOpResponse> = vec![
977-
TxnOpResponse::put(k1, Some(pb::SeqV::new(11, b("v4")))),
986+
TxnOpResponse::put(
987+
k1,
988+
Some(pb::SeqV::new(11, b("v4"))),
989+
Some(pb::SeqV::new(12, b("v5"))),
990+
),
978991
TxnOpResponse::get(k1, Some(SeqV::new(12, b("v5")))),
979992
];
980993

981994
assert_eq!(resp.success, true);
982-
assert_eq!(norm(resp.responses), norm(expected));
995+
assert_eq!(
996+
normalize_txn_response(resp.responses),
997+
normalize_txn_response(expected)
998+
);
983999

9841000
// Test greater equal value: success = false
9851001

@@ -995,7 +1011,10 @@ impl kvapi::TestSuite {
9951011
vec![TxnOpResponse::get(k1, Some(SeqV::new(12, b("v5"))))];
9961012

9971013
assert_eq!(resp.success, false);
998-
assert_eq!(norm(resp.responses), norm(expected));
1014+
assert_eq!(
1015+
normalize_txn_response(resp.responses),
1016+
normalize_txn_response(expected)
1017+
);
9991018

10001019
// Test greater equal value: success = true
10011020

@@ -1008,12 +1027,19 @@ impl kvapi::TestSuite {
10081027
let resp = kv.transaction(txn).await?;
10091028

10101029
let expected: Vec<TxnOpResponse> = vec![
1011-
TxnOpResponse::put(k1, Some(pb::SeqV::new(12, b("v5")))),
1030+
TxnOpResponse::put(
1031+
k1,
1032+
Some(pb::SeqV::new(12, b("v5"))),
1033+
Some(pb::SeqV::new(13, b("v6"))),
1034+
),
10121035
TxnOpResponse::get(k1, Some(SeqV::new(13, b("v6")))),
10131036
];
10141037

10151038
assert_eq!(resp.success, true);
1016-
assert_eq!(norm(resp.responses), norm(expected));
1039+
assert_eq!(
1040+
normalize_txn_response(resp.responses),
1041+
normalize_txn_response(expected)
1042+
);
10171043
}
10181044
Ok(())
10191045
}
@@ -1410,6 +1436,48 @@ impl kvapi::TestSuite {
14101436
}
14111437
}
14121438

1439+
/// Convert Some(KvMeta{ expire: None }) to None to simplify the comparison
1440+
fn normalize_txn_response(vs: Vec<TxnOpResponse>) -> Vec<TxnOpResponse> {
1441+
vs.into_iter()
1442+
.map(|mut v| {
1443+
//
1444+
match &mut v.response {
1445+
Some(Response::Get(TxnGetResponse {
1446+
value: Some(pb::SeqV { meta, .. }),
1447+
..
1448+
})) => {
1449+
if *meta == Some(pb::KvMeta { expire_at: None }) {
1450+
*meta = None;
1451+
}
1452+
}
1453+
Some(Response::Put(TxnPutResponse {
1454+
prev_value: Some(pb::SeqV { meta, .. }),
1455+
..
1456+
})) => {
1457+
if *meta == Some(pb::KvMeta { expire_at: None }) {
1458+
*meta = None;
1459+
}
1460+
}
1461+
_ => {}
1462+
}
1463+
1464+
match &mut v.response {
1465+
Some(Response::Put(TxnPutResponse {
1466+
current: Some(pb::SeqV { meta, .. }),
1467+
..
1468+
})) => {
1469+
if *meta == Some(pb::KvMeta { expire_at: None }) {
1470+
*meta = None;
1471+
}
1472+
}
1473+
_ => {}
1474+
}
1475+
1476+
v
1477+
})
1478+
.collect()
1479+
}
1480+
14131481
fn b(x: impl ToString) -> Vec<u8> {
14141482
x.to_string().as_bytes().to_vec()
14151483
}

โ€Žsrc/meta/raft-store/src/applier.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,11 +470,12 @@ where SM: StateMachineApi + 'static
470470
put.ttl_ms.map(Interval::from_millis),
471471
));
472472

473-
let (prev, _result) = self.upsert_kv(&upsert).await?;
473+
let (prev, result) = self.upsert_kv(&upsert).await?;
474474

475475
let put_resp = TxnPutResponse {
476476
key: put.key.clone(),
477477
prev_value: prev.map(pb::SeqV::from),
478+
current: result.map(pb::SeqV::from),
478479
};
479480

480481
resp.responses.push(TxnOpResponse {

โ€Žsrc/meta/types/proto/request.proto

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,12 @@ message TxnPutRequest {
5959

6060
message TxnPutResponse {
6161
string key = 1;
62+
63+
// The value before put
6264
optional SeqV prev_value = 2;
65+
66+
// The value after put
67+
optional SeqV current = 3;
6368
}
6469

6570
// Delete request and response
@@ -124,7 +129,7 @@ message TxnDeleteResponse {
124129
}
125130

126131
// Delete by prefix request and response
127-
message TxnDeleteByPrefixRequest { string prefix = 1; }
132+
message TxnDeleteByPrefixRequest {string prefix = 1;}
128133

129134
message TxnDeleteByPrefixResponse {
130135
string prefix = 1;

โ€Žsrc/meta/types/src/proto_display/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,10 @@ impl Display for TxnPutResponse {
270270
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
271271
write!(
272272
f,
273-
"Put-resp: key={}, prev_seq={:?}",
273+
"Put-resp: key={}, prev_seq={}, current_seq={}",
274274
self.key,
275-
self.prev_value.as_ref().map(|x| x.seq)
275+
self.prev_value.as_ref().map(|x| x.seq).display(),
276+
self.current.as_ref().map(|x| x.seq).display()
276277
)
277278
}
278279
}

โ€Žsrc/meta/types/src/proto_ext/txn_ext.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,11 +329,16 @@ impl pb::TxnOpResponse {
329329
}
330330

331331
/// Create a new `TxnOpResponse` of a `Put` operation.
332-
pub fn put(key: impl ToString, prev_value: Option<pb::SeqV>) -> Self {
332+
pub fn put(
333+
key: impl ToString,
334+
prev_value: Option<pb::SeqV>,
335+
current: Option<pb::SeqV>,
336+
) -> Self {
333337
pb::TxnOpResponse {
334338
response: Some(pb::txn_op_response::Response::Put(pb::TxnPutResponse {
335339
key: key.to_string(),
336340
prev_value,
341+
current,
337342
})),
338343
}
339344
}

0 commit comments

Comments
ย (0)