Skip to content

Commit 7e32113

Browse files
authored
refactor(meta): upgrade openraft to v0.10.0-alpha.13 (#19193)
refactor(meta): upgrade openraft to v0.10.0-alpha.13 and handle oversized payloads Upgrade openraft dependency and adapt to API changes in the new version. The main changes include a new streaming apply API, updated watch receiver trait methods, and automatic payload size management for AppendEntries. In openraft 0.10.0, `PayloadTooLarge` error was removed. This change implements automatic payload size management in `append_entries()` by reducing entry count and retrying when payloads exceed gRPC limits. Changes: - Upgrade openraft from `v0.10.0-alpha.11` to `v0.10.0-alpha.13` - Update `TypeConfig` to use new `Responder<T>` and `ErrorSource` types - Add type aliases: `WatchReceiver<T>`, `Unreachable`, `EntryResponder` - Change `SMV003::apply_entries()` to accept stream instead of iterator - Replace `.borrow()` with `.borrow_watched()` for watch receivers - Add `From<SnapshotStoreError> for io::Error` implementation - Update tests to use stream-based apply API - Add retry loop in `append_entries()` to handle oversized payloads - Add `build_partial_append_request()` helper for creating partial requests - Check payload size before sending against `advisory_encoding_size()` - Handle `tonic::Code::ResourceExhausted` by reducing entries and retrying - Return `Unreachable` error when single entry exceeds size limit - Remove obsolete `new_append_entries_raft_req()` function
1 parent 1cbe48f commit 7e32113

File tree

22 files changed

+306
-244
lines changed

22 files changed

+306
-244
lines changed

Cargo.lock

Lines changed: 27 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
571571
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
572572
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9954bff" }
573573
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.4.2" }
574-
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.11" }
574+
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.13" }
575575
orc-rust = { git = "https://github.com/datafuse-extras/orc-rust", rev = "fc812ad7010" }
576576
recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "16e433a" }
577577
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }

src/meta/raft-store/src/sm_v003/sm_v003.rs

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@ use std::sync::Arc;
2020
use std::sync::Mutex;
2121
use std::time::Duration;
2222

23-
use databend_common_meta_types::AppliedState;
24-
use databend_common_meta_types::raft_types::Entry;
25-
use databend_common_meta_types::raft_types::StorageError;
23+
use databend_common_meta_types::raft_types::EntryResponder;
2624
use databend_common_meta_types::snapshot_db::DB;
2725
use databend_common_meta_types::sys_data::SysData;
26+
use futures::Stream;
27+
use futures::StreamExt;
2828
use log::debug;
2929
use log::info;
3030
use map_api::mvcc::ScopedGet;
31-
use openraft::entry::RaftEntry;
3231
use state_machine_api::SeqV;
3332
use state_machine_api::StateMachineApi;
3433
use state_machine_api::UserKey;
@@ -243,29 +242,20 @@ impl SMV003 {
243242
WriterAcquirer::new(self.write_semaphore.clone())
244243
}
245244

246-
pub async fn apply_entries(
247-
&self,
248-
entries: impl IntoIterator<Item = Entry>,
249-
) -> Result<Vec<AppliedState>, StorageError> {
245+
pub async fn apply_entries<S>(&self, mut entries: S) -> Result<(), io::Error>
246+
where S: Stream<Item = Result<EntryResponder, io::Error>> + Unpin {
250247
let mut applier = self.new_applier().await;
251248

252-
let mut res = vec![];
253-
254-
for ent in entries.into_iter() {
255-
let log_id = ent.log_id();
256-
let r = applier
257-
.apply(&ent)
258-
.await
259-
.map_err(|e| StorageError::apply(log_id, &e))?;
260-
res.push(r);
249+
while let Some(result) = entries.next().await {
250+
let (entry, responder) = result?;
251+
let applied = applier.apply(&entry).await?;
252+
if let Some(responder) = responder {
253+
responder.send(applied);
254+
}
261255
}
262256

263-
applier
264-
.commit()
265-
.await
266-
.map_err(|e| StorageError::write(&e))?;
267-
268-
Ok(res)
257+
applier.commit().await?;
258+
Ok(())
269259
}
270260

271261
pub fn sys_data(&self) -> SysData {

src/meta/raft-store/src/sm_v003/snapshot_store_error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,9 @@ impl From<SnapshotStoreError> for MetaStorageError {
8787
})
8888
}
8989
}
90+
91+
impl From<SnapshotStoreError> for io::Error {
92+
fn from(e: SnapshotStoreError) -> Self {
93+
io::Error::other(e)
94+
}
95+
}

