Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion lib/src/network/codec/storage_call_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use crate::util::protobuf;

use alloc::{borrow::Cow, vec::Vec};

/// Maximum size in bytes for light protocol requests (1 MiB).
pub const LIGHT_PROTOCOL_REQUEST_MAX_SIZE: usize = 1024 * 1024;

/// Description of a storage proof request that can be sent to a peer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageProofRequestConfig<TKeysIter> {
Expand Down Expand Up @@ -102,7 +105,8 @@ pub fn decode_storage_or_call_proof_response(
) -> Result<Option<&[u8]>, DecodeStorageCallProofResponseError> {
let field_num = match ty {
StorageOrCallProof::CallProof => 1,
StorageOrCallProof::StorageProof => 2,
// Both storage and child storage use remote_read_response (field 2)
StorageOrCallProof::StorageProof | StorageOrCallProof::ChildStorageProof => 2,
};

// TODO: while the `proof` field is correctly optional, the `response` field isn't supposed to be optional; make it `#[required]` again once https://github.com/paritytech/substrate/pull/12732 has been merged and released
Expand Down Expand Up @@ -140,4 +144,49 @@ pub enum DecodeStorageCallProofResponseError {
pub enum StorageOrCallProof {
StorageProof,
CallProof,
ChildStorageProof,
}

/// Description of a child storage proof request that can be sent to a peer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChildStorageProofRequestConfig<TChildTrie, TKeysIter> {
/// Hash of the block to request the storage of.
pub block_hash: [u8; 32],
/// Child storage key (the child trie name, without the `:child_storage:default:` prefix).
pub child_trie: TChildTrie,
/// List of storage keys to query within the child trie.
pub keys: TKeysIter,
}

// See https://github.com/paritytech/substrate/blob/c8653447fc8ef8d95a92fe164c96dffb37919e85/client/network/light/src/schema/light.v1.proto
// for protocol definition (RemoteReadChildRequest message).

/// Builds the bytes corresponding to a child storage proof request.
pub fn build_child_storage_proof_request<'a>(
config: ChildStorageProofRequestConfig<
impl AsRef<[u8]> + Clone + 'a,
impl Iterator<Item = impl AsRef<[u8]> + Clone + 'a> + 'a,
>,
) -> impl Iterator<Item = impl AsRef<[u8]>> {
// Message format for RemoteReadChildRequest (tag 4 in Request oneof):
// - Field 2: block hash
// - Field 3: child storage key (child trie name)
// - Field 6: keys to fetch
protobuf::message_tag_encode(
4,
protobuf::bytes_tag_encode(2, config.block_hash)
.map(either::Left)
.chain(
protobuf::bytes_tag_encode(3, config.child_trie)
.map(either::Left)
.map(either::Right),
)
.chain(
config
.keys
.flat_map(|key| protobuf::bytes_tag_encode(6, key))
.map(either::Right)
.map(either::Right),
),
)
}
52 changes: 52 additions & 0 deletions lib/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2997,6 +2997,46 @@ where
)?)
}

/// Sends a child storage proof request to the given peer.
///
/// This is similar to [`ChainNetwork::start_storage_proof_request`] but for child tries.
///
/// This function might generate a message destined a connection. Use
/// [`ChainNetwork::pull_message_to_connection`] to process messages after it has returned.
///
/// # Panic
///
/// Panics if the [`ChainId`] is invalid.
///
pub fn start_child_storage_proof_request(
&mut self,
target: &PeerId,
chain_id: ChainId,
config: codec::ChildStorageProofRequestConfig<
impl AsRef<[u8]> + Clone,
impl Iterator<Item = impl AsRef<[u8]> + Clone>,
>,
timeout: Duration,
) -> Result<SubstreamId, StartRequestMaybeTooLargeError> {
let request_data =
codec::build_child_storage_proof_request(config).fold(Vec::new(), |mut a, b| {
a.extend_from_slice(b.as_ref());
a
});

// The request data can possibly be higher than the protocol limit.
// TODO: check limit

Ok(self.start_request(
target,
request_data,
Protocol::LightStorage {
chain_index: chain_id.0,
},
timeout,
)?)
}

/// Sends a Kademlia find node request to the given peer.
///
/// This function might generate a message destined a connection. Use
Expand Down Expand Up @@ -4488,6 +4528,18 @@ pub enum StorageProofRequestError {
RemoteCouldntAnswer,
}

impl StorageProofRequestError {
/// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
/// issue.
pub fn is_network_problem(&self) -> bool {
match self {
StorageProofRequestError::Request(_) => true,
StorageProofRequestError::Decode(_) => false,
StorageProofRequestError::RemoteCouldntAnswer => true,
}
}
}

/// Error returned by [`ChainNetwork::start_call_proof_request`].
#[derive(Debug, Clone, derive_more::Display, derive_more::Error)]
pub enum CallProofRequestError {
Expand Down
151 changes: 146 additions & 5 deletions light-base/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ impl<TPlat: PlatformRef> NetworkService<TPlat> {
grandpa_warp_sync_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
child_storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
chains_by_next_discovery: BTreeMap::new(),
}));

Expand Down Expand Up @@ -474,6 +475,38 @@ impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
rx.await.unwrap()
}

