Skip to content

Commit 3e4a2fe

Browse files
committed
feat: batchread, stress test
1 parent bc9ce75 commit 3e4a2fe

26 files changed

+2322 additions, -180 deletions (lines changed)

crates/raft/src/auto_recovery.rs

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -92,8 +92,8 @@ pub struct AutoRecoveryJob {
9292
block_archive: Option<Arc<BlockArchive>>,
9393
/// Snapshot manager for finding recovery starting points.
9494
snapshot_manager: Option<Arc<SnapshotManager>>,
95-
/// State layer for applying recovered state.
96-
state: Arc<parking_lot::RwLock<StateLayer>>,
95+
/// State layer for applying recovered state (internally thread-safe via redb MVCC).
96+
state: Arc<StateLayer>,
9797
/// Configuration.
9898
config: AutoRecoveryConfig,
9999
}
@@ -104,7 +104,7 @@ impl AutoRecoveryJob {
104104
raft: Arc<Raft<LedgerTypeConfig>>,
105105
node_id: LedgerNodeId,
106106
applied_state: AppliedStateAccessor,
107-
state: Arc<parking_lot::RwLock<StateLayer>>,
107+
state: Arc<StateLayer>,
108108
) -> Self {
109109
Self {
110110
raft,
@@ -370,15 +370,16 @@ impl AutoRecoveryJob {
370370

371371
if let Some(entry) = vault_entry {
372372
// Apply transactions and compute new state root
373-
let state = self.state.read();
373+
// StateLayer is internally thread-safe via redb MVCC
374374
for tx in &entry.transactions {
375-
state
375+
self.state
376376
.apply_operations(vault_id, &tx.operations, height)
377377
.context(ApplyOperationsSnafu { height })?;
378378
}
379379

380380
// Compute state root after applying
381-
computed_root = state
381+
computed_root = self
382+
.state
382383
.compute_state_root(vault_id)
383384
.context(StateRootComputationSnafu { vault_id })?;
384385
}

crates/raft/src/log_storage.rs

Lines changed: 73 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ use parking_lot::RwLock;
2626
use redb::{Database, ReadableTable, TableDefinition};
2727
use serde::{Deserialize, Serialize};
2828

29-
use ledger_types::{Hash, NamespaceId, ShardBlock, VaultEntry, VaultId, compute_tx_merkle_root};
29+
use ledger_types::{Hash, NamespaceId, Operation, ShardBlock, ShardId, VaultEntry, VaultId, compute_tx_merkle_root};
3030

3131
use crate::types::{
3232
BlockRetentionPolicy, LedgerNodeId, LedgerRequest, LedgerResponse, LedgerTypeConfig,
@@ -35,6 +35,7 @@ use crate::types::{
3535

3636
// Re-export storage types used in this module
3737
use ledger_storage::{BlockArchive, StateError, StateLayer};
38+
use ledger_storage::system::{NamespaceRegistry, NamespaceStatus, SystemKeys, SYSTEM_VAULT_ID};
3839

3940
// ============================================================================
4041
// Table Definitions
@@ -114,7 +115,7 @@ pub struct NamespaceMeta {
114115
/// Human-readable name.
115116
pub name: String,
116117
/// Shard hosting this namespace (0 for default).
117-
pub shard_id: u32,
118+
pub shard_id: ShardId,
118119
/// Whether the namespace is deleted.
119120
pub deleted: bool,
120121
}
@@ -354,11 +355,11 @@ pub struct RaftLogStore {
354355
/// Applied state (state machine) - shared with accessor.
355356
applied_state: Arc<RwLock<AppliedState>>,
356357
/// State layer for entity/relationship storage (shared with read service).
357-
state_layer: Option<Arc<RwLock<StateLayer>>>,
358+
state_layer: Option<Arc<StateLayer>>,
358359
/// Block archive for permanent block storage.
359360
block_archive: Option<Arc<BlockArchive>>,
360361
/// Shard ID for this Raft group.
361-
shard_id: i32,
362+
shard_id: ShardId,
362363
/// Node ID for block metadata.
363364
node_id: String,
364365
/// Current shard height (for block creation).
@@ -411,7 +412,7 @@ impl RaftLogStore {
411412
}
412413

413414
/// Configure the state layer for transaction application.
414-
pub fn with_state_layer(mut self, state_layer: Arc<RwLock<StateLayer>>) -> Self {
415+
pub fn with_state_layer(mut self, state_layer: Arc<StateLayer>) -> Self {
415416
self.state_layer = Some(state_layer);
416417
self
417418
}
@@ -423,7 +424,7 @@ impl RaftLogStore {
423424
}
424425

425426
/// Configure shard metadata.
426-
pub fn with_shard_config(mut self, shard_id: i32, node_id: String) -> Self {
427+
pub fn with_shard_config(mut self, shard_id: ShardId, node_id: String) -> Self {
427428
self.shard_id = shard_id;
428429
self.node_id = node_id;
429430
self
@@ -435,7 +436,7 @@ impl RaftLogStore {
435436
}
436437

437438
/// Get a reference to the state layer (if configured).
438-
pub fn state_layer(&self) -> Option<&Arc<RwLock<StateLayer>>> {
439+
pub fn state_layer(&self) -> Option<&Arc<StateLayer>> {
439440
self.state_layer.as_ref()
440441
}
441442

@@ -552,15 +553,14 @@ impl RaftLogStore {
552553
.unwrap_or(ledger_types::ZERO_HASH);
553554

554555
// Apply transactions to state layer if configured
555-
let state_root = if let Some(state_layer_lock) = &self.state_layer {
556+
let state_root = if let Some(state_layer) = &self.state_layer {
556557
// Collect all operations from all transactions
557558
let all_ops: Vec<_> = transactions
558559
.iter()
559560
.flat_map(|tx| tx.operations.clone())
560561
.collect();
561562

562-
// Acquire write lock and apply operations
563-
let state_layer = state_layer_lock.write();
563+
// Apply operations (StateLayer is internally thread-safe via redb MVCC)
564564
if let Err(e) = state_layer.apply_operations(*vault_id, &all_ops, new_height) {
565565
// Per DESIGN.md §6.1: On CAS failure, return current state for conflict resolution
566566
return match e {
@@ -655,18 +655,69 @@ impl RaftLogStore {
655655
)
656656
}
657657

658-
LedgerRequest::CreateNamespace { name } => {
658+
LedgerRequest::CreateNamespace { name, shard_id } => {
659659
let namespace_id = state.sequences.next_namespace();
660+
// Use provided shard_id or default to 0 (system shard)
661+
let assigned_shard = shard_id.unwrap_or(0);
660662
state.namespaces.insert(
661663
namespace_id,
662664
NamespaceMeta {
663665
namespace_id,
664666
name: name.clone(),
665-
shard_id: 0, // Default shard
667+
shard_id: assigned_shard,
666668
deleted: false,
667669
},
668670
);
669-
(LedgerResponse::NamespaceCreated { namespace_id }, None)
671+
672+
// Persist namespace to StateLayer for ShardRouter discovery.
673+
// This enables the ShardRouter to find the namespace->shard mapping.
674+
if let Some(state_layer) = &self.state_layer {
675+
let registry = NamespaceRegistry {
676+
namespace_id,
677+
name: name.clone(),
678+
shard_id: assigned_shard,
679+
member_nodes: vec![], // TODO: populate from cluster membership
680+
status: NamespaceStatus::Active,
681+
config_version: 1,
682+
created_at: chrono::Utc::now(),
683+
};
684+
685+
// Serialize and write to StateLayer
686+
if let Ok(value) = postcard::to_allocvec(&registry) {
687+
let key = SystemKeys::namespace_key(namespace_id);
688+
let name_index_key = SystemKeys::namespace_name_index_key(name);
689+
let ops = vec![
690+
Operation::SetEntity {
691+
key,
692+
value,
693+
condition: None,
694+
expires_at: None,
695+
},
696+
Operation::SetEntity {
697+
key: name_index_key,
698+
value: namespace_id.to_string().into_bytes(),
699+
condition: None,
700+
expires_at: None,
701+
},
702+
];
703+
704+
if let Err(e) = state_layer.apply_operations(SYSTEM_VAULT_ID, &ops, 0) {
705+
tracing::error!(
706+
namespace_id,
707+
error = %e,
708+
"Failed to persist namespace to StateLayer"
709+
);
710+
}
711+
}
712+
}
713+
714+
(
715+
LedgerResponse::NamespaceCreated {
716+
namespace_id,
717+
shard_id: assigned_shard,
718+
},
719+
None,
720+
)
670721
}
671722

672723
LedgerRequest::CreateVault {
@@ -1196,7 +1247,6 @@ impl RaftStorage<LedgerTypeConfig> for RaftLogStore {
11961247

11971248
// Restore StateLayer entities if StateLayer is configured
11981249
if let Some(state_layer) = &self.state_layer {
1199-
let state = state_layer.write();
12001250
for (vault_id, entities) in &combined.vault_entities {
12011251
for entity in entities {
12021252
// Convert entity to SetEntity operation
@@ -1211,7 +1261,7 @@ impl RaftStorage<LedgerTypeConfig> for RaftLogStore {
12111261
},
12121262
}];
12131263
// Apply at the entity's version height
1214-
if let Err(e) = state.apply_operations(*vault_id, &ops, entity.version) {
1264+
if let Err(e) = state_layer.apply_operations(*vault_id, &ops, entity.version) {
12151265
tracing::warn!(
12161266
vault_id,
12171267
key = %String::from_utf8_lossy(&entity.key),
@@ -1240,14 +1290,14 @@ impl RaftStorage<LedgerTypeConfig> for RaftLogStore {
12401290
}
12411291

12421292
// Collect entities from StateLayer if configured
1293+
// StateLayer is internally thread-safe via redb's MVCC, so no lock needed
12431294
let vault_entities = if let Some(state_layer) = &self.state_layer {
1244-
let sl = state_layer.read();
12451295
let mut entities_map = HashMap::new();
12461296

12471297
// Get entities for each known vault
12481298
for &(namespace_id, vault_id) in state.vault_heights.keys() {
12491299
// List all entities in this vault (up to 10000 per vault for snapshot)
1250-
match sl.list_entities(vault_id, None, None, 10000) {
1300+
match state_layer.list_entities(vault_id, None, None, 10000) {
12511301
Ok(entities) => {
12521302
if !entities.is_empty() {
12531303
entities_map.insert(vault_id, entities);
@@ -1387,12 +1437,13 @@ mod tests {
13871437

13881438
let request = LedgerRequest::CreateNamespace {
13891439
name: "test-ns".to_string(),
1440+
shard_id: None,
13901441
};
13911442

13921443
let (response, _vault_entry) = store.apply_request(&request, &mut state);
13931444

13941445
match response {
1395-
LedgerResponse::NamespaceCreated { namespace_id } => {
1446+
LedgerResponse::NamespaceCreated { namespace_id, .. } => {
13961447
assert_eq!(namespace_id, 1);
13971448
}
13981449
_ => panic!("unexpected response"),
@@ -1661,9 +1712,11 @@ mod tests {
16611712
let requests = vec![
16621713
LedgerRequest::CreateNamespace {
16631714
name: "acme-corp".to_string(),
1715+
shard_id: None,
16641716
},
16651717
LedgerRequest::CreateNamespace {
16661718
name: "startup-inc".to_string(),
1719+
shard_id: None,
16671720
},
16681721
LedgerRequest::CreateVault {
16691722
namespace_id: 1,
@@ -1882,6 +1935,7 @@ mod tests {
18821935
let requests: Vec<LedgerRequest> = vec![
18831936
LedgerRequest::CreateNamespace {
18841937
name: "ns1".to_string(),
1938+
shard_id: None,
18851939
},
18861940
LedgerRequest::CreateVault {
18871941
namespace_id: 1,
@@ -2025,6 +2079,7 @@ mod tests {
20252079
store.apply_request(
20262080
&LedgerRequest::CreateNamespace {
20272081
name: "test".to_string(),
2082+
shard_id: None,
20282083
},
20292084
&mut state,
20302085
);

crates/raft/src/multi_raft.rs

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -228,7 +228,7 @@ pub struct ShardGroup {
228228
/// The Raft consensus instance.
229229
raft: Arc<Raft<LedgerTypeConfig>>,
230230
/// Shared state layer for this shard.
231-
state: Arc<RwLock<StateLayer>>,
231+
state: Arc<StateLayer>,
232232
/// Block archive for historical blocks.
233233
block_archive: Arc<BlockArchive>,
234234
/// Accessor for applied state.
@@ -249,7 +249,7 @@ impl ShardGroup {
249249
}
250250

251251
/// Get the state layer.
252-
pub fn state(&self) -> &Arc<RwLock<StateLayer>> {
252+
pub fn state(&self) -> &Arc<StateLayer> {
253253
&self.state
254254
}
255255

@@ -507,7 +507,7 @@ impl MultiRaftManager {
507507
&self,
508508
shard_id: ShardId,
509509
raft: Arc<Raft<LedgerTypeConfig>>,
510-
state: Arc<RwLock<StateLayer>>,
510+
state: Arc<StateLayer>,
511511
block_archive: Arc<BlockArchive>,
512512
applied_state: AppliedStateAccessor,
513513
) -> ShardBackgroundJobs {
@@ -551,14 +551,14 @@ impl MultiRaftManager {
551551
&self,
552552
shard_id: ShardId,
553553
shard_dir: &PathBuf,
554-
) -> Result<(Arc<RwLock<StateLayer>>, Arc<BlockArchive>, RaftLogStore)> {
554+
) -> Result<(Arc<StateLayer>, Arc<BlockArchive>, RaftLogStore)> {
555555
// Open state database
556556
let state_db_path = shard_dir.join("state.redb");
557557
let engine = StorageEngine::open(&state_db_path).map_err(|e| MultiRaftError::Storage {
558558
shard_id,
559559
message: format!("Failed to open state db: {}", e),
560560
})?;
561-
let state = Arc::new(RwLock::new(StateLayer::new(engine.db())));
561+
let state = Arc::new(StateLayer::new(engine.db()));
562562

563563
// Open block archive
564564
let blocks_db_path = shard_dir.join("blocks.redb");

crates/raft/src/orphan_cleanup.rs

Lines changed: 7 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,6 @@ use std::sync::Arc;
2222
use std::time::Duration;
2323

2424
use openraft::Raft;
25-
use parking_lot::RwLock;
2625
use snafu::GenerateImplicitData;
2726
use tokio::time::interval;
2827
use tracing::{debug, info, warn};
@@ -55,8 +54,8 @@ pub struct OrphanCleanupJob {
5554
raft: Arc<Raft<LedgerTypeConfig>>,
5655
/// This node's ID.
5756
node_id: LedgerNodeId,
58-
/// The shared state layer.
59-
state: Arc<RwLock<StateLayer>>,
57+
/// The shared state layer (internally thread-safe via redb MVCC).
58+
state: Arc<StateLayer>,
6059
/// Accessor for applied state (namespace registry).
6160
applied_state: AppliedStateAccessor,
6261
/// Cleanup interval.
@@ -68,7 +67,7 @@ impl OrphanCleanupJob {
6867
pub fn new(
6968
raft: Arc<Raft<LedgerTypeConfig>>,
7069
node_id: LedgerNodeId,
71-
state: Arc<RwLock<StateLayer>>,
70+
state: Arc<StateLayer>,
7271
applied_state: AppliedStateAccessor,
7372
) -> Self {
7473
Self {
@@ -99,10 +98,10 @@ impl OrphanCleanupJob {
9998
/// - User has deleted_at set
10099
/// - User has status = "DELETED" or "DELETING"
101100
fn get_deleted_user_ids(&self) -> HashSet<i64> {
102-
let state = self.state.read();
101+
// StateLayer is internally thread-safe via redb MVCC
103102

104103
// List all user entities in _system (vault_id = 0)
105-
let entities = match state.list_entities(0, Some("user:"), None, MAX_BATCH_SIZE * 10) {
104+
let entities = match self.state.list_entities(0, Some("user:"), None, MAX_BATCH_SIZE * 10) {
106105
Ok(e) => e,
107106
Err(e) => {
108107
warn!(error = %e, "Failed to list users");
@@ -143,14 +142,13 @@ impl OrphanCleanupJob {
143142
return Vec::new();
144143
}
145144

146-
let state = self.state.read();
147-
145+
// StateLayer is internally thread-safe via redb MVCC
148146
// Note: Namespace entities live in vault_id = 0 for the namespace
149147
// We need to list entities in the namespace, not vault 0 of _system
150148
// Actually, namespace-level entities (members, teams) are stored with the namespace
151149
// For this implementation, we assume entities are in vault_id = 0 per namespace
152150

153-
let entities = match state.list_entities(0, Some("member:"), None, MAX_BATCH_SIZE) {
151+
let entities = match self.state.list_entities(0, Some("member:"), None, MAX_BATCH_SIZE) {
154152
Ok(e) => e,
155153
Err(e) => {
156154
warn!(namespace_id, error = %e, "Failed to list memberships");

0 commit comments

Comments
 (0)