Skip to content

Commit a7bc763

Browse files
lexnvjsdw
authored andcommitted
rpc-v2: Implement archive_unstable_storageDiff (paritytech#5997)
This PR implements the `archive_unstable_storageDiff`. The implementation follows the rpc-v2 spec from: - paritytech/json-rpc-interface-spec#159. - builds on top of paritytech/json-rpc-interface-spec#161 cc @paritytech/subxt-team --------- Signed-off-by: Alexandru Vasile <[email protected]> Co-authored-by: James Wilson <[email protected]>
1 parent b8f05bf commit a7bc763

File tree

10 files changed

+1449
-18
lines changed

10 files changed

+1449
-18
lines changed

Cargo.lock

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

prdoc/pr_5997.prdoc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
2+
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json
3+
4+
title: Implement archive_unstable_storageDiff method
5+
6+
doc:
7+
- audience: Node Dev
8+
description: |
9+
This PR implements the `archive_unstable_storageDiff` rpc-v2 method.
10+
Developers can use this method to fetch the storage differences
11+
between two blocks. This is useful for oracles and archive nodes.
12+
For more details see: https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/archive_unstable_storageDiff.md.
13+
14+
crates:
15+
- name: sc-rpc-spec-v2
16+
bump: major
17+
- name: sc-service
18+
bump: patch

substrate/client/rpc-spec-v2/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ log = { workspace = true, default-features = true }
4242
futures-util = { workspace = true }
4343
rand = { workspace = true, default-features = true }
4444
schnellru = { workspace = true }
45+
itertools = { workspace = true }
4546

4647
[dev-dependencies]
4748
async-trait = { workspace = true }

substrate/client/rpc-spec-v2/src/archive/api.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
//! API trait of the archive methods.
2020
2121
use crate::{
22-
common::events::{ArchiveStorageResult, PaginatedStorageQuery},
22+
common::events::{
23+
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
24+
PaginatedStorageQuery,
25+
},
2326
MethodResult,
2427
};
2528
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
@@ -104,4 +107,21 @@ pub trait ArchiveApi<Hash> {
104107
items: Vec<PaginatedStorageQuery<String>>,
105108
child_trie: Option<String>,
106109
) -> RpcResult<ArchiveStorageResult>;
110+
111+
/// Returns the storage difference between two blocks.
112+
///
113+
/// # Unstable
114+
///
115+
/// This method is unstable and can change in minor or patch releases.
116+
#[subscription(
117+
name = "archive_unstable_storageDiff" => "archive_unstable_storageDiffEvent",
118+
unsubscribe = "archive_unstable_storageDiff_stopStorageDiff",
119+
item = ArchiveStorageDiffEvent,
120+
)]
121+
fn archive_unstable_storage_diff(
122+
&self,
123+
hash: Hash,
124+
items: Vec<ArchiveStorageDiffItem<String>>,
125+
previous_hash: Option<Hash>,
126+
);
107127
}

substrate/client/rpc-spec-v2/src/archive/archive.rs

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,29 @@
1919
//! API implementation for `archive`.
2020
2121
use crate::{
22-
archive::{error::Error as ArchiveError, ArchiveApiServer},
23-
common::events::{ArchiveStorageResult, PaginatedStorageQuery},
24-
hex_string, MethodResult,
22+
archive::{
23+
archive_storage::{ArchiveStorage, ArchiveStorageDiff},
24+
error::Error as ArchiveError,
25+
ArchiveApiServer,
26+
},
27+
common::events::{
28+
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
29+
PaginatedStorageQuery,
30+
},
31+
hex_string, MethodResult, SubscriptionTaskExecutor,
2532
};
2633

