diff --git a/Cargo.lock b/Cargo.lock
index f400ab177d36e..fa32ec77da8f1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1977,7 +1977,7 @@ dependencies = [
  "bitflags 2.9.4",
  "cexpr",
  "clang-sys",
- "itertools 0.13.0",
+ "itertools 0.10.5",
  "proc-macro2 1.0.95",
  "quote 1.0.40",
  "regex",
@@ -9243,8 +9243,7 @@ dependencies = [
 [[package]]
 name = "kvdb"
 version = "0.13.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e7d770dcb02bf6835887c3a979b5107a04ff4bbde97a5f0928d27404a155add9"
+source = "git+https://github.com/Harrm/parity-common.git#ec191b2a5573d71737e61b2bd2cbea680669ecd3"
 dependencies = [
  "smallvec",
 ]
@@ -9252,8 +9251,7 @@ dependencies = [
 [[package]]
 name = "kvdb-memorydb"
 version = "0.13.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bf7a85fe66f9ff9cd74e169fdd2c94c6e1e74c412c99a73b4df3200b5d3760b2"
+source = "git+https://github.com/Harrm/parity-common.git#ec191b2a5573d71737e61b2bd2cbea680669ecd3"
 dependencies = [
  "kvdb",
  "parking_lot 0.12.3",
 ]
@@ -9262,8 +9260,7 @@ dependencies = [
 [[package]]
 name = "kvdb-rocksdb"
 version = "0.20.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e8beb5ce840610e5a945f0306f6e7a2d5b3e68ea3e64e9a4f081fa4ee5aa6525"
+source = "git+https://github.com/Harrm/parity-common.git#ec191b2a5573d71737e61b2bd2cbea680669ecd3"
 dependencies = [
  "kvdb",
  "num_cpus",
@@ -9275,8 +9272,7 @@ dependencies = [
 [[package]]
 name = "kvdb-shared-tests"
 version = "0.11.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "64d3b4e3e80c369f1b5364b6acdeba9b8a02285e91a5570f7c0404b7f9024541"
+source = "git+https://github.com/Harrm/parity-common.git#ec191b2a5573d71737e61b2bd2cbea680669ecd3"
 dependencies = [
  "kvdb",
 ]
@@ -17945,7 +17941,7 @@ checksum = "f8650aabb6c35b860610e9cff5dc1af886c9e25073b7b1712a68972af4281302"
 dependencies = [
  "bytes",
  "heck 0.5.0",
- "itertools 0.13.0",
+ "itertools 0.10.5",
  "log",
  "multimap",
  "once_cell",
@@ -23207,6 +23203,7 @@ name = "sp-database"
 version = "10.0.0"
 dependencies = [
  "kvdb",
+ "kvdb-rocksdb",
  "parking_lot 0.12.3",
 ]

diff --git a/Cargo.toml b/Cargo.toml
index d5e9c32cb3a49..bcf649b7ba4dc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -890,10 +890,10 @@ jsonrpsee = { version = "0.24.3" }
 jsonrpsee-core = { version = "0.24.3" }
 k256 = { version = "0.13.4", default-features = false }
 kitchensink-runtime = { path = "substrate/bin/node/runtime" }
-kvdb = { version = "0.13.0" }
-kvdb-memorydb = { version = "0.13.0" }
-kvdb-rocksdb = { version = "0.20.0" }
-kvdb-shared-tests = { version = "0.11.0" }
+kvdb = { git = "https://github.com/Harrm/parity-common.git" }
+kvdb-memorydb = { git = "https://github.com/Harrm/parity-common.git" }
+kvdb-rocksdb = { git = "https://github.com/Harrm/parity-common.git" }
+kvdb-shared-tests = { git = "https://github.com/Harrm/parity-common.git" }
 landlock = { version = "0.3.0" }
 libc = { version = "0.2.155" }
 libfuzzer-sys = { version = "0.4" }

diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs
index 50ee9c1219d91..532e090ce3b98 100644
--- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs
+++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs
@@ -226,7 +226,7 @@ impl RelayChainInterface for RelayChainInProcessInterface {
 		key: &[u8],
 	) -> RelayChainResult<Option<StorageValue>> {
 		let state = self.backend.state_at(relay_parent, TrieCacheContext::Untrusted)?;
-		state.storage(key).map_err(RelayChainError::GenericError)
+		state.storage(key).map_err(|e| RelayChainError::GenericError(e.to_string()))
 	}
 
 	async fn prove_read(
diff --git a/substrate/client/db/Cargo.toml b/substrate/client/db/Cargo.toml
index 4c7296032f2b9..ffbbac2a4c2a7 100644
--- a/substrate/client/db/Cargo.toml
+++ b/substrate/client/db/Cargo.toml
@@ -56,10 +56,10 @@ substrate-test-runtime-client = { workspace = true }
 tempfile = { workspace = true }
 
 [features]
-default = []
+default = ["rocksdb"]
 test-helpers = []
 runtime-benchmarks = [
 	"kitchensink-runtime/runtime-benchmarks",
 	"sp-runtime/runtime-benchmarks",
 ]
-rocksdb = ["kvdb-rocksdb"]
+rocksdb = ["kvdb-rocksdb", "sp-database/rocksdb"]
diff --git a/substrate/client/db/src/archive_db.rs b/substrate/client/db/src/archive_db.rs
new file mode 100644
index 0000000000000..d334b7a210718
--- /dev/null
+++ b/substrate/client/db/src/archive_db.rs
@@ -0,0 +1,226 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use std::sync::Arc;
+
+use codec::{Decode, Encode};
+use sc_client_api::ChildInfo;
+use sp_core::Hasher;
+use sp_database::{Database, DatabaseWithSeekableIterator};
+use sp_runtime::traits::{BlakeTwo256, BlockNumber, HashingFor, Header};
+use sp_state_machine::{
+	BackendTransaction, ChildStorageCollection, DefaultError, IterArgs, StorageCollection,
+	StorageKey, StorageValue, UsageInfo,
+};
+use sp_trie::MerkleValue;
+
+use crate::{columns, BlockT, DbHash, StateBackend, StateMachineStats, StorageDb};
+
+pub(crate) struct ArchiveDb<Block: BlockT> {
+	db: Arc<dyn DatabaseWithSeekableIterator<DbHash>>,
+	parent_hash: Option<Block::Hash>,
+	block_number: <<Block as BlockT>::Header as Header>::Number,
+}
+
+impl<Block: BlockT> std::fmt::Debug for ArchiveDb<Block> {
+	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+		f.debug_struct("ArchiveDb").field("parent_hash", &self.parent_hash).finish()
+	}
+}
+
+impl<Block: BlockT> ArchiveDb<Block> {
+	pub(crate) fn new(
+		db: Arc<dyn DatabaseWithSeekableIterator<DbHash>>,
+		parent_hash: Option<Block::Hash>,
+		block_number: <<Block as BlockT>::Header as Header>::Number,
+	) -> Self {
+		Self { db, parent_hash, block_number }
+	}
+
+	pub(crate) fn storage(&self, key: &[u8]) -> Result<Option<StorageValue>, DefaultError> {
+		let full_key = make_full_key(key, self.block_number);
+		let mut iter = self
+			.db
+			.seekable_iter(columns::ARCHIVE)
+			.expect("Archive column space must exist if ArchiveDb exists");
+		iter.seek_prev(&full_key);
+
+		if let Some((found_key, value)) = iter.get() {
+			if extract_key::<<<Block as BlockT>::Header as Header>::Number>(&found_key) == key {
+				return Ok(Some(value.to_owned()));
+			}
+		}
+		Ok(None)
+	}
+
+	pub(crate) fn storage_hash(
+		&self,
+		key: &[u8],
+	) -> Result<Option<<HashingFor<Block> as hash_db::Hasher>::Out>, DefaultError> {
+		let full_key = make_full_key(key, self.block_number);
+
+		if let Some(value) = self.db.get(columns::ARCHIVE, &full_key) {
+			Ok(Some(HashingFor::<Block>::hash(&value)))
+		} else {
+			Ok(None)
+		}
+	}
+
+	pub(crate) fn child_storage(
+		&self,
+		child_info: &ChildInfo,
+		key: &[u8],
+	) -> Result<Option<StorageValue>, DefaultError> {
+		todo!()
+	}
+
+	pub(crate) fn child_storage_hash(
+		&self,
+		child_info: &ChildInfo,
+		key: &[u8],
+	) -> Result<Option<<HashingFor<Block> as hash_db::Hasher>::Out>, DefaultError> {
+		todo!()
+	}
+
+	pub(crate) fn exists_storage(&self, key: &[u8]) -> Result<bool, DefaultError> {
+		todo!()
+	}
+
+	pub(crate) fn exists_child_storage(
+		&self,
+		child_info: &ChildInfo,
+		key: &[u8],
+	) -> Result<bool, DefaultError> {
+		todo!()
+	}
+
+	pub(crate) fn next_storage_key(&self, key: &[u8]) -> Result<Option<StorageKey>, DefaultError> {
+		todo!()
+	}
+
+	pub(crate) fn next_child_storage_key(
+		&self,
+		child_info: &ChildInfo,
+		key: &[u8],
+	) -> Result<Option<StorageKey>, DefaultError> {
+		todo!()
+	}
+
+	pub(crate) fn raw_iter(&self, args: IterArgs) -> Result<RawIter<Block>, DefaultError> {
+		todo!()
+	}
+
+	pub(crate) fn register_overlay_stats(&self, _stats: &crate::StateMachineStats) {
+		todo!()
+	}
+
+	pub(crate) fn usage_info(&self) -> UsageInfo {
+		todo!()
+	}
+
+	pub(crate) fn wipe(&self) -> Result<(), DefaultError> {
+		unimplemented!()
+	}
+
+	pub(crate) fn commit(
+		&self,
+		_: <HashingFor<Block> as Hasher>::Out,
+		_: BackendTransaction<HashingFor<Block>>,
+		_: StorageCollection,
+		_: ChildStorageCollection,
+	) -> Result<(), DefaultError> {
+		unimplemented!()
+	}
+
+	pub(crate) fn read_write_count(&self) -> (u32, u32, u32, u32) {
+		unimplemented!()
+	}
+
+	pub(crate) fn reset_read_write_count(&self) {
+		unimplemented!()
+	}
+
+	pub(crate) fn get_read_and_written_keys(&self) -> Vec<(Vec<u8>, u32, u32, bool)> {
+		unimplemented!()
+	}
+}
+
+pub struct RawIter<Block: BlockT> {
+	_phantom: std::marker::PhantomData<Block>,
+}
+
+impl<Block: BlockT> RawIter<Block> {
+	pub(crate) fn next_key(
+		&mut self,
+		backend: &ArchiveDb<Block>,
+	) -> Option<Result<StorageKey, DefaultError>> {
+		unimplemented!()
+	}
+
+	pub(crate) fn next_pair(
+		&mut self,
+		backend: &ArchiveDb<Block>,
+	) -> Option<Result<(StorageKey, StorageValue), DefaultError>> {
+		unimplemented!()
+	}
+
+	pub(crate) fn was_complete(&self) -> bool {
+		unimplemented!()
+	}
+}
+
+pub(crate) fn make_full_key(key: &[u8], number: impl Encode) -> Vec<u8> {
+	let mut full_key = Vec::with_capacity(key.len() + number.encoded_size());
+	full_key.extend_from_slice(&key[..]);
+	number.encode_to(&mut full_key);
+	full_key
+}
+
+pub(crate) fn extract_key<BlockNumber: Encode>(full_key: &[u8]) -> &[u8] {
+	let key_len = full_key.len() -
+		BlockNumber::encoded_fixed_size()
+			.expect("Variable length block numbers can't be used for archive storage");
+	&full_key[..key_len]
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	use crate::columns::ARCHIVE;
+
+	use sp_database::{Change, MemDb, Transaction};
+	use sp_runtime::testing::{Block, MockCallU64, TestXt};
+
+	type TestBlock = Block<TestXt<MockCallU64, ()>>;
+
+	#[test]
+	fn set_get() {
+		let mem_db = Arc::new(MemDb::new());
+		mem_db.commit(Transaction(vec![
+			Change::<DbHash>::Set(ARCHIVE, make_full_key(&[1, 2, 3], 4u64), vec![4, 2]),
+			Change::<DbHash>::Set(ARCHIVE, make_full_key(&[1, 2, 3], 6u64), vec![5, 2]),
+		]));
+		let archive_db =
+			ArchiveDb::<TestBlock>::new(mem_db.clone(), Some(sp_core::H256::default()), 5);
+		assert_eq!(archive_db.storage(&[1, 2, 3]), Ok(Some(vec![4u8, 2u8])));
+
+		let archive_db = ArchiveDb::<TestBlock>::new(mem_db, Some(sp_core::H256::default()), 7);
+		assert_eq!(archive_db.storage(&[1, 2, 3]), Ok(Some(vec![5u8, 2u8])));
+	}
+}
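Note on the key scheme used by the ARCHIVE column above: every storage write is recorded under `key ++ SCALE(block_number)`, so all versions of one key are adjacent in key order and a point lookup becomes "greatest full key <= key ++ N". The sketch below reproduces that lookup with plain std Rust only — `BTreeMap::range` stands in for RocksDB's `seek_for_prev`, and `to_le_bytes` for SCALE's fixed-width little-endian `u64` encoding. Nothing in it is code from this patch; it just replays the `set_get` scenario.

use std::collections::BTreeMap;
use std::ops::Bound;

// Mirrors `make_full_key`: the storage key followed by the encoded block number.
// Caveat: little-endian bytes only sort like the numbers they encode over ranges
// where the higher bytes agree (e.g. all numbers below 256, as in `set_get`);
// a big-endian encoding would agree with numeric order over the whole u64 range.
fn make_full_key(key: &[u8], number: u64) -> Vec<u8> {
	let mut full_key = Vec::with_capacity(key.len() + 8);
	full_key.extend_from_slice(key);
	full_key.extend_from_slice(&number.to_le_bytes());
	full_key
}

// Mirrors `extract_key`: drop the fixed-width block-number suffix.
fn extract_key(full_key: &[u8]) -> &[u8] {
	&full_key[..full_key.len() - 8]
}

// Mirrors `ArchiveDb::storage`: newest version of `key` at or before `number`.
fn storage(col: &BTreeMap<Vec<u8>, Vec<u8>>, key: &[u8], number: u64) -> Option<Vec<u8>> {
	let full_key = make_full_key(key, number);
	let (found_key, value) = col
		.range::<[u8], _>((Bound::Unbounded, Bound::Included(full_key.as_slice())))
		.next_back()?;
	// A hit on a different key means `key` has no version this early.
	(extract_key(found_key) == key).then(|| value.clone())
}

fn main() {
	let mut col = BTreeMap::new();
	col.insert(make_full_key(&[1, 2, 3], 4), vec![4, 2]); // written at block 4
	col.insert(make_full_key(&[1, 2, 3], 6), vec![5, 2]); // written at block 6

	assert_eq!(storage(&col, &[1, 2, 3], 5), Some(vec![4, 2])); // block 5 sees block 4's value
	assert_eq!(storage(&col, &[1, 2, 3], 7), Some(vec![5, 2])); // block 7 sees block 6's value
	assert_eq!(storage(&col, &[9], 7), None); // never-written key
}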
diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs
index 05696bdc74d56..f8931a1341a85 100644
--- a/substrate/client/db/src/lib.rs
+++ b/substrate/client/db/src/lib.rs
@@ -32,6 +32,7 @@ pub mod offchain;
 
 pub mod bench;
 
+mod archive_db;
 mod children;
 mod parity_db;
 mod pinned_blocks_cache;
@@ -53,6 +54,7 @@ use std::{
 };
 
 use crate::{
+	archive_db::{make_full_key, ArchiveDb},
 	pinned_blocks_cache::PinnedBlocksCache,
 	record_stats_state::RecordStatsState,
 	stats::StateUsageStats,
@@ -77,7 +79,7 @@ use sp_core::{
 	offchain::OffchainOverlayedChange,
 	storage::{well_known_keys, ChildInfo},
 };
-use sp_database::Transaction;
+use sp_database::{DatabaseWithSeekableIterator, Transaction};
 use sp_runtime::{
 	generic::BlockId,
 	traits::{
@@ -88,7 +90,7 @@ use sp_runtime::{
 };
 use sp_state_machine::{
 	backend::{AsTrieBackend, Backend as StateBackend},
-	BackendTransaction, ChildStorageCollection, DBValue, IndexOperation, IterArgs,
+	BackendTransaction, ChildStorageCollection, DBValue, DefaultError, IndexOperation, IterArgs,
 	OffchainChangesCollection, StateMachineStats, StorageCollection, StorageIterator, StorageKey,
 	StorageValue, UsageInfo as StateUsageInfo,
 };
@@ -442,6 +444,8 @@ pub(crate) mod columns {
 	/// Transactions
 	pub const TRANSACTION: u32 = 11;
 	pub const BODY_INDEX: u32 = 12;
+	// Diffs for archive blocks
+	pub const ARCHIVE: u32 = 13;
 }
 
 struct PendingBlock<Block: BlockT> {
@@ -828,7 +832,7 @@ impl<Block: BlockT> HeaderMetadata<Block> for BlockchainDb<Block> {
 
 /// Database transaction
 pub struct BlockImportOperation<Block: BlockT> {
-	old_state: RecordStatsState<RefTrackingState<Block>, Block>,
+	old_state: RecordStatsState<TrieOrArchiveState<Block>, Block>,
 	db_updates: PrefixedMemoryDB<HashingFor<Block>>,
 	storage_updates: StorageCollection,
 	child_storage_updates: ChildStorageCollection,
@@ -899,7 +903,7 @@ impl<Block: BlockT> BlockImportOperation<Block> {
 impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block>
 	for BlockImportOperation<Block>
 {
-	type State = RecordStatsState<RefTrackingState<Block>, Block>;
+	type State = RecordStatsState<TrieOrArchiveState<Block>, Block>;
 
 	fn state(&self) -> ClientResult<Option<&Self::State>> {
 		Ok(Some(&self.old_state))
@@ -998,7 +1002,6 @@ impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block>
 		self.create_gap = create_gap;
 	}
 }
-
 struct StorageDb<Block: BlockT> {
 	pub db: Arc<dyn Database<DbHash>>,
 	pub state_db: StateDb<Block::Hash, Vec<u8>, StateMetaDb>,
@@ -1109,6 +1112,7 @@ impl<T: Clone> FrozenForDuration<T> {
 /// blocks. Otherwise, trie nodes are kept only from some recent blocks.
 pub struct Backend<Block: BlockT> {
 	storage: Arc<StorageDb<Block>>,
+	db: BackendDatabase,
 	offchain_storage: offchain::LocalStorage,
 	blockchain: BlockchainDb<Block>,
 	canonicalization_delay: u64,
@@ -1121,6 +1125,30 @@ pub struct Backend<Block: BlockT> {
 	shared_trie_cache: Option<sp_trie::cache::SharedTrieCache<HashingFor<Block>>>,
 }
 
+#[derive(Clone)]
+enum BackendDatabase {
+	Database(Arc<dyn Database<DbHash>>),
+	DatabaseWithSeekableIterator(Arc<dyn DatabaseWithSeekableIterator<DbHash>>),
+}
+
+impl Into<Arc<dyn Database<DbHash>>> for BackendDatabase {
+	fn into(self) -> Arc<dyn Database<DbHash>> {
+		match self {
+			BackendDatabase::Database(db) => db.clone(),
+			BackendDatabase::DatabaseWithSeekableIterator(db) => db.clone(),
+		}
+	}
+}
+
+impl AsRef<dyn Database<DbHash>> for BackendDatabase {
+	fn as_ref(&self) -> &(dyn Database<DbHash> + 'static) {
+		match self {
+			BackendDatabase::Database(db) => db.as_ref(),
+			BackendDatabase::DatabaseWithSeekableIterator(db) => db.as_ref(),
+		}
+	}
+}
+
 impl<Block: BlockT> Backend<Block> {
 	/// Create a new instance of database backend.
 	///
@@ -1141,7 +1169,7 @@ impl<Block: BlockT> Backend<Block> {
 			Err(as_is) => return Err(as_is.into()),
 		};
 
-		Self::from_database(db as Arc<_>, canonicalization_delay, &db_config, needs_init)
+		Self::from_database(db, canonicalization_delay, &db_config, needs_init)
 	}
 
 	/// Reset the shared trie cache.
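The `BackendDatabase` enum added here records, at the type level, whether the opened store can hand out seekable iterators, while still serving every caller that only needs `sp_database::Database`. Below is a self-contained sketch of that pattern with stand-in trait and type names (none of them are the sp_database API); the `Arc` upcast in both the sketch and the diff's `Into` impl relies on trait-object upcasting, stable since Rust 1.86.

use std::sync::Arc;

// Stand-ins for sp_database::{Database, DatabaseWithSeekableIterator}.
trait Database {
	fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
}

trait Seekable: Database {
	fn seek_prev(&self, key: &[u8]) -> Option<Vec<u8>>;
}

// One handle, two capability levels — mirrors `BackendDatabase`.
#[derive(Clone)]
enum BackendDb {
	Plain(Arc<dyn Database>),
	Seekable(Arc<dyn Seekable>),
}

impl BackendDb {
	// Mirrors the `Into<Arc<dyn Database<DbHash>>>` impl: forget the extra
	// capability via a trait-object upcast.
	fn as_plain(&self) -> Arc<dyn Database> {
		match self {
			BackendDb::Plain(db) => db.clone(),
			BackendDb::Seekable(db) => db.clone(),
		}
	}
}

// A toy backing store so the sketch runs.
struct Mem(Vec<(Vec<u8>, Vec<u8>)>);

impl Database for Mem {
	fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
		self.0.iter().find(|(k, _)| k.as_slice() == key).map(|(_, v)| v.clone())
	}
}

impl Seekable for Mem {
	// Largest stored key <= `key`: the primitive the archive lookup builds on.
	fn seek_prev(&self, key: &[u8]) -> Option<Vec<u8>> {
		self.0
			.iter()
			.filter(|(k, _)| k.as_slice() <= key)
			.max_by(|(a, _), (b, _)| a.cmp(b))
			.map(|(_, v)| v.clone())
	}
}

fn main() {
	let db = BackendDb::Seekable(Arc::new(Mem(vec![(vec![1], vec![10])])));

	// Code that only needs plain reads upcasts and forgets the variant.
	assert_eq!(db.as_plain().get(&[1]), Some(vec![10]));

	// Seek-based access stays behind a `match`, as in `empty_state`/`state_at`.
	if let BackendDb::Seekable(seekable) = &db {
		assert_eq!(seekable.seek_prev(&[5]), Some(vec![10]));
	}
}

Keeping the capability in an enum rather than downcasting at use sites means "is the archive available?" is a plain `match`, which is exactly how `empty_state` and `state_at` decide whether to build an `ArchiveDb`.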
@@ -1209,13 +1237,15 @@ impl<Block: BlockT> Backend<Block> {
 	}
 
 	fn from_database(
-		db: Arc<dyn Database<DbHash>>,
+		backend_db: BackendDatabase,
 		canonicalization_delay: u64,
 		config: &DatabaseSettings,
 		should_init: bool,
 	) -> ClientResult<Self> {
 		let mut db_init_transaction = Transaction::new();
 
+		let db: Arc<dyn Database<DbHash>> = backend_db.clone().into();
+
 		let requested_state_pruning = config.state_pruning.clone();
 		let state_meta_db = StateMetaDb(db.clone());
 		let map_e = sp_blockchain::Error::from_state_db;
@@ -1257,6 +1287,7 @@ impl<Block: BlockT> Backend<Block> {
 
 		let backend = Backend {
 			storage: Arc::new(storage_db),
+			db: backend_db,
 			offchain_storage,
 			blockchain,
 			canonicalization_delay,
@@ -1602,16 +1633,17 @@
 		let mut ops: u64 = 0;
 		let mut bytes: u64 = 0;
-		for (key, value) in operation
-			.storage_updates
-			.iter()
-			.chain(operation.child_storage_updates.iter().flat_map(|(_, s)| s.iter()))
-		{
+		for (key, value) in operation.storage_updates.drain(..).chain(
+			operation.child_storage_updates.drain(..).flat_map(|(_, s)| s.into_iter()),
+		) {
 			ops += 1;
 			bytes += key.len() as u64;
 			if let Some(v) = value.as_ref() {
 				bytes += v.len() as u64;
 			}
+			let full_key = make_full_key(&key, pending_block.header.number());
+			println!("#{:?}: {:?} -> {:?}", pending_block.header.hash(), key, value);
+			transaction.set_from_vec(columns::ARCHIVE, &full_key, value.encode());
 		}
 		self.state_usage.tally_writes(ops, bytes);
 		let number_u64 = number.saturated_into::<u64>();
@@ -2004,13 +2036,24 @@
 		Ok(())
 	}
 
-	fn empty_state(&self) -> RecordStatsState<RefTrackingState<Block>, Block> {
+	fn empty_state(&self) -> RecordStatsState<TrieOrArchiveState<Block>, Block> {
 		let root = EmptyStorage::<Block>::new().0; // Empty trie
 		let db_state = DbStateBuilder::<HashingFor<Block>>::new(self.storage.clone(), root)
 			.with_optional_cache(self.shared_trie_cache.as_ref().map(|c| c.local_cache_untrusted()))
 			.build();
-		let state = RefTrackingState::new(db_state, self.storage.clone(), None);
-		RecordStatsState::new(state, None, self.state_usage.clone())
+		let trie_state = RefTrackingState::new(db_state, self.storage.clone(), None);
+		let archive_state = match &self.db {
+			BackendDatabase::DatabaseWithSeekableIterator(db) => {
+				Some(ArchiveDb::new(db.clone(), None, <Block::Header as HeaderT>::Number::zero()))
+			},
+			_ => None,
+		};
+
+		RecordStatsState::new(
+			TrieOrArchiveState { trie_state: Some(trie_state), archive_state },
+			None,
+			self.state_usage.clone(),
+		)
 	}
 }
 
@@ -2129,10 +2172,248 @@ where
 	}
 }
 
+#[derive(Debug)]
+pub struct TrieOrArchiveState<Block: BlockT> {
+	trie_state: Option<RefTrackingState<Block>>,
+	archive_state: Option<ArchiveDb<Block>>,
+}
+
+pub enum TrieOrArchiveStateIter<Block: BlockT> {
+	TrieIter(<RefTrackingState<Block> as StateBackend<HashingFor<Block>>>::RawIter),
+	ArchiveIter(archive_db::RawIter<Block>),
+}
+
+impl<Block: BlockT> StorageIterator<HashingFor<Block>> for TrieOrArchiveStateIter<Block> {
+	type Backend = TrieOrArchiveState<Block>;
+
+	type Error = DefaultError;
+
+	fn next_key(
+		&mut self,
+		backend: &Self::Backend,
+	) -> Option<Result<StorageKey, Self::Error>> {
+		match self {
+			TrieOrArchiveStateIter::TrieIter(iter) => {
+				if let Some(trie_state) = &backend.trie_state {
+					iter.next_key(trie_state)
+				} else {
+					Some(Err(
+						"Trie iterator is used, but trie data does not exist for this state".into(),
+					))
+				}
+			},
+			TrieOrArchiveStateIter::ArchiveIter(iter) => {
+				if let Some(archive_state) = &backend.archive_state {
+					iter.next_key(archive_state)
+				} else {
+					Some(Err(
+						"Archive iterator is used, but archive data does not exist for this state"
+							.into(),
+					))
+				}
+			},
+		}
+	}
+
+	fn next_pair(
+		&mut self,
+		backend: &Self::Backend,
+	) -> Option<Result<(StorageKey, StorageValue), Self::Error>> {
+		todo!()
+	}
+
+	fn was_complete(&self) -> bool {
+		todo!()
+	}
+}
+impl<Block: BlockT> StateBackend<HashingFor<Block>> for TrieOrArchiveState<Block> {
+	type Error = DefaultError;
+
+	type TrieBackendStorage =
+		<RefTrackingState<Block> as AsTrieBackend<HashingFor<Block>>>::TrieBackendStorage;
+
+	type RawIter = TrieOrArchiveStateIter<Block>;
+
+	fn storage(&self, key: &[u8]) -> Result<Option<StorageValue>, Self::Error> {
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.storage(key)
+		} else if let Some(archive_state) = &self.archive_state {
+			archive_state.storage(key)
+		} else {
+			Err("No storage exists for this state".into())
+		}
+	}
+
+	fn storage_hash(
+		&self,
+		key: &[u8],
+	) -> Result<Option<<HashingFor<Block> as hash_db::Hasher>::Out>, Self::Error> {
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.storage_hash(key)
+		} else if let Some(archive_state) = &self.archive_state {
+			archive_state.storage_hash(key)
+		} else {
+			Err("No storage exists for this state".into())
+		}
+	}
+
+	fn closest_merkle_value(
+		&self,
+		key: &[u8],
+	) -> Result<Option<MerkleValue<<HashingFor<Block> as hash_db::Hasher>::Out>>, Self::Error> {
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.closest_merkle_value(key)
+		} else {
+			Ok(None)
+		}
+	}
+
+	fn child_closest_merkle_value(
+		&self,
+		child_info: &ChildInfo,
+		key: &[u8],
+	) -> Result<Option<MerkleValue<<HashingFor<Block> as hash_db::Hasher>::Out>>, Self::Error> {
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.child_closest_merkle_value(child_info, key)
+		} else {
+			Ok(None)
+		}
+	}
+
+	fn child_storage(
+		&self,
+		child_info: &ChildInfo,
+		key: &[u8],
+	) -> Result<Option<StorageValue>, Self::Error> {
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.child_storage(child_info, key)
+		} else if let Some(archive_state) = &self.archive_state {
+			archive_state.child_storage(child_info, key)
+		} else {
+			Err("No storage exists for this state".into())
+		}
+	}
+
+	fn child_storage_hash(
+		&self,
+		child_info: &ChildInfo,
+		key: &[u8],
+	) -> Result<Option<<HashingFor<Block> as hash_db::Hasher>::Out>, Self::Error> {
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.child_storage_hash(child_info, key)
+		} else if let Some(archive_state) = &self.archive_state {
+			archive_state.child_storage_hash(child_info, key)
+		} else {
+			Err("No storage exists for this state".into())
+		}
+	}
+
+	fn next_storage_key(&self, key: &[u8]) -> Result<Option<StorageKey>, Self::Error> {
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.next_storage_key(key)
+		} else if let Some(archive_state) = &self.archive_state {
+			archive_state.next_storage_key(key)
+		} else {
+			Err("No storage exists for this state".into())
+		}
+	}
+
+	fn next_child_storage_key(
+		&self,
+		child_info: &ChildInfo,
+		key: &[u8],
+	) -> Result<Option<StorageKey>, Self::Error> {
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.next_child_storage_key(child_info, key)
+		} else if let Some(archive_state) = &self.archive_state {
+			archive_state.next_child_storage_key(child_info, key)
+		} else {
+			Err("No storage exists for this state".into())
+		}
+	}
+
+	fn storage_root<'a>(
+		&self,
+		delta: impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)>,
+		state_version: StateVersion,
+	) -> (<HashingFor<Block> as hash_db::Hasher>::Out, BackendTransaction<HashingFor<Block>>)
+	where
+		<HashingFor<Block> as hash_db::Hasher>::Out: Ord,
+	{
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.storage_root(delta, state_version)
+		} else {
+			todo!()
+		}
+	}
+
+	fn child_storage_root<'a>(
+		&self,
+		child_info: &ChildInfo,
+		delta: impl Iterator<Item = (&'a [u8], Option<&'a [u8]>)>,
+		state_version: StateVersion,
+	) -> (<HashingFor<Block> as hash_db::Hasher>::Out, bool, BackendTransaction<HashingFor<Block>>)
+	where
+		<HashingFor<Block> as hash_db::Hasher>::Out: Ord,
+	{
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.child_storage_root(child_info, delta, state_version)
+		} else {
+			todo!()
+		}
+	}
+
+	fn raw_iter(&self, args: IterArgs) -> Result<Self::RawIter, Self::Error> {
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.raw_iter(args).map(|iter| TrieOrArchiveStateIter::TrieIter(iter))
+		} else if let Some(archive_state) = &self.archive_state {
+			archive_state.raw_iter(args).map(|iter| TrieOrArchiveStateIter::ArchiveIter(iter))
+		} else {
+			Err("No storage exists for this state".into())
+		}
+	}
+
+	fn register_overlay_stats(&self, stats: &sp_state_machine::StateMachineStats) {
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.register_overlay_stats(stats)
+		} else if let Some(archive_state) = &self.archive_state {
+			archive_state.register_overlay_stats(stats)
+		}
+	}
+
+	fn usage_info(&self) -> StateUsageInfo {
+		if let Some(trie_state) = &self.trie_state {
+			trie_state.usage_info()
+		} else if let Some(archive_state) = &self.archive_state {
+			archive_state.usage_info()
+		} else {
+			StateUsageInfo::empty()
+		}
+	}
+}
+
+impl<Block: BlockT> AsTrieBackend<HashingFor<Block>> for TrieOrArchiveState<Block> {
+	type TrieBackendStorage =
+		<RefTrackingState<Block> as AsTrieBackend<HashingFor<Block>>>::TrieBackendStorage;
+
+	fn as_trie_backend(
+		&self,
+	) -> &sp_state_machine::TrieBackend<
+		Self::TrieBackendStorage,
+		HashingFor<Block>,
+		sp_trie::cache::LocalTrieCache<HashingFor<Block>>,
+	> {
+		todo!()
+	}
+}
+
 impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 	type BlockImportOperation = BlockImportOperation<Block>;
 	type Blockchain = BlockchainDb<Block>;
-	type State = RecordStatsState<RefTrackingState<Block>, Block>;
+	type State = RecordStatsState<TrieOrArchiveState<Block>, Block>;
 	type OffchainStorage = offchain::LocalStorage;
 
 	fn begin_operation(&self) -> ClientResult<Self::BlockImportOperation> {
@@ -2539,8 +2820,24 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 				}))
 				.build();
 
-			let state = RefTrackingState::new(db_state, self.storage.clone(), None);
-			return Ok(RecordStatsState::new(state, None, self.state_usage.clone()));
+			let trie_state = Some(RefTrackingState::new(db_state, self.storage.clone(), None));
+			let archive_state = match &self.db {
+				BackendDatabase::DatabaseWithSeekableIterator(db) => {
+					Some(ArchiveDb::new(
+						db.clone(),
+						Some(hash),
+						<Block::Header as HeaderT>::Number::zero(),
+					))
+				},
+				_ => None,
+			};
+			let trie_or_archive_state = TrieOrArchiveState { trie_state, archive_state };
+
+			return Ok(RecordStatsState::new(
+				trie_or_archive_state,
+				None,
+				self.state_usage.clone(),
+			));
 		}
 	}
 
@@ -2552,7 +2849,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 						.is_some()
 				};
 
-				if let Ok(()) =
+				let trie_state = if let Ok(()) =
 					self.storage.state_db.pin(&hash, hdr.number.saturated_into::<u64>(), hint)
 				{
 					let root = hdr.state_root;
@@ -2566,13 +2863,25 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
 						}
 					}))
 					.build();
-					let state = RefTrackingState::new(db_state, self.storage.clone(), Some(hash));
-					Ok(RecordStatsState::new(state, Some(hash), self.state_usage.clone()))
+					let state: RefTrackingState<Block> =
+						RefTrackingState::new(db_state, self.storage.clone(), Some(hash));
+					Some(state)
 				} else {
-					Err(sp_blockchain::Error::UnknownBlock(format!(
-						"State already discarded for {hash:?}",
-					)))
-				}
+					None
+				};
+				let archive_state = match &self.db {
+					BackendDatabase::DatabaseWithSeekableIterator(db) => {
+						Some(ArchiveDb::new(db.clone(), Some(hash), hdr.number))
+					},
+					_ => None,
+				};
+
+				let trie_or_archive_state = TrieOrArchiveState { trie_state, archive_state };
+				Ok(RecordStatsState::new(
+					trie_or_archive_state,
+					Some(hash),
+					self.state_usage.clone(),
+				))
 			},
 			Err(e) => Err(e),
 		}
diff --git a/substrate/client/db/src/utils.rs b/substrate/client/db/src/utils.rs
index a79f5ab3ac7d9..a78e4c67d7337 100644
--- a/substrate/client/db/src/utils.rs
+++ b/substrate/client/db/src/utils.rs
@@ -22,8 +22,9 @@ use std::{fmt, fs, io, path::Path, sync::Arc};
 
 use log::{debug, info};
+use sp_database::DatabaseWithSeekableIterator;
 
-use crate::{Database, DatabaseSource, DbHash};
+use crate::{BackendDatabase, Database, DatabaseSource, DbHash};
 use codec::Decode;
 use sc_client_api::blockchain::{BlockGap, BlockGapType};
 use sp_database::Transaction;
@@ -38,7 +39,7 @@ use sp_trie::DBValue;
 
 /// Number of columns in the db. Must be the same for both full && light dbs.
 /// Otherwise RocksDb will fail to open database && check its type.
-pub const NUM_COLUMNS: u32 = 13;
+pub const NUM_COLUMNS: u32 = 14;
 /// Meta column. The set of keys in the column is shared by full && light storages.
 pub const COLUMN_META: u32 = 0;
@@ -199,21 +200,22 @@ fn open_database_at<Block: BlockT>(
 	db_type: DatabaseType,
 	create: bool,
 ) -> OpenDbResult {
-	let db: Arc<dyn Database<DbHash>> = match &db_source {
+	let db: BackendDatabase = match &db_source {
 		DatabaseSource::ParityDb { path } => open_parity_db::<Block>(path, db_type, create)?,
 		#[cfg(feature = "rocksdb")]
-		DatabaseSource::RocksDb { path, cache_size } =>
-			open_kvdb_rocksdb::<Block>(path, db_type, create, *cache_size)?,
+		DatabaseSource::RocksDb { path, cache_size } => BackendDatabase::DatabaseWithSeekableIterator(
+			open_kvdb_rocksdb::<Block>(path, db_type, create, *cache_size)?,
+		),
 		DatabaseSource::Custom { db, require_create_flag } => {
 			if *require_create_flag && !create {
 				return Err(OpenDbError::DoesNotExist);
 			}
-			db.clone()
+			BackendDatabase::Database(db.clone())
 		},
 		DatabaseSource::Auto { paritydb_path, rocksdb_path, cache_size } => {
 			// check if rocksdb exists first, if not, open paritydb
 			match open_kvdb_rocksdb::<Block>(rocksdb_path, db_type, false, *cache_size) {
-				Ok(db) => db,
+				Ok(rocksdb) => BackendDatabase::DatabaseWithSeekableIterator(rocksdb),
 				Err(OpenDbError::NotEnabled(_)) | Err(OpenDbError::DoesNotExist) =>
 					open_parity_db::<Block>(paritydb_path, db_type, create)?,
 				Err(as_is) => return Err(as_is),
@@ -221,7 +223,7 @@ fn open_database_at<Block: BlockT>(
 		},
 	};
 
-	check_database_type(&*db, db_type)?;
+	check_database_type(db.as_ref(), db_type)?;
 	Ok(db)
 }
@@ -239,7 +241,7 @@ pub enum OpenDbError {
 	},
 }
 
-type OpenDbResult = Result<Arc<dyn Database<DbHash>>, OpenDbError>;
+type OpenDbResult = Result<BackendDatabase, OpenDbError>;
 
 impl fmt::Display for OpenDbError {
 	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -292,11 +294,11 @@ impl From<io::Error> for OpenDbError {
 
 fn open_parity_db<Block: BlockT>(path: &Path, db_type: DatabaseType, create: bool) -> OpenDbResult {
 	match crate::parity_db::open(path, db_type, create, false) {
-		Ok(db) => Ok(db),
+		Ok(db) => Ok(BackendDatabase::Database(db)),
 		Err(parity_db::Error::InvalidConfiguration(_)) => {
 			log::warn!("Invalid parity db configuration, attempting database metadata update.");
 			// Try to update the database with the new config
-			Ok(crate::parity_db::open(path, db_type, create, true)?)
+			Ok(BackendDatabase::Database(crate::parity_db::open(path, db_type, create, true)?))
 		},
 		Err(e) => Err(e.into()),
 	}
@@ -308,7 +310,7 @@ fn open_kvdb_rocksdb<Block: BlockT>(
 	db_type: DatabaseType,
 	create: bool,
 	cache_size: usize,
-) -> OpenDbResult {
+) -> Result<Arc<dyn DatabaseWithSeekableIterator<DbHash>>, OpenDbError> {
 	// first upgrade database to required version
 	match crate::upgrade::upgrade_db::<Block>(path, db_type) {
 		// in case of missing version file, assume that database simply does not exist at given
@@ -349,7 +351,7 @@ fn open_kvdb_rocksdb<Block: BlockT>(
 	let db = kvdb_rocksdb::Database::open(&db_config, path)?;
 	// write database version only after the database is successfully opened
 	crate::upgrade::update_version(path)?;
-	Ok(sp_database::as_database(db))
+	Ok(sp_database::as_database_with_seekable_iter(db))
 }
 
 #[cfg(not(any(feature = "rocksdb", test)))]
@@ -358,7 +360,7 @@ fn open_kvdb_rocksdb<Block: BlockT>(
 	_db_type: DatabaseType,
 	_create: bool,
 	_cache_size: usize,
-) -> OpenDbResult {
+) -> Result<Arc<dyn DatabaseWithSeekableIterator<DbHash>>, OpenDbError> {
 	Err(OpenDbError::NotEnabled("with-kvdb-rocksdb"))
 }
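With this change the `Auto` open order doubles as capability selection: only the RocksDB path yields the `DatabaseWithSeekableIterator` variant, so a node that falls back to ParityDB simply runs without the archive column. A condensed sketch of that fallback logic follows; the error shape and open functions are simplified stand-ins, not the sc-client-db API.

// Try RocksDB first; fall back to ParityDB only when RocksDB is unavailable
// ("not enabled" or missing on disk). Any other error is fatal.
#[derive(Debug, PartialEq)]
enum OpenDbError {
	NotEnabled(&'static str),
	DoesNotExist,
	Internal(String),
}

fn open_rocksdb(_path: &str) -> Result<&'static str, OpenDbError> {
	// Stand-in: behave as if the crate was built without the `rocksdb` feature.
	Err(OpenDbError::NotEnabled("with-kvdb-rocksdb"))
}

fn open_paritydb(_path: &str) -> Result<&'static str, OpenDbError> {
	Ok("paritydb")
}

fn open_auto(rocksdb_path: &str, paritydb_path: &str) -> Result<&'static str, OpenDbError> {
	match open_rocksdb(rocksdb_path) {
		Ok(db) => Ok(db),
		// Only "rocksdb is not an option" errors trigger the fallback.
		Err(OpenDbError::NotEnabled(_)) | Err(OpenDbError::DoesNotExist) =>
			open_paritydb(paritydb_path),
		Err(as_is) => Err(as_is),
	}
}

fn main() {
	assert_eq!(open_auto("/tmp/rocks", "/tmp/parity"), Ok("paritydb"));
}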
diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs
index 74d94c30cd69b..f88808ef9e326 100644
--- a/substrate/client/service/src/builder.rs
+++ b/substrate/client/service/src/builder.rs
@@ -313,14 +313,17 @@ fn warm_up_trie_cache<TBl: BlockT>(
 	let start_time = std::time::Instant::now();
 	let mut keys_count = 0;
 	let mut child_keys_count = 0;
-	for key in KeysIter::<_, TBl>::new(untrusted_state()?, None, None)? {
+	for key in KeysIter::<_, TBl>::new(untrusted_state()?, None, None)
+		.map_err(|e| Error::Other(e.to_string()))?
+	{
 		if keys_count != 0 && keys_count % 100_000 == 0 {
 			debug!("{} keys and {} child keys have been warmed", keys_count, child_keys_count);
 		}
 		match child_info(key.0.clone()) {
 			Some(info) => {
 				for child_key in
-					KeysIter::<_, TBl>::new_child(untrusted_state()?, info.clone(), None, None)?
+					KeysIter::<_, TBl>::new_child(untrusted_state()?, info.clone(), None, None)
+						.map_err(|e| Error::Other(e.to_string()))?
 				{
 					if trusted_state()?
 						.child_storage(&info, &child_key.0)

diff --git a/substrate/primitives/database/Cargo.toml b/substrate/primitives/database/Cargo.toml
index 1795fece602ea..dfcef4c858254 100644
--- a/substrate/primitives/database/Cargo.toml
+++ b/substrate/primitives/database/Cargo.toml
@@ -15,4 +15,9 @@ workspace = true
 
 [dependencies]
 kvdb = { workspace = true }
+kvdb-rocksdb = { optional = true, workspace = true }
 parking_lot = { workspace = true, default-features = true }
+
+[features]
+default = []
+rocksdb = ["kvdb-rocksdb"]

diff --git a/substrate/primitives/database/src/kvdb.rs b/substrate/primitives/database/src/kvdb.rs
index 735813c368570..b84d10f72ab81 100644
--- a/substrate/primitives/database/src/kvdb.rs
+++ b/substrate/primitives/database/src/kvdb.rs
@@ -18,7 +18,9 @@
 
 /// A wrapper around `kvdb::Database` that implements `sp_database::Database` trait
 use ::kvdb::{DBTransaction, KeyValueDB};
 
-use crate::{error, Change, ColumnId, Database, Transaction};
+#[cfg(feature = "rocksdb")]
+use crate::DatabaseWithSeekableIterator;
+use crate::{error, Change, ColumnId, Database, SeekableIterator, Transaction};
 
 struct DbAdapter<D: KeyValueDB>(D);
 
@@ -31,7 +33,6 @@ fn handle_err<T>(result: std::io::Result<T>) -> T {
 	}
 }
 
-/// Wrap RocksDb database into a trait object that implements `sp_database::Database`
 pub fn as_database<D, H>(db: D) -> std::sync::Arc<dyn Database<H>>
 where
 	D: KeyValueDB + 'static,
 	H: Clone + AsRef<[u8]>,
 {
 	std::sync::Arc::new(DbAdapter(db))
 }
 
+/// Wrap RocksDb database into a trait object that implements
+/// `sp_database::DatabaseWithSeekableIterator`
+#[cfg(any(feature = "rocksdb", test))]
+pub fn as_database_with_seekable_iter<H>(
+	db: kvdb_rocksdb::Database,
+) -> std::sync::Arc<dyn DatabaseWithSeekableIterator<H>>
+where
+	H: Clone + AsRef<[u8]>,
+{
+	std::sync::Arc::new(DbAdapter(db))
+}
+
 impl<D: KeyValueDB> DbAdapter<D> {
 	// Returns counter key and counter value if it exists.
 	fn read_counter(&self, col: ColumnId, key: &[u8]) -> error::Result<(Vec<u8>, Option<u32>)> {
@@ -116,3 +126,38 @@ impl<D: KeyValueDB, H: Clone + AsRef<[u8]>> Database<H> for DbAdapter<D> {
 		handle_err(self.0.has_key(col, key))
 	}
 }
+
+#[cfg(feature = "rocksdb")]
+impl<'a> SeekableIterator for kvdb_rocksdb::DBRawIterator<'a> {
+	fn seek(&mut self, key: &[u8]) {
+		kvdb_rocksdb::DBRawIterator::seek(self, key)
+	}
+
+	fn seek_prev(&mut self, key: &[u8]) {
+		kvdb_rocksdb::DBRawIterator::seek_for_prev(self, key)
+	}
+
+	fn get(&self) -> Option<(&[u8], Vec<u8>)> {
+		let (k, v) = self.item()?;
+		Some((k, v.to_owned()))
+	}
+
+	fn prev(&mut self) {
+		kvdb_rocksdb::DBRawIterator::prev(self)
+	}
+
+	fn next(&mut self) {
+		kvdb_rocksdb::DBRawIterator::next(self)
+	}
+}
+
+#[cfg(feature = "rocksdb")]
+impl<H: Clone + AsRef<[u8]>> DatabaseWithSeekableIterator<H> for DbAdapter<kvdb_rocksdb::Database> {
+	fn seekable_iter<'a>(&'a self, col: u32) -> Option<Box<dyn SeekableIterator + 'a>> {
+		match self.0.raw_iter(col) {
+			Ok(iter) => Some(Box::new(iter)),
+			Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
+			Err(e) => panic!("Internal database error: {}", e),
+		}
+	}
+}
diff --git a/substrate/primitives/database/src/lib.rs b/substrate/primitives/database/src/lib.rs
index 42920bbefb499..f0317474e1cf1 100644
--- a/substrate/primitives/database/src/lib.rs
+++ b/substrate/primitives/database/src/lib.rs
@@ -21,7 +21,9 @@ pub mod error;
 mod kvdb;
 mod mem;
 
-pub use crate::kvdb::as_database;
+pub use kvdb::as_database;
+#[cfg(any(feature = "rocksdb", test))]
+pub use kvdb::as_database_with_seekable_iter;
 pub use mem::MemDb;
 
 /// An identifier for a column.
@@ -77,6 +79,18 @@ impl<H> Transaction<H> {
 	}
 }
 
+pub trait SeekableIterator {
+	fn seek(&mut self, key: &[u8]);
+	fn seek_prev(&mut self, key: &[u8]);
+	fn prev(&mut self);
+	fn next(&mut self);
+	fn get(&self) -> Option<(&[u8], Vec<u8>)>;
+}
+
+pub trait DatabaseWithSeekableIterator<H: Clone + AsRef<[u8]>>: Database<H> {
+	fn seekable_iter<'a>(&'a self, col: u32) -> Option<Box<dyn SeekableIterator + 'a>>;
+}
+
 pub trait Database<H: Clone + AsRef<[u8]>>: Send + Sync {
 	/// Commit the `transaction` to the database atomically. Any further calls to `get` or `lookup`
 	/// will reflect the new state.
@@ -117,6 +131,7 @@ pub trait Database<H: Clone + AsRef<[u8]>>: Send + Sync {
 	///
 	/// Not all database implementations use a prefix for keys, so this function may be a noop.
 	fn sanitize_key(&self, _key: &mut Vec<u8>) {}
+
 }
 
 impl<H> std::fmt::Debug for dyn Database<H> {
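A usage sketch for the new `SeekableIterator`/`DatabaseWithSeekableIterator` contract — a prefix scan over one column. It assumes only what this file defines: `seek` positions the iterator at the first key >= the target, `next` advances in ascending key order, `get` returns the current entry, and `seekable_iter` returns `None` for a column that does not exist yet. The column id, keys, and helper name are invented for the example.

use sp_database::{Database, DatabaseWithSeekableIterator, MemDb, Transaction};

fn scan_prefix(
	db: &dyn DatabaseWithSeekableIterator<Vec<u8>>,
	col: u32,
	prefix: &[u8],
) -> Vec<(Vec<u8>, Vec<u8>)> {
	let mut out = Vec::new();
	let Some(mut iter) = db.seekable_iter(col) else { return out };
	// Land on the first key >= the prefix, then walk forward while it matches.
	iter.seek(prefix);
	while let Some((key, value)) = iter.get() {
		if !key.starts_with(prefix) {
			break;
		}
		out.push((key.to_vec(), value));
		iter.next();
	}
	out
}

fn main() {
	let db = MemDb::new();
	let mut tx = Transaction::<Vec<u8>>::new();
	tx.set(0, b"acc:alice", b"1");
	tx.set(0, b"acc:bob", b"2");
	tx.set(0, b"code:wasm", b"3");
	db.commit(tx).unwrap();

	let accounts = scan_prefix(&db, 0, b"acc:");
	assert_eq!(accounts.len(), 2); // alice and bob, in key order
}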
diff --git a/substrate/primitives/database/src/mem.rs b/substrate/primitives/database/src/mem.rs
index 71ba7a9927636..1a971366da6e2 100644
--- a/substrate/primitives/database/src/mem.rs
+++ b/substrate/primitives/database/src/mem.rs
@@ -17,13 +17,17 @@
 
 //! In-memory implementation of `Database`
 
-use crate::{error, Change, ColumnId, Database, Transaction};
+use crate::{
+	error, Change, ColumnId, Database, DatabaseWithSeekableIterator, SeekableIterator, Transaction,
+};
 use parking_lot::RwLock;
-use std::collections::{hash_map::Entry, HashMap};
+use std::collections::{btree_map::Entry, BTreeMap, HashMap};
+
+type ColumnSpace = BTreeMap<Vec<u8>, (u32, Vec<u8>)>;
 
 #[derive(Default)]
 /// This implements `Database` as an in-memory hash map. `commit` is not atomic.
-pub struct MemDb(RwLock<HashMap<ColumnId, HashMap<Vec<u8>, (u32, Vec<u8>)>>>);
+pub struct MemDb(RwLock<HashMap<ColumnId, ColumnSpace>>);
 
 impl<H> Database<H> for MemDb
 where
@@ -75,6 +79,116 @@ where
 	}
 }
 
+enum IterState {
+	Valid { current_key: Vec<u8> },
+	Invalid,
+}
+
+struct MemDbSeekableIter<'db> {
+	db: &'db MemDb,
+	column: ColumnId,
+	state: IterState,
+}
+
+impl<'db> MemDbSeekableIter<'db> {
+	fn lock_col_space<T>(&self, callback: impl FnOnce(&ColumnSpace) -> T) -> T {
+		let lock = self.db.0.read();
+		let column_space = lock
+			.get(&self.column)
+			.expect("Iterator must always point to an existing column");
+		callback(column_space)
+	}
+}
+
+impl<'db> SeekableIterator for MemDbSeekableIter<'db> {
+	fn seek(&mut self, key: &[u8]) {
+		let next_kv = self.lock_col_space(|col_space| {
+			let mut range = col_space
+				.range::<[u8], _>((std::ops::Bound::Included(key), std::ops::Bound::Unbounded));
+			range.next().map(|(k, _)| k.to_owned())
+		});
+		self.state = match next_kv {
+			Some(key) => IterState::Valid { current_key: key },
+			None => IterState::Invalid,
+		};
+	}
+
+	fn seek_prev(&mut self, key: &[u8]) {
+		let prev_kv = self.lock_col_space(|col_space| {
+			let mut range = col_space
+				.range::<[u8], _>((std::ops::Bound::Unbounded, std::ops::Bound::Included(key)));
+			range.next_back().map(|(k, _)| k.to_owned())
+		});
+		self.state = match prev_kv {
+			Some(key) => IterState::Valid { current_key: key },
+			None => IterState::Invalid,
+		};
+	}
+
+	fn prev(&mut self) {
+		let prev_kv = match self.state {
+			IterState::Valid { ref current_key } => self.lock_col_space(|col_space| {
+				let mut range = col_space.range::<Vec<u8>, _>((
+					std::ops::Bound::Unbounded,
+					std::ops::Bound::Excluded(current_key),
+				));
+				range.next_back().map(|(k, _)| k.to_owned())
+			}),
+			IterState::Invalid => None,
+		};
+		self.state = match prev_kv {
+			Some(key) => IterState::Valid { current_key: key },
+			None => IterState::Invalid,
+		};
+	}
+
+	fn next(&mut self) {
+		let next_kv = match &self.state {
+			IterState::Valid { current_key } => self.lock_col_space(|col_space| {
+				let mut range = col_space.range::<Vec<u8>, _>((
+					std::ops::Bound::Excluded(current_key),
+					std::ops::Bound::Unbounded,
+				));
+				range.next().map(|(k, _)| k.to_owned())
+			}),
+			IterState::Invalid => None,
+		};
+		self.state = match next_kv {
+			Some(key) => IterState::Valid { current_key: key },
+			None => IterState::Invalid,
+		};
+	}
+
+	fn get(&self) -> Option<(&[u8], Vec<u8>)> {
+		match self.state {
+			IterState::Valid { ref current_key } => Some((
+				&current_key,
+				self.lock_col_space(|col_space| {
+					col_space
+						.get(current_key)
+						.expect("Iterator in valid state must always point to an existing key")
+						.1
+						.clone()
+				}),
+			)),
+			IterState::Invalid => None,
+		}
+	}
+}
+
+impl<H> DatabaseWithSeekableIterator<H> for MemDb
+where
+	H: Clone + AsRef<[u8]>,
+{
+	fn seekable_iter<'a>(&'a self, column: u32) -> Option<Box<dyn SeekableIterator + 'a>> {
+		if self.0.read().contains_key(&column) {
+			Some(Box::new(MemDbSeekableIter { db: self, column, state: IterState::Invalid }))
+		} else {
+			None
+		}
+	}
+}
+
 impl MemDb {
 	/// Create a new instance
 	pub fn new() -> Self {
 		Default::default()
 	}
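The in-memory iterator above is a small state machine: `seek`/`seek_prev` establish a `Valid { current_key }` state, `next`/`prev` move it via `BTreeMap::range` over the exclusive bounds of the current key, and walking off either end parks it in `Invalid`, where `get` returns `None`. A test-style sketch of those transitions against `MemDb` (not part of the patch; it only uses the APIs introduced in this diff, with `Vec<u8>` as an arbitrary hash type for the generic parameter):

#[cfg(test)]
mod seekable_iter_sketch {
	use sp_database::{Database, DatabaseWithSeekableIterator, MemDb, Transaction};

	#[test]
	fn walk_both_directions() {
		let db = MemDb::new();
		let mut tx = Transaction::<Vec<u8>>::new();
		tx.set(0, &[1], b"a");
		tx.set(0, &[3], b"b");
		tx.set(0, &[5], b"c");
		db.commit(tx).unwrap();

		let db_ref: &dyn DatabaseWithSeekableIterator<Vec<u8>> = &db;
		let mut iter = db_ref.seekable_iter(0).expect("column 0 was just written");

		// `seek` lands on the first key >= [2], i.e. [3].
		iter.seek(&[2]);
		assert_eq!(iter.get().map(|(k, v)| (k.to_vec(), v)), Some((vec![3], b"b".to_vec())));

		// `seek_prev` lands on the last key <= [2], i.e. [1].
		iter.seek_prev(&[2]);
		assert_eq!(iter.get().map(|(k, _)| k.to_vec()), Some(vec![1]));

		// Walking past the last key parks the iterator in the `Invalid` state.
		iter.next(); // [3]
		iter.next(); // [5]
		iter.next(); // past the end
		assert!(iter.get().is_none());

		// `prev` from `Invalid` stays `Invalid`, as implemented above.
		iter.prev();
		assert!(iter.get().is_none());
	}
}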