src/meta/service/src/meta_node/meta_handle.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ use databend_common_meta_types::raft_types::Fatal;
4545
use databend_common_meta_types::raft_types::NodeId;
4646
use databend_common_meta_types::raft_types::RaftMetrics;
4747
use databend_common_meta_types::raft_types::Wait;
48+
use databend_common_meta_types::raft_types::WatchReceiver;
4849
use databend_common_meta_types::sys_data::SysData;
4950
use futures::Stream;
5051
use futures::stream::BoxStream;
5152
use tokio::sync::mpsc;
5253
use tokio::sync::oneshot;
53-
use tokio::sync::watch;
5454
use tonic::Status;
5555

5656
use crate::analysis::count_prefix::count_prefix;
@@ -330,9 +330,7 @@ impl MetaHandle {
330330
.await
331331
}
332332

333-
pub async fn handle_raft_metrics(
334-
&self,
335-
) -> Result<watch::Receiver<RaftMetrics>, MetaNodeStopped> {
333+
pub async fn handle_raft_metrics(&self) -> Result<WatchReceiver<RaftMetrics>, MetaNodeStopped> {
336334
self.request(move |meta_node| {
337335
let fu = async move { meta_node.raft.metrics() };
338336

src/meta/service/src/meta_node/meta_node.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use databend_common_base::base::BuildInfoRef;
2626
use databend_common_base::base::tokio;
2727
use databend_common_base::base::tokio::sync::Mutex;
2828
use databend_common_base::base::tokio::sync::watch;
29-
use databend_common_base::base::tokio::sync::watch::error::RecvError;
3029
use databend_common_base::base::tokio::task::JoinHandle;
3130
use databend_common_base::base::tokio::time::Instant;
3231
use databend_common_base::base::tokio::time::sleep;
@@ -41,6 +40,8 @@ use databend_common_meta_raft_store::raft_log_v004::RaftLogStat;
4140
use databend_common_meta_raft_store::utils::seq_marked_to_seqv;
4241
use databend_common_meta_sled_store::openraft;
4342
use databend_common_meta_sled_store::openraft::ChangeMembers;
43+
use databend_common_meta_sled_store::openraft::async_runtime::RecvError;
44+
use databend_common_meta_sled_store::openraft::async_runtime::WatchReceiver as WatchReceiverTrait;
4445
use databend_common_meta_sled_store::openraft::error::RaftError;
4546
use databend_common_meta_stoerr::MetaStorageError;
4647
use databend_common_meta_types::AppliedState;
@@ -67,6 +68,7 @@ use databend_common_meta_types::raft_types::MembershipNode;
6768
use databend_common_meta_types::raft_types::NodeId;
6869
use databend_common_meta_types::raft_types::RaftMetrics;
6970
use databend_common_meta_types::raft_types::TypeConfig;
71+
use databend_common_meta_types::raft_types::WatchReceiver;
7072
use databend_common_meta_types::raft_types::new_log_id;
7173
use databend_common_meta_types::snapshot_db::DBStat;
7274
use fastrace::func_name;
@@ -327,7 +329,10 @@ impl MetaNode {
327329
if r.is_err() {
328330
break;
329331
}
330-
info!("waiting for raft to shutdown, metrics: {:?}", rx.borrow());
332+
info!(
333+
"waiting for raft to shutdown, metrics: {:?}",
334+
rx.borrow_watched()
335+
);
331336
}
332337
info!("shutdown raft");
333338

@@ -351,7 +356,7 @@ impl MetaNode {
351356
}
352357

353358
/// Spawn a monitor to watch raft state changes and report metrics changes.
354-
pub async fn subscribe_metrics(mn: Arc<Self>, metrics_rx: watch::Receiver<RaftMetrics>) {
359+
pub async fn subscribe_metrics(mn: Arc<Self>, metrics_rx: WatchReceiver<RaftMetrics>) {
355360
info!("Start a task subscribing raft metrics and forward to metrics API");
356361

357362
let fut = Self::report_metrics_loop(mn.clone(), metrics_rx);
@@ -369,7 +374,7 @@ impl MetaNode {
369374
/// Report metrics changes periodically.
370375
async fn report_metrics_loop(
371376
meta_node: Arc<Self>,
372-
mut metrics_rx: watch::Receiver<RaftMetrics>,
377+
mut metrics_rx: WatchReceiver<RaftMetrics>,
373378
) -> Result<(), AnyError> {
374379
const RATE_LIMIT_INTERVAL: Duration = Duration::from_millis(200);
375380
let mut last_leader: Option<u64> = None;
@@ -388,7 +393,7 @@ impl MetaNode {
388393
break;
389394
}
390395

391-
let mm = metrics_rx.borrow().clone();
396+
let mm = metrics_rx.borrow_watched().clone();
392397

393398
// Report metrics about server state and role.
394399
server_metrics::set_node_is_health(
@@ -403,7 +408,7 @@ impl MetaNode {
403408
// metrics about raft log and state machine.
404409
server_metrics::set_current_term(mm.current_term);
405410
server_metrics::set_last_log_index(mm.last_log_index.unwrap_or_default());
406-
server_metrics::set_proposals_applied(mm.last_applied.unwrap_or_default().index);
411+
server_metrics::set_proposals_applied(mm.last_applied.map(|id| id.index).unwrap_or(0));
407412
server_metrics::set_last_seq(meta_node.get_last_seq().await);
408413

409414
{
@@ -1193,7 +1198,7 @@ impl MetaNode {
11931198
let snapshot_key_count = self.get_snapshot_key_count().await;
11941199
let snapshot_key_space_stat = self.get_snapshot_key_space_stat().await;
11951200

1196-
let metrics = self.raft.metrics().borrow().clone();
1201+
let metrics = self.raft.metrics().borrow_watched().clone();
11971202

11981203
let leader = if let Some(leader_id) = metrics.current_leader {
11991204
self.get_node(&leader_id).await
@@ -1455,7 +1460,7 @@ impl MetaNode {
14551460
let mut expire_at: Option<Instant> = None;
14561461

14571462
loop {
1458-
if let Some(l) = rx.borrow().current_leader {
1463+
if let Some(l) = rx.borrow_watched().current_leader {
14591464
return Ok(Some(l));
14601465
}
14611466

src/meta/service/src/meta_service/meta_leader.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use databend_common_meta_client::MetaGrpcReadReq;
2121
use databend_common_meta_kvapi::kvapi::KVApi;
2222
use databend_common_meta_kvapi::kvapi::KvApiExt;
2323
use databend_common_meta_sled_store::openraft::ChangeMembers;
24+
use databend_common_meta_sled_store::openraft::async_runtime::WatchReceiver;
2425
use databend_common_meta_stoerr::MetaStorageError;
2526
use databend_common_meta_types::AppliedState;
2627
use databend_common_meta_types::Cmd;
@@ -178,7 +179,7 @@ impl<'a> MetaLeader<'a> {
178179
let role = req.role();
179180
let node_id = req.node_id;
180181
let endpoint = req.endpoint;
181-
let metrics = self.raft.metrics().borrow().clone();
182+
let metrics = self.raft.metrics().borrow_watched().clone();
182183
let membership = metrics.membership_config.membership();
183184

184185
let voters = membership.voter_ids().collect::<BTreeSet<_>>();

0 commit comments

Comments
 (0)