Skip to content

Commit 88554d9

Browse files
Zhang Yanpodrmingdrmer
authored andcommitted
refactor: extract leader_or_forward() from gRPC handler functions
Three handlers (`handle_kv_transaction`, `handle_kv_list`, `handle_kv_get_many`) duplicated the same 6-line "assume leader or return forward-to-leader Status" block. Extract it into `leader_or_forward()` so each handler reduces to a single `self.leader_or_forward().await?` call. The post-write forward check in `handle_kv_transaction` is intentionally kept inline — it handles a different error path (`ClientWriteError::ForwardToLeader`) for leadership changes that occur between the assume check and the raft write.
1 parent 6fa45df commit 88554d9

File tree

4 files changed

+21
-26
lines changed

4 files changed

+21
-26
lines changed

crates/common/types/src/cmd/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ mod tests {
179179
let want = concat!(
180180
r#"{"Transaction":{"#,
181181
r#""condition":[{"key":"k","expected":0,"target":{"Value":[118]}}],"#,
182-
r#""if_then":[{"request":{"Put":{"key":"k","value":[118],"expire_at":null,"ttl_ms":100}}}],"#,
182+
r#""if_then":[{"request":{"Put":{"key":"k","value":[118],"expire_at":null,"ttl_ms":100,"match_seq":null}}}],"#,
183183
r#""else_then":[]"#,
184184
r#"}}"#
185185
);

crates/common/types/tests/it/txn_serde.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ fn test_txn_request_serde() -> anyhow::Result<()> {
2828
let want = concat!(
2929
r#"{"#,
3030
r#""condition":[{"key":"k","expected":0,"target":{"Value":[118]}}],"#,
31-
r#""if_then":[{"request":{"Put":{"key":"k","value":[118],"expire_at":null,"ttl_ms":100}}}],"#,
31+
r#""if_then":[{"request":{"Put":{"key":"k","value":[118],"expire_at":null,"ttl_ms":100,"match_seq":null}}}],"#,
3232
r#""else_then":[]"#,
3333
r#"}"#
3434
);

crates/server/service/src/meta_node/meta_node.rs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1639,20 +1639,27 @@ impl<SP: SpawnApi> MetaNode<SP> {
16391639
Endpoint::parse(addr).ok()
16401640
}
16411641

1642+
/// Return a `MetaLeader` if this node is the leader.
1643+
///
1644+
/// Otherwise, resolve the leader endpoint and return a forward-to-leader gRPC `Status`.
1645+
async fn leader_or_forward(&self) -> Result<MetaLeader<'_, SP>, Status> {
1646+
match self.assume_leader().await {
1647+
Ok(leader) => Ok(leader),
1648+
Err(forward) => {
1649+
let endpoint = self.get_leader_endpoint(forward.leader_id).await;
1650+
Err(GrpcHelper::status_forward_to_leader(endpoint.as_ref()))
1651+
}
1652+
}
1653+
}
1654+
16421655
/// Handle KvTransaction request. Must be leader to process.
16431656
///
16441657
/// If this node is not the leader, returns a `Status` error with leader endpoint in metadata.
16451658
pub async fn handle_kv_transaction(
16461659
&self,
16471660
txn: kv_transaction::Transaction,
16481661
) -> Result<KvTransactionReply, Status> {
1649-
let leader = match self.assume_leader().await {
1650-
Ok(leader) => leader,
1651-
Err(forward) => {
1652-
let endpoint = self.get_leader_endpoint(forward.leader_id).await;
1653-
return Err(GrpcHelper::status_forward_to_leader(endpoint.as_ref()));
1654-
}
1655-
};
1662+
let leader = self.leader_or_forward().await?;
16561663

16571664
let entry = LogEntry::new(Cmd::KvTransaction(txn));
16581665
let applied = match leader.write(entry).await {
@@ -1678,13 +1685,7 @@ impl<SP: SpawnApi> MetaNode<SP> {
16781685
prefix: String,
16791686
limit: Option<u64>,
16801687
) -> Result<BoxStream<'static, Result<StreamItem, Status>>, Status> {
1681-
let leader = match self.assume_leader().await {
1682-
Ok(leader) => leader,
1683-
Err(forward) => {
1684-
let endpoint = self.get_leader_endpoint(forward.leader_id).await;
1685-
return Err(GrpcHelper::status_forward_to_leader(endpoint.as_ref()));
1686-
}
1687-
};
1688+
let leader = self.leader_or_forward().await?;
16881689

16891690
let strm = leader
16901691
.kv_list(&prefix, limit)
@@ -1702,13 +1703,7 @@ impl<SP: SpawnApi> MetaNode<SP> {
17021703
&self,
17031704
input: impl Stream<Item = Result<KvGetManyRequest, Status>> + Send + 'static,
17041705
) -> Result<BoxStream<'static, Result<StreamItem, Status>>, Status> {
1705-
let leader = match self.assume_leader().await {
1706-
Ok(leader) => leader,
1707-
Err(forward) => {
1708-
let endpoint = self.get_leader_endpoint(forward.leader_id).await;
1709-
return Err(GrpcHelper::status_forward_to_leader(endpoint.as_ref()));
1710-
}
1711-
};
1706+
let leader = self.leader_or_forward().await?;
17121707

17131708
let strm = leader
17141709
.kv_get_many(input)

crates/server/service/tests/it/grpc/metasrv_grpc_export.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,9 @@ async fn test_export() -> anyhow::Result<()> {
8787
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":1},"payload":"Blank"}}]"#,
8888
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":2},"payload":{"Normal":{"time_ms":1111111111111,"cmd":{"AddNode":{"node_id":0,"node":{"name":"0","endpoint":{"addr":"localhost","port":29000},"grpc_api_advertise_address":"127.0.0.1:29000"},"overriding":false}}}}}}]"#,
8989
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":3},"payload":{"Membership":{"configs":[[0]],"nodes":{"0":{}}}}}}]"#,
90-
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":4},"payload":{"Normal":{"time_ms":1111111111111,"cmd":{"Transaction":{"condition":[{"key":"foo","expected":2,"target":{"Seq":0}}],"if_then":[{"request":{"Put":{"key":"foo","value":[102,111,111],"expire_at":null}}}],"else_then":[{"request":{"Get":{"key":"foo"}}}]}}}}}}]"#,
91-
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":5},"payload":{"Normal":{"time_ms":1111111111111,"cmd":{"Transaction":{"condition":[{"key":"bar","expected":2,"target":{"Seq":0}}],"if_then":[{"request":{"Put":{"key":"bar","value":[98,97,114],"expire_at":null}}}],"else_then":[{"request":{"Get":{"key":"bar"}}}]}}}}}}]"#,
92-
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":6},"payload":{"Normal":{"time_ms":1111111111111,"cmd":{"Transaction":{"condition":[{"key":"wow","expected":2,"target":{"Seq":0}}],"if_then":[{"request":{"Put":{"key":"wow","value":[119,111,119],"expire_at":null}}}],"else_then":[{"request":{"Get":{"key":"wow"}}}]}}}}}}]"#,
90+
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":4},"payload":{"Normal":{"time_ms":1111111111111,"cmd":{"Transaction":{"condition":[{"key":"foo","expected":2,"target":{"Seq":0}}],"if_then":[{"request":{"Put":{"key":"foo","value":[102,111,111],"expire_at":null,"match_seq":null}}}],"else_then":[{"request":{"Get":{"key":"foo"}}}]}}}}}}]"#,
91+
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":5},"payload":{"Normal":{"time_ms":1111111111111,"cmd":{"Transaction":{"condition":[{"key":"bar","expected":2,"target":{"Seq":0}}],"if_then":[{"request":{"Put":{"key":"bar","value":[98,97,114],"expire_at":null,"match_seq":null}}}],"else_then":[{"request":{"Get":{"key":"bar"}}}]}}}}}}]"#,
92+
r#"["raft_log",{"LogEntry":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":6},"payload":{"Normal":{"time_ms":1111111111111,"cmd":{"Transaction":{"condition":[{"key":"wow","expected":2,"target":{"Seq":0}}],"if_then":[{"request":{"Put":{"key":"wow","value":[119,111,119],"expire_at":null,"match_seq":null}}}],"else_then":[{"request":{"Get":{"key":"wow"}}}]}}}}}}]"#,
9393
r#"["state_machine/0",{"Sequences":{"key":"generic-kv","value":3}}]"#,
9494
r#"["state_machine/0",{"StateMachineMeta":{"key":"LastApplied","value":{"LogId":{"leader_id":{"term":1,"node_id":0},"index":6}}}}]"#,
9595
r#"["state_machine/0",{"StateMachineMeta":{"key":"LastMembership","value":{"Membership":{"log_id":{"leader_id":{"term":1,"node_id":0},"index":3},"membership":{"configs":[[0]],"nodes":{"0":{}}}}}}}]"#,

0 commit comments

Comments
 (0)