2734
use codec::Encode;
28-
use jsonrpsee::core::{async_trait, RpcResult};
35+
use futures::FutureExt;
36+
use jsonrpsee::{
37+
core::{async_trait, RpcResult},
38+
PendingSubscriptionSink,
39+
};
2940
use sc_client_api::{
3041
Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
3142
StorageProvider,
3243
};
44+
use sc_rpc::utils::Subscription;
3345
use sp_api::{CallApiAt, CallContext};
3446
use sp_blockchain::{
3547
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
@@ -41,7 +53,9 @@ use sp_runtime::{
4153
};
4254
use std::{collections::HashSet, marker::PhantomData, sync::Arc};
4355

44-
use super::archive_storage::ArchiveStorage;
56+
use tokio::sync::mpsc;
57+
58+
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";
4559

4660
/// The configuration of [`Archive`].
4761
pub struct ArchiveConfig {
@@ -64,6 +78,12 @@ const MAX_DESCENDANT_RESPONSES: usize = 5;
6478
/// `MAX_DESCENDANT_RESPONSES`.
6579
const MAX_QUERIED_ITEMS: usize = 8;
6680

81+
/// The buffer capacity for each storage query.
82+
///
83+
/// This is small because the underlying JSON-RPC server has
84+
/// its down buffer capacity per connection as well.
85+
const STORAGE_QUERY_BUF: usize = 16;
86+
6787
impl Default for ArchiveConfig {
6888
fn default() -> Self {
6989
Self {
@@ -79,6 +99,8 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
7999
client: Arc<Client>,
80100
/// Backend of the chain.
81101
backend: Arc<BE>,
102+
/// Executor to spawn subscriptions.
103+
executor: SubscriptionTaskExecutor,
82104
/// The hexadecimal encoded hash of the genesis block.
83105
genesis_hash: String,
84106
/// The maximum number of items the `archive_storage` can return for a descendant query before
@@ -96,12 +118,14 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
96118
client: Arc<Client>,
97119
backend: Arc<BE>,
98120
genesis_hash: GenesisHash,
121+
executor: SubscriptionTaskExecutor,
99122
config: ArchiveConfig,
100123
) -> Self {
101124
let genesis_hash = hex_string(&genesis_hash.as_ref());
102125
Self {
103126
client,
104127
backend,
128+
executor,
105129
genesis_hash,
106130
storage_max_descendant_responses: config.max_descendant_responses,
107131
storage_max_queried_items: config.max_queried_items,
@@ -278,4 +302,59 @@ where
278302

279303
Ok(storage_client.handle_query(hash, items, child_trie))
280304
}
305+
306+
fn archive_unstable_storage_diff(
307+
&self,
308+
pending: PendingSubscriptionSink,
309+
hash: Block::Hash,
310+
items: Vec<ArchiveStorageDiffItem<String>>,
311+
previous_hash: Option<Block::Hash>,
312+
) {
313+
let storage_client = ArchiveStorageDiff::new(self.client.clone());
314+
let client = self.client.clone();
315+
316+
log::trace!(target: LOG_TARGET, "Storage diff subscription started");
317+
318+
let fut = async move {
319+
let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };
320+
321+
let previous_hash = if let Some(previous_hash) = previous_hash {
322+
previous_hash
323+
} else {
324+
let Ok(Some(current_header)) = client.header(hash) else {
325+
let message = format!("Block header is not present: {hash}");
326+
let _ = sink.send(&ArchiveStorageDiffEvent::err(message)).await;
327+
return
328+
};
329+
*current_header.parent_hash()
330+
};
331+
332+
let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
333+
let storage_fut =
334+
storage_client.handle_trie_queries(hash, items, previous_hash, tx.clone());
335+
336+
// We don't care about the return value of this join:
337+
// - process_events might encounter an error (if the client disconnected)
338+
// - storage_fut might encounter an error while processing a trie queries and
339+
// the error is propagated via the sink.
340+
let _ = futures::future::join(storage_fut, process_events(&mut rx, &mut sink)).await;
341+
};
342+
343+
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
344+
}
345+
}
346+
347+
/// Sends all the events to the sink.
348+
async fn process_events(rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>, sink: &mut Subscription) {
349+
while let Some(event) = rx.recv().await {
350+
if event.is_done() {
351+
log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
352+
} else if event.is_err() {
353+
log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
354+
}
355+
356+
if sink.send(&event).await.is_err() {
357+
return
358+
}
359+
}
281360
}

0 commit comments

Comments
 (0)