Skip to content

Commit 3372b48

Browse files
committed
refactor(meta): handle oversized AppendEntries payloads with retry loop
In openraft 0.10.0, `PayloadTooLarge` error was removed. This change implements automatic payload size management in `append_entries()` by reducing entry count and retrying when payloads exceed gRPC limits. Changes: - Add retry loop in `append_entries()` to handle oversized payloads - Add `build_partial_append_request()` helper for creating partial requests - Check payload size before sending against `advisory_encoding_size()` - Handle `tonic::Code::ResourceExhausted` by reducing entries and retrying - Return `Unreachable` error when single entry exceeds size limit - Remove obsolete `new_append_entries_raft_req()` function
1 parent 1c1e787 commit 3372b48

File tree

1 file changed

+85
-78
lines changed

1 file changed

+85
-78
lines changed

src/meta/service/src/network.rs

Lines changed: 85 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,13 @@ use backon::BackoffBuilder;
2323
use backon::ExponentialBuilder;
2424
use databend_common_base::base::tokio;
2525
use databend_common_base::base::tokio::sync::mpsc;
26-
use databend_common_base::base::tokio::time::Instant;
2726
use databend_common_base::future::TimedFutureExt;
2827
use databend_common_base::runtime;
2928
use databend_common_base::runtime::spawn_named;
3029
use databend_common_meta_raft_store::leveled_store::persisted_codec::PersistedCodec;
3130
use databend_common_meta_sled_store::openraft;
3231
use databend_common_meta_sled_store::openraft::MessageSummary;
3332
use databend_common_meta_sled_store::openraft::RaftNetworkFactory;
34-
// PayloadTooLarge was removed in openraft 0.10.0
35-
// TODO: Check if payload size limiting is still needed in the new API
3633
use databend_common_meta_sled_store::openraft::error::ReplicationClosed;
3734
use databend_common_meta_sled_store::openraft::network::RPCOption;
3835
use databend_common_meta_sled_store::openraft::network::v2::RaftNetworkV2;
@@ -43,7 +40,6 @@ use databend_common_meta_types::MetaNetworkError;
4340
use databend_common_meta_types::protobuf as pb;
4441
use databend_common_meta_types::protobuf::InstallEntryV004;
4542
use databend_common_meta_types::protobuf::RaftReply;
46-
use databend_common_meta_types::protobuf::RaftRequest;
4743
use databend_common_meta_types::protobuf::SnapshotChunkRequestV003;
4844
use databend_common_meta_types::raft_types::AppendEntriesRequest;
4945
use databend_common_meta_types::raft_types::AppendEntriesResponse;
@@ -279,59 +275,31 @@ impl Network {
279275
RPCError::Unreachable(Unreachable::new(&e))
280276
}
281277

