Skip to content

Commit bc9ce75

Browse files
committed
feat: error handling improvements
1 parent 0f2d6d1 commit bc9ce75

23 files changed

+499
-286
lines changed

crates/raft/src/auto_recovery.rs

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::error::{
2828
ApplyOperationsSnafu, BlockArchiveNotConfiguredSnafu, BlockReadSnafu, IndexLookupSnafu,
2929
RecoveryError, StateRootComputationSnafu,
3030
};
31-
use crate::log_storage::{AppliedStateAccessor, VaultHealthStatus, MAX_RECOVERY_ATTEMPTS};
31+
use crate::log_storage::{AppliedStateAccessor, MAX_RECOVERY_ATTEMPTS, VaultHealthStatus};
3232
use crate::types::{LedgerNodeId, LedgerRequest, LedgerResponse, LedgerTypeConfig};
3333

3434
/// Default interval between recovery scans.
@@ -144,7 +144,10 @@ impl AutoRecoveryJob {
144144
/// Calculate retry delay with exponential backoff.
145145
fn retry_delay(&self, attempt: u8) -> Duration {
146146
let multiplier = 2u64.saturating_pow(attempt.saturating_sub(1) as u32);
147-
let delay = self.config.base_retry_delay.saturating_mul(multiplier as u32);
147+
let delay = self
148+
.config
149+
.base_retry_delay
150+
.saturating_mul(multiplier as u32);
148151
std::cmp::min(delay, self.config.max_retry_delay)
149152
}
150153

@@ -171,7 +174,10 @@ impl AutoRecoveryJob {
171174
VaultHealthStatus::Diverged { .. } => {
172175
needs_recovery.push((namespace_id, vault_id, health));
173176
}
174-
VaultHealthStatus::Recovering { started_at, attempt } => {
177+
VaultHealthStatus::Recovering {
178+
started_at,
179+
attempt,
180+
} => {
175181
if *attempt < MAX_RECOVERY_ATTEMPTS
176182
&& self.is_ready_for_retry(*started_at, *attempt)
177183
{
@@ -213,7 +219,16 @@ impl AutoRecoveryJob {
213219
// First, transition to Recovering state
214220
let now = chrono::Utc::now().timestamp();
215221
if let Err(e) = self
216-
.propose_health_update(namespace_id, vault_id, false, None, None, None, Some(attempt), Some(now))
222+
.propose_health_update(
223+
namespace_id,
224+
vault_id,
225+
false,
226+
None,
227+
None,
228+
None,
229+
Some(attempt),
230+
Some(now),
231+
)
217232
.await
218233
{
219234
warn!(
@@ -228,9 +243,7 @@ impl AutoRecoveryJob {
228243

229244
info!(
230245
namespace_id,
231-
vault_id,
232-
attempt,
233-
"Starting vault recovery attempt"
246+
vault_id, attempt, "Starting vault recovery attempt"
234247
);
235248

236249
// Get the expected state root from the divergence info
@@ -258,7 +271,16 @@ impl AutoRecoveryJob {
258271
if computed_root == expected_root || expected_root == ledger_types::ZERO_HASH {
259272
// Recovery successful
260273
if let Err(e) = self
261-
.propose_health_update(namespace_id, vault_id, true, None, None, None, None, None)
274+
.propose_health_update(
275+
namespace_id,
276+
vault_id,
277+
true,
278+
None,
279+
None,
280+
None,
281+
None,
282+
None,
283+
)
262284
.await
263285
{
264286
warn!(
@@ -320,10 +342,7 @@ impl AutoRecoveryJob {
320342

321343
debug!(
322344
namespace_id,
323-
vault_id,
324-
start_height,
325-
tip_height,
326-
"Replaying vault state for recovery"
345+
vault_id, start_height, tip_height, "Replaying vault state for recovery"
327346
);
328347

329348
// Replay blocks from start_height to tip
@@ -385,10 +404,7 @@ impl AutoRecoveryJob {
385404
.iter()
386405
.find(|v| v.vault_id == vault_id)
387406
{
388-
return Ok((
389-
vault_state.vault_height + 1,
390-
vault_state.state_root,
391-
));
407+
return Ok((vault_state.vault_height + 1, vault_state.state_root));
392408
}
393409
}
394410
}
@@ -423,14 +439,14 @@ impl AutoRecoveryJob {
423439
recovery_started_at,
424440
};
425441

426-
let result = self
427-
.raft
428-
.client_write(request)
429-
.await
430-
.map_err(|e| RecoveryError::RaftConsensus {
431-
message: format!("{:?}", e),
432-
backtrace: snafu::Backtrace::generate(),
433-
})?;
442+
let result =
443+
self.raft
444+
.client_write(request)
445+
.await
446+
.map_err(|e| RecoveryError::RaftConsensus {
447+
message: format!("{:?}", e),
448+
backtrace: snafu::Backtrace::generate(),
449+
})?;
434450

435451
match result.data {
436452
LedgerResponse::VaultHealthUpdated { success: true } => Ok(()),

crates/raft/src/batching.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ mod tests {
581581
#[tokio::test]
582582
async fn test_eager_commit_flushes_quickly() {
583583
let config = BatchConfig {
584-
max_batch_size: 100, // High limit
584+
max_batch_size: 100, // High limit
585585
batch_timeout: Duration::from_secs(10), // Very long timeout
586586
tick_interval: Duration::from_millis(5),
587587
eager_commit: true, // Enable eager commit

crates/raft/src/error.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,10 @@ pub enum RecoveryError {
5656

5757
/// Raft consensus write failed.
5858
#[snafu(display("Raft consensus write failed: {message}"))]
59-
RaftConsensus { message: String, backtrace: Backtrace },
59+
RaftConsensus {
60+
message: String,
61+
backtrace: Backtrace,
62+
},
6063

6164
/// Health update was rejected by the state machine.
6265
#[snafu(display("Health update rejected: {reason}"))]
@@ -88,7 +91,10 @@ pub enum SagaError {
8891

8992
/// Raft consensus write failed.
9093
#[snafu(display("Raft write failed for saga operation: {message}"))]
91-
SagaRaftWrite { message: String, backtrace: Backtrace },
94+
SagaRaftWrite {
95+
message: String,
96+
backtrace: Backtrace,
97+
},
9298

9399
/// Entity read from state failed.
94100
#[snafu(display("Failed to read {entity_type} from state: {source}"))]
@@ -106,7 +112,10 @@ pub enum SagaError {
106112

107113
/// Sequence allocation failed.
108114
#[snafu(display("Sequence allocation failed: {message}"))]
109-
SequenceAllocation { message: String, backtrace: Backtrace },
115+
SequenceAllocation {
116+
message: String,
117+
backtrace: Backtrace,
118+
},
110119

111120
/// Unexpected response from state machine.
112121
#[snafu(display("Unexpected saga response: {description}"))]
@@ -123,7 +132,10 @@ pub enum SagaError {
123132
pub enum OrphanCleanupError {
124133
/// Raft consensus write failed.
125134
#[snafu(display("Raft write failed during orphan cleanup: {message}"))]
126-
OrphanRaftWrite { message: String, backtrace: Backtrace },
135+
OrphanRaftWrite {
136+
message: String,
137+
backtrace: Backtrace,
138+
},
127139

128140
/// Failed to delete orphaned entity.
129141
#[snafu(display("Failed to delete orphan entity {key}: {reason}"))]
@@ -142,7 +154,10 @@ pub enum OrphanCleanupError {
142154
pub enum ServiceError {
143155
/// Raft consensus operation failed.
144156
#[snafu(display("Raft operation failed: {message}"))]
145-
Raft { message: String, backtrace: Backtrace },
157+
Raft {
158+
message: String,
159+
backtrace: Backtrace,
160+
},
146161

147162
/// Storage operation failed.
148163
#[snafu(display("Storage operation failed: {source}"))]

crates/raft/src/file_lock.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ impl std::fmt::Display for LockError {
5656
impl std::error::Error for LockError {
5757
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
5858
match self {
59-
LockError::CreateFailed(_, err) | LockError::DirectoryCreateFailed(_, err) => {
60-
Some(err)
61-
}
59+
LockError::CreateFailed(_, err) | LockError::DirectoryCreateFailed(_, err) => Some(err),
6260
LockError::AlreadyLocked(_) => None,
6361
}
6462
}
@@ -114,8 +112,8 @@ impl DataDirLock {
114112
}
115113

116114
// Create/open the lock file
117-
let file = File::create(&lock_path)
118-
.map_err(|e| LockError::CreateFailed(lock_path.clone(), e))?;
115+
let file =
116+
File::create(&lock_path).map_err(|e| LockError::CreateFailed(lock_path.clone(), e))?;
119117

120118
// Try to acquire an exclusive lock (non-blocking)
121119
match file.try_lock_exclusive() {
@@ -189,6 +187,7 @@ impl Drop for DataDirLock {
189187
}
190188

191189
#[cfg(test)]
190+
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
192191
mod tests {
193192
use super::*;
194193
use tempfile::TempDir;

crates/raft/src/idempotency.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,13 @@ impl IdempotencyCache {
155155
}
156156

157157
// Not a duplicate, insert the new result
158-
self.insert(namespace_id, vault_id, client_id.to_string(), sequence, result);
158+
self.insert(
159+
namespace_id,
160+
vault_id,
161+
client_id.to_string(),
162+
sequence,
163+
result,
164+
);
159165
None
160166
}
161167

@@ -280,7 +286,10 @@ mod tests {
280286

281287
// Same client, same sequence, but different vault should NOT be duplicate
282288
let cached = cache.check(1, 2, "client-1", 1);
283-
assert!(cached.is_none(), "different vault should not be a duplicate");
289+
assert!(
290+
cached.is_none(),
291+
"different vault should not be a duplicate"
292+
);
284293

285294
// Now insert for vault 2
286295
cache.insert(1, 2, "client-1".to_string(), 1, result);
@@ -302,7 +311,10 @@ mod tests {
302311

303312
// Same client, same vault, same sequence, but different namespace should NOT be duplicate
304313
let cached = cache.check(2, 1, "client-1", 1);
305-
assert!(cached.is_none(), "different namespace should not be a duplicate");
314+
assert!(
315+
cached.is_none(),
316+
"different namespace should not be a duplicate"
317+
);
306318
}
307319

308320
/// DESIGN.md compliance test: sequence numbers must be monotonically increasing.

crates/raft/src/learner_refresh.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ use tonic::transport::Channel;
2828
use tracing::{debug, info, warn};
2929

3030
use crate::log_storage::AppliedStateAccessor;
31-
use crate::proto::system_discovery_service_client::SystemDiscoveryServiceClient;
3231
use crate::proto::GetSystemStateRequest;
32+
use crate::proto::system_discovery_service_client::SystemDiscoveryServiceClient;
3333
use crate::types::{LedgerNodeId, LedgerTypeConfig};
3434

3535
/// Default refresh interval for learner background task.
@@ -154,11 +154,7 @@ impl LearnerRefreshJob {
154154

155155
membership
156156
.voter_ids()
157-
.filter_map(|id| {
158-
membership
159-
.get_node(&id)
160-
.map(|node| (id, node.addr.clone()))
161-
})
157+
.filter_map(|id| membership.get_node(&id).map(|node| (id, node.addr.clone())))
162158
.collect()
163159
}
164160

crates/raft/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,18 @@ pub mod proto {
5151

5252
pub use auto_recovery::{AutoRecoveryConfig, AutoRecoveryJob, RecoveryResult};
5353
pub use batching::{BatchConfig, BatchError, BatchWriter, BatchWriterHandle};
54-
pub use learner_refresh::{CachedSystemState, LearnerRefreshConfig, LearnerRefreshJob};
5554
pub use block_compaction::BlockCompactor;
5655
pub use file_lock::{DataDirLock, LockError};
5756
pub use idempotency::IdempotencyCache;
57+
pub use learner_refresh::{CachedSystemState, LearnerRefreshConfig, LearnerRefreshJob};
5858
pub use log_storage::{
5959
AppliedState, AppliedStateAccessor, NamespaceMeta, RaftLogStore, SequenceCounters,
6060
VaultHealthStatus, VaultMeta,
6161
};
62+
pub use multi_raft::{
63+
MultiRaftConfig, MultiRaftError, MultiRaftManager, MultiRaftStats, ShardConfig, ShardGroup,
64+
};
65+
pub use multi_shard_server::MultiShardLedgerServer;
6266
pub use orphan_cleanup::OrphanCleanupJob;
6367
pub use pagination::{PageToken, PageTokenCodec, PageTokenError};
6468
pub use peer_maintenance::PeerMaintenance;
@@ -67,13 +71,9 @@ pub use raft_network::{GrpcRaftNetwork, GrpcRaftNetworkFactory};
6771
pub use rate_limit::{NamespaceRateLimiter, RateLimitExceeded};
6872
pub use saga_orchestrator::SagaOrchestrator;
6973
pub use server::LedgerServer;
70-
pub use multi_shard_server::MultiShardLedgerServer;
7174
pub use shard_router::{
7275
RouterConfig, RouterStats, RoutingError, RoutingInfo, ShardConnection, ShardRouter,
7376
};
74-
pub use multi_raft::{
75-
MultiRaftConfig, MultiRaftError, MultiRaftManager, MultiRaftStats, ShardConfig, ShardGroup,
76-
};
7777
// Re-export multi-shard service types
7878
pub use services::{
7979
ForwardClient, MultiShardReadService, MultiShardResolver, MultiShardWriteService,

0 commit comments

Comments
 (0)