Skip to content

Commit ac04623

Browse files
committed
feat(db): use write buffer to speed up state migrations
1 parent 60a8e5f commit ac04623

File tree

2 files changed

+154
-2
lines changed

2 files changed

+154
-2
lines changed

src/state_migration/mod.rs

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@ use std::sync::{
99
use crate::networks::{ChainConfig, Height, NetworkChain};
1010
use crate::shim::clock::ChainEpoch;
1111
use crate::shim::state_tree::StateRoot;
12+
use ahash::{HashMap, HashMapExt};
1213
use cid::Cid;
1314
use fvm_ipld_blockstore::Blockstore;
1415
use fvm_ipld_encoding::CborStore;
16+
use itertools::Itertools;
17+
use parking_lot::RwLock;
1518

1619
pub(in crate::state_migration) mod common;
1720
mod nv17;
@@ -115,7 +118,8 @@ where
115118
if epoch == chain_config.epoch(height) {
116119
tracing::info!("Running {height} migration at epoch {epoch}");
117120
let start_time = std::time::Instant::now();
118-
let new_state = migrate(chain_config, db, parent_state, epoch)?;
121+
let db = Arc::new(BlockstoreWithWriteBuffer::new(db.clone()));
122+
let new_state = migrate(chain_config, &db, parent_state, epoch)?;
119123
let elapsed = start_time.elapsed();
120124
// `new_state_actors` is the Go state migration output, log for comparison
121125
let new_state_actors = db
@@ -144,5 +148,70 @@ where
144148
Ok(None)
145149
}
146150

151+
pub struct BlockstoreWithWriteBuffer<DB: Blockstore> {
152+
inner: DB,
153+
buffer: RwLock<HashMap<Cid, Vec<u8>>>,
154+
buffer_capacity: usize,
155+
}
156+
157+
impl<DB: Blockstore> Blockstore for BlockstoreWithWriteBuffer<DB> {
158+
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
159+
if let Some(v) = self.buffer.read().get(k) {
160+
return Ok(Some(v.clone()));
161+
}
162+
self.inner.get(k)
163+
}
164+
165+
fn has(&self, k: &Cid) -> anyhow::Result<bool> {
166+
Ok(self.buffer.read().contains_key(k) || self.inner.has(k)?)
167+
}
168+
169+
fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
170+
{
171+
let mut buffer = self.buffer.write();
172+
buffer.insert(*k, block.to_vec());
173+
}
174+
self.flush_buffer_if_needed()
175+
}
176+
}
177+
178+
impl<DB: Blockstore> BlockstoreWithWriteBuffer<DB> {
179+
pub fn new(inner: DB) -> Self {
180+
Self::new_with_capacity(inner, 10000)
181+
}
182+
183+
pub fn new_with_capacity(inner: DB, buffer_capacity: usize) -> Self {
184+
Self {
185+
inner,
186+
buffer_capacity,
187+
buffer: RwLock::new(HashMap::with_capacity(buffer_capacity)),
188+
}
189+
}
190+
191+
fn flush_buffer(&self) -> anyhow::Result<()> {
192+
let records = {
193+
let mut buffer = self.buffer.write();
194+
buffer.drain().collect_vec()
195+
};
196+
self.inner.put_many_keyed(records)
197+
}
198+
199+
fn flush_buffer_if_needed(&self) -> anyhow::Result<()> {
200+
if self.buffer.read().len() >= self.buffer_capacity {
201+
self.flush_buffer()
202+
} else {
203+
Ok(())
204+
}
205+
}
206+
}
207+
208+
impl<DB: Blockstore> Drop for BlockstoreWithWriteBuffer<DB> {
209+
fn drop(&mut self) {
210+
if let Err(e) = self.flush_buffer() {
211+
tracing::warn!("{e}");
212+
}
213+
}
214+
}
215+
147216
#[cfg(test)]
148217
mod tests;

src/tool/subcommands/shed_cmd/migration.rs

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ use std::time::Instant;
77

88
use cid::Cid;
99
use clap::Args;
10+
use fvm_ipld_blockstore::Blockstore;
1011
use itertools::Itertools;
1112

