Skip to content

Commit 4c3a6e9

Browse files
starknet_committer: parallel reads using unordered futures
1 parent aff17da commit 4c3a6e9

File tree

3 files changed

+67
-1
lines changed

3 files changed

+67
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/starknet_committer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ apollo_config.workspace = true
1414
async-trait.workspace = true
1515
derive_more = { workspace = true, features = ["as_ref", "from", "into"] }
1616
ethnum.workspace = true
17+
futures.workspace = true
1718
hex.workspace = true
1819
pretty_assertions.workspace = true
1920
rand.workspace = true

crates/starknet_committer/src/db/trie_traversal.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::borrow::Borrow;
22
use std::collections::HashMap;
33
use std::fmt::Debug;
44

5+
use futures::stream::{FuturesUnordered, StreamExt};
56
use starknet_api::core::ContractAddress;
67
use starknet_api::hash::HashOutput;
78
use starknet_patricia::db_layout::{NodeLayout, NodeLayoutFor};
@@ -26,7 +27,7 @@ use starknet_patricia::patricia_merkle_tree::traversal::{
2627
use starknet_patricia::patricia_merkle_tree::types::{NodeIndex, SortedLeafIndices};
2728
use starknet_patricia_storage::db_object::{DBObject, EmptyKeyContext, HasStaticPrefix};
2829
use starknet_patricia_storage::errors::StorageError;
29-
use starknet_patricia_storage::storage_trait::{DbKey, Storage};
30+
use starknet_patricia_storage::storage_trait::{AsyncStorage, DbKey, Storage};
3031
use tracing::warn;
3132

3233
use crate::block_committer::input::{
@@ -459,3 +460,66 @@ where
459460
}
460461
Ok(storage_tries)
461462
}
463+
464+
// TODO(Nimrod): Remove the `allow(dead_code)` once we use this function.
465+
#[allow(dead_code)]
466+
async fn create_storage_tries_concurrently<'a, Layout: NodeLayoutFor<StarknetStorageValue>>(
467+
storage: &impl AsyncStorage,
468+
actual_storage_updates: &HashMap<ContractAddress, LeafModifications<StarknetStorageValue>>,
469+
original_contracts_trie_leaves: &HashMap<NodeIndex, ContractState>,
470+
config: &ReaderConfig,
471+
storage_tries_sorted_indices: &HashMap<ContractAddress, SortedLeafIndices<'a>>,
472+
) -> ForestResult<HashMap<ContractAddress, OriginalSkeletonTreeImpl<'a>>>
473+
where
474+
<Layout as NodeLayoutFor<StarknetStorageValue>>::DbLeaf:
475+
HasStaticPrefix<KeyContext = ContractAddress>,
476+
{
477+
let mut futures = FuturesUnordered::new();
478+
let mut storage_tries = HashMap::new();
479+
480+
for (address, updates) in actual_storage_updates {
481+
// Extract data needed for this contract.
482+
let sorted_leaf_indices = *storage_tries_sorted_indices
483+
.get(address)
484+
.ok_or(ForestError::MissingSortedLeafIndices(*address))?;
485+
let contract_state = original_contracts_trie_leaves
486+
.get(&contract_address_into_node_index(address))
487+
.ok_or(ForestError::MissingContractCurrentState(*address))?;
488+
let trie_config = OriginalSkeletonTrieConfig::new_for_classes_or_storage_trie(
489+
config.warn_on_trivial_modifications(),
490+
);
491+
492+
// Prepare data for the async operation.
493+
let address = *address;
494+
let cloned_storage = storage.clone();
495+
// TODO(Ariel): Change `LeafModifications` in `actual_storage_updates` to be an
496+
// iterator over borrowed data so that the conversion below is costless.
497+
let leaf_modifications: HashMap<
498+
NodeIndex,
499+
<Layout as NodeLayoutFor<StarknetStorageValue>>::DbLeaf,
500+
> = updates.iter().map(|(idx, value)| (*idx, Layout::DbLeaf::from(*value))).collect();
501+
502+
// Create the future - tokio will poll all futures concurrently.
503+
futures.push(async move {
504+
let original_skeleton = create_original_skeleton_tree::<Layout::DbLeaf, Layout>(
505+
&cloned_storage,
506+
contract_state.storage_root_hash,
507+
sorted_leaf_indices,
508+
&trie_config,
509+
&leaf_modifications,
510+
None,
511+
&address,
512+
)
513+
.await?;
514+
Ok::<_, ForestError>((address, original_skeleton))
515+
});
516+
}
517+
518+
// Collect all results as they complete.
519+
while let Some(result) = futures.next().await {
520+
let (address, original_skeleton) = result?;
521+
storage_tries.insert(address, original_skeleton);
522+
}
523+
524+
Ok(storage_tries)
525+
}

0 commit comments

Comments
 (0)