Skip to content

Commit 98b0e32

Browse files
Merge branch 'alin/MR-427-state-manager-split' into 'master'
feat: [MR-427] Implement state splitting in State Manager See merge request dfinity-lab/public/ic!12495
2 parents 575950b + 139ff68 commit 98b0e32

File tree

5 files changed

+586
-0
lines changed

5 files changed

+586
-0
lines changed

rs/state_manager/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ rust_library(
2323
"//rs/monitoring/logger",
2424
"//rs/monitoring/metrics",
2525
"//rs/protobuf",
26+
"//rs/registry/routing_table",
2627
"//rs/registry/subnet_type",
2728
"//rs/replicated_state",
2829
"//rs/state_layout",

rs/state_manager/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ ic-interfaces-state-manager = { path = "../interfaces/state_manager" }
1919
ic-logger = { path = "../monitoring/logger" }
2020
ic-metrics = { path = "../monitoring/metrics" }
2121
ic-protobuf = { path = "../protobuf" }
22+
ic-registry-routing-table = { path = "../registry/routing_table" }
2223
ic-registry-subnet-type = { path = "../registry/subnet_type" }
2324
ic-replicated-state = { path = "../replicated_state" }
2425
ic-state-layout = { path = "../state_layout" }

rs/state_manager/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
pub mod checkpoint;
44
pub mod labeled_tree_visitor;
55
pub mod manifest;
6+
pub mod split;
67
pub mod state_sync;
78
pub mod stream_encoding;
89
pub mod tip;

rs/state_manager/src/split.rs

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
//! Prunes a replicated state, as part of a subnet split.
2+
use crate::{
3+
checkpoint::{load_checkpoint, make_checkpoint},
4+
tip::spawn_tip_thread,
5+
PageAllocatorFileDescriptorImpl, StateManagerMetrics, NUMBER_OF_CHECKPOINT_THREADS,
6+
};
7+
8+
use ic_config::state_manager::Config;
9+
use ic_logger::ReplicaLogger;
10+
use ic_metrics::MetricsRegistry;
11+
use ic_registry_routing_table::{CanisterIdRanges, RoutingTable};
12+
use ic_registry_subnet_type::SubnetType;
13+
use ic_replicated_state::{page_map::PageAllocatorFileDescriptor, ReplicatedState};
14+
use ic_state_layout::{CheckpointLayout, ReadOnly, StateLayout};
15+
use ic_types::{malicious_flags::MaliciousFlags, PrincipalId, SubnetId};
16+
use scoped_threadpool::Pool;
17+
use std::{path::PathBuf, sync::Arc};
18+
19+
#[cfg(test)]
20+
mod tests;
21+
22+
/// Loads the latest checkpoint under the given root; splits off the state of
23+
/// `subnet_id`, hosting the provided canister ID ranges; and writes back the
24+
/// split state as a new checkpoint, under the same root.
25+
pub fn split(
26+
root: PathBuf,
27+
subnet_id: PrincipalId,
28+
canister_id_ranges: CanisterIdRanges,
29+
metrics_registry: &MetricsRegistry,
30+
log: ReplicaLogger,
31+
) -> Result<(), String> {
32+
// Load latest checkpoint under `root`.
33+
let config = Config::new(root);
34+
let state_layout =
35+
StateLayout::try_new(log.clone(), config.state_root, metrics_registry).unwrap();
36+
37+
// A thread pool to use for reading and writing checkpoints.
38+
let mut thread_pool = Pool::new(NUMBER_OF_CHECKPOINT_THREADS);
39+
40+
// Create the file descriptor factory that is used to create files for PageMaps.
41+
let page_delta_path = state_layout.page_deltas();
42+
let fd_factory: Arc<dyn PageAllocatorFileDescriptor> = Arc::new(
43+
PageAllocatorFileDescriptorImpl::new(page_delta_path, config.file_backed_memory_allocator),
44+
);
45+
46+
let metrics = StateManagerMetrics::new(metrics_registry);
47+
let (cp, state) = read_checkpoint(
48+
&state_layout,
49+
&mut thread_pool,
50+
fd_factory.clone(),
51+
&metrics,
52+
)?;
53+
54+
// Set up the split.
55+
let subnet_id: SubnetId = subnet_id.into();
56+
let mut routing_table = RoutingTable::new();
57+
routing_table
58+
.assign_ranges(canister_id_ranges, subnet_id)
59+
.map_err(|e| format!("{:?}", e))?;
60+
61+
// Split the state.
62+
let split_state = state.split(subnet_id, &routing_table);
63+
64+
// Write the split state as a new checkpoint.
65+
write_checkpoint(
66+
&split_state,
67+
state_layout,
68+
&cp,
69+
&mut thread_pool,
70+
fd_factory,
71+
&metrics,
72+
log,
73+
)
74+
}
75+
76+
/// Reads the `ReplicatedState` from the latest checkpoint under `state_layout`.
77+
fn read_checkpoint(
78+
state_layout: &StateLayout,
79+
thread_pool: &mut Pool,
80+
fd_factory: Arc<dyn PageAllocatorFileDescriptor>,
81+
metrics: &StateManagerMetrics,
82+
) -> Result<(CheckpointLayout<ReadOnly>, ReplicatedState), String> {
83+
let height = *state_layout
84+
.checkpoint_heights()
85+
.map_err(|e| e.to_string())?
86+
.last()
87+
.ok_or(format!(
88+
"No checkpoints found at {}",
89+
state_layout.raw_path().display()
90+
))?;
91+
let cp = state_layout.checkpoint(height).map_err(|e| e.to_string())?;
92+
93+
let state = load_checkpoint(
94+
&cp,
95+
SubnetType::Application,
96+
&metrics.checkpoint_metrics,
97+
Some(thread_pool),
98+
fd_factory,
99+
)
100+
.map_err(|e| {
101+
format!(
102+
"Failed to load checkpoint at {}: {}",
103+
cp.raw_path().display(),
104+
e
105+
)
106+
})?;
107+
108+
Ok((cp, state))
109+
}
110+
111+
/// Writes the given `ReplicatedState` into a new checkpoint under
112+
/// `state_layout`, based off of `old_cp`.
113+
fn write_checkpoint(
114+
state: &ReplicatedState,
115+
state_layout: StateLayout,
116+
old_cp: &CheckpointLayout<ReadOnly>,
117+
thread_pool: &mut Pool,
118+
fd_factory: Arc<dyn PageAllocatorFileDescriptor>,
119+
metrics: &StateManagerMetrics,
120+
log: ReplicaLogger,
121+
) -> Result<(), String> {
122+
let old_height = old_cp.height();
123+
124+
let mut tip_handler = state_layout.capture_tip_handler();
125+
tip_handler
126+
.reset_tip_to(&state_layout, old_cp, Some(thread_pool))
127+
.map_err(|e| e.to_string())?;
128+
let (_tip_thread, tip_channel) = spawn_tip_thread(
129+
log,
130+
tip_handler,
131+
state_layout,
132+
metrics.clone(),
133+
MaliciousFlags::default(),
134+
);
135+
136+
make_checkpoint(
137+
state,
138+
old_height.increment(),
139+
&tip_channel,
140+
&metrics.checkpoint_metrics,
141+
thread_pool,
142+
fd_factory,
143+
)
144+
.map_err(|e| format!("Failed to write checkpoint: {}", e))?;
145+
146+
Ok(())
147+
}

0 commit comments

Comments
 (0)