13+
use crate::state_migration::BlockstoreWithWriteBuffer;
1214
use crate::utils::db::CborStoreExt;
1315
use crate::{
1416
blocks::CachingBlockHeader,
@@ -36,6 +38,9 @@ pub struct MigrateStateCommand {
3638
/// Filecoin network chain
3739
#[arg(long, required = true)]
3840
chain: NetworkChain,
41+
/// Size of database write buffer, use 0 to disable write buffer
42+
#[arg(long, default_value_t = 10000)]
43+
db_write_buffer: usize,
3944
}
4045

4146
impl MigrateStateCommand {
@@ -45,6 +50,7 @@ impl MigrateStateCommand {
4550
block_to_look_back,
4651
db,
4752
chain,
53+
db_write_buffer,
4854
} = self;
4955
let db = {
5056
let db = if let Some(db) = db {
@@ -53,7 +59,15 @@ impl MigrateStateCommand {
5359
let (_, config) = read_config(None, Some(chain.clone()))?;
5460
db_root(&chain_path(&config))?
5561
};
56-
load_db(&db)?
62+
let db = load_db(&db)?;
63+
Arc::new(if db_write_buffer > 0 {
64+
Either::Left(BlockstoreWithWriteBuffer::new_with_capacity(
65+
db,
66+
db_write_buffer,
67+
))
68+
} else {
69+
Either::Right(db)
70+
})
5771
};
5872
let block: CachingBlockHeader = db.get_cbor_required(&block_to_look_back)?;
5973
let chain_config = Arc::new(ChainConfig::from_chain(&chain));
@@ -91,3 +105,72 @@ pub(super) fn load_db(db_root: &Path) -> anyhow::Result<Arc<ManyCar<ParityDb>>>
91105
load_all_forest_cars(&db, &forest_car_db_dir)?;
92106
Ok(Arc::new(db))
93107
}
108+
109+
enum Either<A: Blockstore, B: Blockstore> {
110+
Left(A),
111+
Right(B),
112+
}
113+
114+
impl<A: Blockstore, B: Blockstore> Blockstore for Either<A, B> {
115+
fn has(&self, k: &Cid) -> anyhow::Result<bool> {
116+
match self {
117+
Self::Left(v) => v.has(k),
118+
Self::Right(v) => v.has(k),
119+
}
120+
}
121+
122+
#[allow(clippy::disallowed_types)]
123+
fn put<D>(
124+
&self,
125+
mh_code: multihash_codetable::Code,
126+
block: &fvm_ipld_blockstore::Block<D>,
127+
) -> anyhow::Result<Cid>
128+
where
129+
Self: Sized,
130+
D: AsRef<[u8]>,
131+
{
132+
match self {
133+
Self::Left(v) => v.put(mh_code, block),
134+
Self::Right(v) => v.put(mh_code, block),
135+
}
136+
}
137+
138+
#[allow(clippy::disallowed_types)]
139+
fn put_many<D, I>(&self, blocks: I) -> anyhow::Result<()>
140+
where
141+
Self: Sized,
142+
D: AsRef<[u8]>,
143+
I: IntoIterator<Item = (multihash_codetable::Code, fvm_ipld_blockstore::Block<D>)>,
144+
{
145+
match self {
146+
Self::Left(v) => v.put_many(blocks),
147+
Self::Right(v) => v.put_many(blocks),
148+
}
149+
}
150+
151+
fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
152+
where
153+
Self: Sized,
154+
D: AsRef<[u8]>,
155+
I: IntoIterator<Item = (Cid, D)>,
156+
{
157+
match self {
158+
Self::Left(v) => v.put_many_keyed(blocks),
159+
Self::Right(v) => v.put_many_keyed(blocks),
160+
}
161+
}
162+
163+
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
164+
match self {
165+
Self::Left(v) => v.get(k),
166+
Self::Right(v) => v.get(k),
167+
}
168+
}
169+
170+
fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
171+
match self {
172+
Self::Left(v) => v.put_keyed(k, block),
173+
Self::Right(v) => v.put_keyed(k, block),
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)