Skip to content

Commit 5118c30

Browse files
committed
add time to log
add log add read_all_with_callback split state dump into multiple task add more log to indicate progress
1 parent 655a358 commit 5118c30

File tree

13 files changed

+384
-211
lines changed

13 files changed

+384
-211
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ cfx-tasks = { workspace = true }
8080
cfx-config = { workspace = true }
8181
cfxcore-types = { workspace = true }
8282
fallible-iterator = { workspace = true }
83+
chrono = { workspace = true }
8384

8485
[dev-dependencies]
8586
criterion = { workspace = true }

crates/client/src/state_dump.rs

Lines changed: 60 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,18 @@ use cfx_config::Configuration;
33
use cfx_rpc_eth_types::{AccountState, StateDump, EOA_STORAGE_ROOT_H256};
44
use cfx_rpc_primitives::Bytes;
55
use cfx_statedb::{StateDbExt, StateDbGeneric};
6-
use cfx_storage::{
7-
state_manager::StateManagerTrait, utils::to_key_prefix_iter_upper_bound,
8-
KeyValueDbIterableTrait,
9-
};
6+
use cfx_storage::state_manager::StateManagerTrait;
107
use cfx_types::{Address, Space, H256};
118
use cfxcore::NodeType;
12-
use fallible_iterator::FallibleIterator;
9+
use chrono::Utc;
1310
use keccak_hash::{keccak, KECCAK_EMPTY};
1411
use parking_lot::{Condvar, Mutex};
1512
use primitives::{
1613
Account, SkipInputCheck, StorageKey, StorageKeyWithSpace, StorageValue,
1714
};
1815
use rlp::Rlp;
1916
use std::{
20-
collections::{BTreeMap, HashMap, HashSet},
17+
collections::{BTreeMap, HashMap},
2118
ops::Deref,
2219
sync::Arc,
2320
thread,
@@ -32,6 +29,10 @@ pub struct StateDumpConfig {
3229
pub no_storage: bool,
3330
}
3431

32+
// This method will read all data (k, v) from the Conflux state tree (including
33+
// core space and espace accounts, code, storage, deposit, vote_list) into
34+
// memory at once, then parse and assemble them and assemble all account states
35+
// into a StateDump struct and return it
3536
pub fn dump_whole_state(
3637
conf: &mut Configuration, exit_cond_var: Arc<(Mutex<bool>, Condvar)>,
3738
config: &StateDumpConfig,
@@ -52,14 +53,19 @@ pub fn dump_whole_state(
5253
Ok(state_dump)
5354
}
5455

56+
// This method will iterate through the entire state tree, storing each found
57+
// account in a temporary map After iterating through all accounts, it will
58+
// retrieve the code and storage for each account, then call the callback method
59+
// Pass the AccountState as a parameter to the callback method, which will
60+
// handle the AccountState
5561
pub fn iterate_dump_whole_state<F: Fn(AccountState)>(
5662
conf: &mut Configuration, exit_cond_var: Arc<(Mutex<bool>, Condvar)>,
5763
config: &StateDumpConfig, callback: F,
5864
) -> Result<H256, String> {
5965
let (mut state_db, state_root) =
6066
prepare_state_db(conf, exit_cond_var, config)?;
6167

62-
export_space_accounts_with_iterator(
68+
export_space_accounts_with_callback(
6369
&mut state_db,
6470
Space::Ethereum,
6571
config,
@@ -74,7 +80,7 @@ fn prepare_state_db(
7480
conf: &mut Configuration, exit_cond_var: Arc<(Mutex<bool>, Condvar)>,
7581
config: &StateDumpConfig,
7682
) -> Result<(StateDbGeneric, H256), String> {
77-
println!("Preparing state...");
83+
println("Preparing state...");
7884
let (
7985
data_man,
8086
_,
@@ -140,6 +146,7 @@ fn prepare_state_db(
140146
fn export_space_accounts(
141147
state: &mut StateDbGeneric, space: Space, config: &StateDumpConfig,
142148
) -> Result<BTreeMap<Address, AccountState>, Box<dyn std::error::Error>> {
149+
println("Start to iterate state...");
143150
let empty_key = StorageKey::EmptyKey.with_space(space);
144151
let kv_pairs = state.read_all(empty_key, None)?;
145152

@@ -156,7 +163,7 @@ fn export_space_accounts(
156163
match storage_key_with_space.key {
157164
StorageKey::AccountKey(address_bytes) => {
158165
let address = Address::from_slice(address_bytes);
159-
println!("Find account: {:?}", address);
166+
println(&format!("Find account: {:?}", address));
160167
let account =
161168
Account::new_from_rlp(address, &Rlp::new(&value))?;
162169
accounts_map.insert(address, account);
@@ -208,16 +215,16 @@ fn export_space_accounts(
208215
codes_map.get(&address).cloned()
209216
} else {
210217
if let Some(code) = codes_map.get(&address) {
211-
println!("no-contract account have code: {:?}", code);
218+
println(&format!("no-contract account have code: {:?}", code));
212219
}
213220
None
214221
};
215222

216223
let storage = if is_contract {
217224
storage_map.get(&address).cloned()
218225
} else {
219-
if let Some(storage) = storage_map.get(&address) {
220-
println!("no-contract account have storage: {:?}", storage);
226+
if let Some(_storage) = storage_map.get(&address) {
227+
println(&format!("no-contract account have storage"));
221228
}
222229
None
223230
};
@@ -243,86 +250,61 @@ fn export_space_accounts(
243250
Ok(accounts)
244251
}
245252

246-
fn export_space_accounts_with_iterator<F: Fn(AccountState)>(
253+
pub fn export_space_accounts_with_callback<F: Fn(AccountState)>(
247254
state: &mut StateDbGeneric, space: Space, config: &StateDumpConfig,
248255
callback: F,
249256
) -> Result<(), Box<dyn std::error::Error>> {
250-
let empty_key = StorageKey::EmptyKey.with_space(space);
251-
let (kvs, maybe_kv_iterator) = state.read_all_iterator(empty_key)?;
252-
253-
let mut deleted_keys = HashSet::new();
257+
println("Start to iterate state...");
254258
let mut found_accounts = 0;
259+
let mut core_space_key_count: u64 = 0;
260+
let mut total_key_count: u64 = 0;
255261

256-
// Iterate key value pairs from delta trie and intermediate trie
257-
for (k, v) in kvs {
258-
let storage_key = StorageKeyWithSpace::from_delta_mpt_key(&k);
259-
let key = storage_key.to_key_bytes();
260-
deleted_keys.insert(key.clone());
261-
262-
let storage_key_with_space =
263-
StorageKeyWithSpace::from_key_bytes::<SkipInputCheck>(&key);
264-
if storage_key_with_space.space != space {
265-
continue;
266-
}
267-
268-
if let StorageKey::AccountKey(address_bytes) =
269-
storage_key_with_space.key
270-
{
271-
let address = Address::from_slice(address_bytes);
272-
println!("Find account: {:?}", address);
273-
let account = Account::new_from_rlp(address, &Rlp::new(&v))?;
274-
275-
let account_state = get_account_state(state, &account, config)?;
276-
callback(account_state);
277-
found_accounts += 1;
278-
279-
if config.limit > 0 && found_accounts >= config.limit as usize {
280-
break;
281-
}
282-
} else {
283-
continue;
284-
}
285-
}
286-
287-
let lower_bound_incl = empty_key.to_key_bytes();
288-
let upper_bound_excl = to_key_prefix_iter_upper_bound(&lower_bound_incl);
262+
for i in 0..=255 {
263+
let prefix = [i];
264+
let start_key = StorageKey::AddressPrefixKey(&prefix).with_space(space);
289265

290-
if let Some(mut kv_iterator) = maybe_kv_iterator {
291-
let mut kvs = kv_iterator
292-
.iter_range(
293-
lower_bound_incl.as_slice(),
294-
upper_bound_excl.as_ref().map(|v| &**v),
295-
)?
296-
.take();
297-
298-
while let Some((key, value)) = kvs.next()? {
299-
if deleted_keys.contains(&key) {
300-
continue;
301-
}
266+
let mut account_states = BTreeMap::new();
302267

268+
let mut inner_callback = |(key, value): (Vec<u8>, Box<[u8]>)| {
269+
total_key_count += 1;
303270
let storage_key_with_space =
304271
StorageKeyWithSpace::from_key_bytes::<SkipInputCheck>(&key);
305272
if storage_key_with_space.space != space {
306-
continue;
273+
core_space_key_count += 1;
274+
return;
275+
}
276+
277+
if total_key_count % 10000 == 0 {
278+
println(&format!(
279+
"total_key_count: {}, core_space_key_count: {}",
280+
total_key_count, core_space_key_count
281+
));
307282
}
308283

309284
if let StorageKey::AccountKey(address_bytes) =
310285
storage_key_with_space.key
311286
{
312287
let address = Address::from_slice(address_bytes);
313-
println!("Find account: {:?}", address);
314-
let account =
315-
Account::new_from_rlp(address, &Rlp::new(&value))?;
288+
println(&format!("Find account: {:?}", address));
289+
let account = Account::new_from_rlp(address, &Rlp::new(&value))
290+
.expect("Failed to decode account");
316291

317-
let account_state = get_account_state(state, &account, config)?;
318-
callback(account_state);
319-
found_accounts += 1;
292+
account_states.insert(address, account);
293+
}
294+
};
320295

321-
if config.limit > 0 && found_accounts >= config.limit as usize {
322-
break;
323-
}
324-
} else {
325-
continue;
296+
state.read_all_with_callback(start_key, &mut inner_callback)?;
297+
298+
if account_states.len() > 0 {
299+
println("Start to read account code and storage data...");
300+
}
301+
302+
for (_address, account) in account_states {
303+
let account_state = get_account_state(state, &account, config)?;
304+
callback(account_state);
305+
found_accounts += 1;
306+
if config.limit > 0 && found_accounts >= config.limit as usize {
307+
break;
326308
}
327309
}
328310
}
@@ -369,3 +351,7 @@ fn get_account_state(
369351
address_hash: Some(address_hash),
370352
})
371353
}
354+
355+
fn println(message: &str) {
356+
println!("[{}] {}", Utc::now().format("%Y-%m-%d %H:%M:%S"), message);
357+
}

crates/dbs/statedb/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,12 +188,12 @@ mod impls {
188188
self.delete_all::<access_mode::Read>(key_prefix, debug_record)
189189
}
190190

191-
pub fn read_all_iterator(
191+
pub fn read_all_with_callback(
192192
&mut self, access_key_prefix: StorageKeyWithSpace,
193-
) -> Result<(Vec<MptKeyValue>, Option<KvdbSqliteSharded<Box<[u8]>>>)>
194-
{
193+
callback: &mut dyn FnMut(MptKeyValue),
194+
) -> Result<()> {
195195
self.storage
196-
.read_all_iterator(access_key_prefix)
196+
.read_all_with_callback(access_key_prefix, callback)
197197
.map_err(|err| err.into())
198198
}
199199

@@ -544,7 +544,7 @@ mod impls {
544544
};
545545
use cfx_storage::{
546546
utils::{access_mode, to_key_prefix_iter_upper_bound},
547-
KvdbSqliteSharded, MptKeyValue, StorageStateTrait,
547+
MptKeyValue, StorageStateTrait,
548548
};
549549
use cfx_types::{
550550
address_util::AddressUtil, Address, AddressWithSpace, Space,

crates/dbs/storage/src/impls/delta_mpt/cow_node_ref.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,57 @@ impl CowNodeRef {
622622
Ok(())
623623
}
624624

625+
pub fn iterate_internal_with_callback(
626+
&self, owned_node_set: &OwnedNodeSet, trie: &DeltaMpt,
627+
guarded_trie_node: GuardedMaybeOwnedTrieNodeAsCowCallParam,
628+
key_prefix: CompressedPathRaw, db: &mut DeltaDbOwnedReadTraitObj,
629+
callback: &mut dyn FnMut(MptKeyValue),
630+
) -> Result<()> {
631+
if guarded_trie_node.as_ref().as_ref().has_value() {
632+
assert!(CompressedPathRaw::has_second_nibble(
633+
key_prefix.path_mask()
634+
));
635+
callback((
636+
key_prefix.path_slice().to_vec(),
637+
guarded_trie_node.as_ref().as_ref().value_clone().unwrap(),
638+
));
639+
}
640+
641+
let children_table =
642+
guarded_trie_node.as_ref().as_ref().children_table.clone();
643+
// Free the lock for trie_node.
644+
// FIXME: try to share the lock.
645+
drop(guarded_trie_node);
646+
647+
let node_memory_manager = trie.get_node_memory_manager();
648+
let allocator = node_memory_manager.get_allocator();
649+
for (i, node_ref) in children_table.iter() {
650+
let mut cow_child_node =
651+
Self::new((*node_ref).into(), owned_node_set, self.mpt_id);
652+
let child_node = cow_child_node.get_trie_node(
653+
node_memory_manager,
654+
&allocator,
655+
db,
656+
)?;
657+
let key_prefix = CompressedPathRaw::join_connected_paths(
658+
&key_prefix,
659+
i,
660+
&child_node.compressed_path_ref(),
661+
);
662+
let child_node = GuardedValue::take(child_node);
663+
cow_child_node.iterate_internal_with_callback(
664+
owned_node_set,
665+
trie,
666+
child_node,
667+
key_prefix,
668+
db,
669+
callback,
670+
)?;
671+
}
672+
673+
Ok(())
674+
}
675+
625676
/// Recursively commit dirty nodes.
626677
pub fn commit_dirty_recursively<
627678
Transaction: BorrowMut<DeltaDbTransactionTraitObj>,

0 commit comments

Comments
 (0)