282-
/// Create a new RaftRequest for AppendEntriesRequest,
283-
/// if it is too large, return `PayloadTooLarge` error
284-
/// to tell Openraft to split it in to smaller chunks.
285-
fn new_append_entries_raft_req<E>(
286-
&self,
287-
rpc: &AppendEntriesRequest,
288-
) -> Result<RaftRequest, RPCError<E>>
289-
where
290-
E: std::error::Error,
291-
{
292-
let start = Instant::now();
293-
let raft_req = GrpcHelper::encode_raft_request(rpc).map_err(|e| Unreachable::new(&e))?;
294-
debug!(
295-
"Raft NetworkConnection: new_append_entries_raft_req() encode_raft_request: target={}, elapsed={:?}",
296-
self.target,
297-
start.elapsed()
298-
);
299-
300-
if raft_req.data.len() <= GrpcConfig::advisory_encoding_size() {
301-
return Ok(raft_req);
278+
/// Build a partial AppendEntriesRequest with only the first `n` entries.
279+
fn build_partial_append_request(
280+
original: &AppendEntriesRequest,
281+
n: usize,
282+
) -> AppendEntriesRequest {
283+
AppendEntriesRequest {
284+
vote: original.vote,
285+
prev_log_id: original.prev_log_id,
286+
leader_commit: original.leader_commit,
287+
entries: original.entries[..n].to_vec(),
302288
}
289+
}
303290

304-
// data.len() is too large
305-
306-
let l = rpc.entries.len();
307-
if l == 0 {
308-
// impossible.
309-
Ok(raft_req)
310-
} else if l == 1 {
311-
warn!(
312-
"append_entries req too large: target={}, len={}, can not split",
313-
self.target,
314-
raft_req.data.len()
315-
);
316-
// can not split, just try to send this big request
317-
Ok(raft_req)
318-
} else {
319-
// l > 1
320-
let n = std::cmp::max(1, l / 2);
321-
warn!(
322-
"append_entries req too large: target={}, len={}, reduce NO entries from {} to {}",
323-
self.target,
324-
raft_req.data.len(),
325-
l,
326-
n
327-
);
328-
// TODO: In openraft 0.10.0, PayloadTooLarge was removed.
329-
// When a message too large error is received.
330-
// append_entries method should truncate the log entries to send and retry.
331-
// The payload size limiting may be handled differently now.
332-
// For now, just allow the request through.
333-
Ok(raft_req)
291+
/// Reduce entry count by half. Returns `None` if already at minimum.
292+
fn try_reduce_entries(&self, current: usize, reason: &str) -> Option<usize> {
293+
if current <= 1 {
294+
return None;
334295
}
296+
297+
let new_count = current / 2;
298+
warn!(
299+
"append_entries: target={}, {}, reducing entries {} -> {}",
300+
self.target, reason, current, new_count
301+
);
302+
Some(new_count)
335303
}
336304

337305
pub(crate) fn back_off(&self) -> impl Iterator<Item = Duration> + use<> {
@@ -749,6 +717,10 @@ impl Network {
749717
}
750718

751719
impl RaftNetworkV2<TypeConfig> for Network {
720+
/// Send AppendEntries RPC with automatic payload size management.
721+
///
722+
/// If the payload exceeds gRPC size limit, reduces entry count and retries.
723+
/// Returns error if a single entry exceeds the limit.
752724
#[logcall::logcall(err = "debug")]
753725
#[fastrace::trace]
754726
async fn append_entries(
@@ -763,36 +735,71 @@ impl RaftNetworkV2<TypeConfig> for Network {
763735
"send_append_entries",
764736
);
765737

766-
let raft_req = self.new_append_entries_raft_req(&rpc)?;
767-
let req = GrpcHelper::traced_req(raft_req);
738+
let mut entries_to_send = rpc.entries.len();
768739

769-
let bytes = req.get_ref().data.len() as u64;
770-
raft_metrics::network::incr_sendto_bytes(&self.target, bytes);
740+
loop {
741+
let partial_rpc = Self::build_partial_append_request(&rpc, entries_to_send);
742+
let raft_req =
743+
GrpcHelper::encode_raft_request(&partial_rpc).map_err(|e| Unreachable::new(&e))?;
744+
let payload_size = raft_req.data.len();
745+
746+
// Check size before sending
747+
if payload_size > GrpcConfig::advisory_encoding_size() {
748+
let reason = format!("payload too large: {} bytes", payload_size);
749+
match self.try_reduce_entries(entries_to_send, &reason) {
750+
Some(n) => {
751+
entries_to_send = n;
752+
continue;
753+
}
754+
None => {
755+
let err = AnyError::error(reason);
756+
return Err(RPCError::Unreachable(Unreachable::new(&err)));
757+
}
758+
}
759+
}
771760

772-
let mut client = self
773-
.take_client()
774-
.log_elapsed_debug("Raft NetworkConnection append_entries take_client()")
775-
.await?;
761+
// Send the request
762+
let req = GrpcHelper::traced_req(raft_req);
763+
raft_metrics::network::incr_sendto_bytes(&self.target, req.get_ref().data.len() as u64);
776764

777-
let grpc_res = client
778-
.append_entries(req)
779-
.with_timing(observe_append_send_spent(self.target))
780-
.await;
781-
debug!(
782-
"append_entries resp from: target={}: {:?}",
783-
self.target, grpc_res
784-
);
765+
let mut client = self
766+
.take_client()
767+
.log_elapsed_debug("Raft NetworkConnection append_entries take_client()")
768+
.await?;
785769

786-
match &grpc_res {
787-
Ok(_) => {
788-
self.client.lock().await.replace(client);
789-
}
790-
Err(e) => {
791-
warn!(target = self.target, rpc = rpc.summary(); "append_entries failed: {}", e);
770+
let grpc_res = client
771+
.append_entries(req)
772+
.with_timing(observe_append_send_spent(self.target))
773+
.await;
774+
775+
debug!(
776+
"append_entries resp from: target={}: {:?}",
777+
self.target, grpc_res
778+
);
779+
780+
match &grpc_res {
781+
Ok(_) => {
782+
self.client.lock().await.replace(client);
783+
return self.parse_grpc_resp::<_, openraft::error::Infallible>(grpc_res);
784+
}
785+
Err(status) if status.code() == tonic::Code::ResourceExhausted => {
786+
match self.try_reduce_entries(entries_to_send, "ResourceExhausted") {
787+
Some(n) => {
788+
entries_to_send = n;
789+
continue;
790+
}
791+
None => {
792+
let err = AnyError::error("ResourceExhausted: single entry too large");
793+
return Err(RPCError::Unreachable(Unreachable::new(&err)));
794+
}
795+
}
796+
}
797+
Err(e) => {
798+
warn!(target = self.target, rpc = partial_rpc.summary(); "append_entries failed: {}", e);
799+
return self.parse_grpc_resp::<_, openraft::error::Infallible>(grpc_res);
800+
}
792801
}
793802
}
794-
795-
self.parse_grpc_resp::<_, openraft::error::Infallible>(grpc_res)
796803
}
797804

798805
/// Send snapshot to target node. Currently uses V004 streaming protocol.

0 commit comments

Comments
 (0)