/// Sends a child storage proof request to the given peer.
pub async fn child_storage_proof_request(
self: Arc<Self>,
target: PeerId,
config: codec::ChildStorageProofRequestConfig<
impl AsRef<[u8]> + Clone,
impl Iterator<Item = impl AsRef<[u8]> + Clone>,
>,
timeout: Duration,
) -> Result<service::EncodedMerkleProof, ChildStorageProofRequestError> {
let (tx, rx) = oneshot::channel();

self.messages_tx
.send(ToBackgroundChain::StartChildStorageProofRequest {
target: target.clone(),
config: ChildStorageProofRequestConfigOwned {
block_hash: config.block_hash,
child_trie: config.child_trie.as_ref().to_vec(),
keys: config
.keys
.map(|key| key.as_ref().to_vec())
.collect::<Vec<_>>(),
},
timeout,
result: tx,
})
.await
.unwrap();

rx.await.unwrap()
}

/// Announces transaction to the peers we are connected to.
///
/// Returns a list of peers that we have sent the transaction to. Can return an empty `Vec`
Expand Down Expand Up @@ -662,6 +695,37 @@ impl CallProofRequestError {
}
}

/// Error returned by [`NetworkServiceChain::child_storage_proof_request`].
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
pub enum ChildStorageProofRequestError {
/// No established connection with the target.
NoConnection,
/// Child storage proof request is too large and can't be sent.
RequestTooLarge,
/// Error during the request.
#[display("{_0}")]
Request(service::StorageProofRequestError),
}

impl ChildStorageProofRequestError {
/// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
/// issue.
pub fn is_network_problem(&self) -> bool {
match self {
ChildStorageProofRequestError::Request(err) => err.is_network_problem(),
ChildStorageProofRequestError::RequestTooLarge => false,
ChildStorageProofRequestError::NoConnection => true,
}
}
}

/// Owned version of [`codec::ChildStorageProofRequestConfig`] for sending across channel.
struct ChildStorageProofRequestConfigOwned {
block_hash: [u8; 32],
child_trie: Vec<u8>,
keys: Vec<Vec<u8>>,
}

enum ToBackground<TPlat: PlatformRef> {
AddChain {
messages_rx: async_channel::Receiver<ToBackgroundChain>,
Expand Down Expand Up @@ -708,6 +772,13 @@ enum ToBackgroundChain {
timeout: Duration,
result: oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
},
// TODO: serialize the request before sending over channel
StartChildStorageProofRequest {
target: PeerId,
config: ChildStorageProofRequestConfigOwned,
timeout: Duration,
result: oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
},
SetLocalBestBlock {
best_hash: [u8; 32],
best_number: u64,
Expand Down Expand Up @@ -834,6 +905,12 @@ struct BackgroundTask<TPlat: PlatformRef> {
fnv::FnvBuildHasher,
>,

child_storage_proof_requests: HashMap<
service::SubstreamId,
oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
fnv::FnvBuildHasher,
>,

/// All chains, indexed by the value of [`Chain::next_discovery_when`].
chains_by_next_discovery: BTreeMap<(TPlat::Instant, ChainId), Pin<Box<TPlat::Delay>>>,
}
Expand Down Expand Up @@ -1487,6 +1564,65 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
}
};
}
WakeUpReason::MessageForChain(
chain_id,
ToBackgroundChain::StartChildStorageProofRequest {
target,
config,
timeout,
result,
},
) => {
log!(
&task.platform,
Debug,
"network",
"child-storage-proof-request-started",
chain = task.network[chain_id].log_name,
target,
block_hash = HashDisplay(&config.block_hash)
);

match task.network.start_child_storage_proof_request(
&target,
chain_id,
codec::ChildStorageProofRequestConfig {
block_hash: config.block_hash,
child_trie: &config.child_trie,
keys: config.keys.iter().map(|k| k.as_slice()),
},
timeout,
) {
Ok(substream_id) => {
task.child_storage_proof_requests
.insert(substream_id, result);
}
Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
log!(
&task.platform,
Debug,
"network",
"child-storage-proof-request-error",
chain = task.network[chain_id].log_name,
target,
error = "NoConnection"
);
let _ = result.send(Err(ChildStorageProofRequestError::NoConnection));
}
Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
log!(
&task.platform,
Debug,
"network",
"child-storage-proof-request-error",
chain = task.network[chain_id].log_name,
target,
error = "RequestTooLarge"
);
let _ = result.send(Err(ChildStorageProofRequestError::RequestTooLarge));
}
};
}
WakeUpReason::MessageForChain(
chain_id,
ToBackgroundChain::SetLocalBestBlock {
Expand Down Expand Up @@ -2146,11 +2282,16 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
}
}

let _ = task
.storage_proof_requests
.remove(&substream_id)
.unwrap()
.send(response.map_err(StorageProofRequestError::Request));
// Both regular storage proof and child storage proof use the same protocol,
// so check both HashMaps for the request.
if let Some(sender) = task.storage_proof_requests.remove(&substream_id) {
let _ = sender.send(response.map_err(StorageProofRequestError::Request));
} else if let Some(sender) = task.child_storage_proof_requests.remove(&substream_id)
{
let _ = sender.send(response.map_err(ChildStorageProofRequestError::Request));
} else {
unreachable!()
}
}
WakeUpReason::NetworkEvent(service::Event::RequestResult {
substream_id,
Expand Down
Loading
Loading