1919//! API implementation for `archive`.
2020
2121use crate :: {
22- archive:: { error:: Error as ArchiveError , ArchiveApiServer } ,
23- common:: events:: { ArchiveStorageResult , PaginatedStorageQuery } ,
24- hex_string, MethodResult ,
22+ archive:: {
23+ archive_storage:: { ArchiveStorage , ArchiveStorageDiff } ,
24+ error:: Error as ArchiveError ,
25+ ArchiveApiServer ,
26+ } ,
27+ common:: events:: {
28+ ArchiveStorageDiffEvent , ArchiveStorageDiffItem , ArchiveStorageResult ,
29+ PaginatedStorageQuery ,
30+ } ,
31+ hex_string, MethodResult , SubscriptionTaskExecutor ,
2532} ;
2633
2734use codec:: Encode ;
28- use jsonrpsee:: core:: { async_trait, RpcResult } ;
35+ use futures:: FutureExt ;
36+ use jsonrpsee:: {
37+ core:: { async_trait, RpcResult } ,
38+ PendingSubscriptionSink ,
39+ } ;
2940use sc_client_api:: {
3041 Backend , BlockBackend , BlockchainEvents , CallExecutor , ChildInfo , ExecutorProvider , StorageKey ,
3142 StorageProvider ,
3243} ;
44+ use sc_rpc:: utils:: Subscription ;
3345use sp_api:: { CallApiAt , CallContext } ;
3446use sp_blockchain:: {
3547 Backend as BlockChainBackend , Error as BlockChainError , HeaderBackend , HeaderMetadata ,
@@ -41,7 +53,9 @@ use sp_runtime::{
4153} ;
4254use std:: { collections:: HashSet , marker:: PhantomData , sync:: Arc } ;
4355
44- use super :: archive_storage:: ArchiveStorage ;
56+ use tokio:: sync:: mpsc;
57+
/// Log target used by the `archive` RPC API implementation.
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";
4559
4660/// The configuration of [`Archive`].
4761pub struct ArchiveConfig {
@@ -64,6 +78,12 @@ const MAX_DESCENDANT_RESPONSES: usize = 5;
6478/// `MAX_DESCENDANT_RESPONSES`.
6579const MAX_QUERIED_ITEMS : usize = 8 ;
6680
/// Capacity of the event buffer used by each storage query.
///
/// Deliberately small: the underlying JSON-RPC server already maintains
/// its own per-connection buffer on top of this one.
const STORAGE_QUERY_BUF: usize = 16;
86+
6787impl Default for ArchiveConfig {
6888 fn default ( ) -> Self {
6989 Self {
@@ -79,6 +99,8 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
7999 client : Arc < Client > ,
80100 /// Backend of the chain.
81101 backend : Arc < BE > ,
102+ /// Executor to spawn subscriptions.
103+ executor : SubscriptionTaskExecutor ,
82104 /// The hexadecimal encoded hash of the genesis block.
83105 genesis_hash : String ,
84106 /// The maximum number of items the `archive_storage` can return for a descendant query before
@@ -96,12 +118,14 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
96118 client : Arc < Client > ,
97119 backend : Arc < BE > ,
98120 genesis_hash : GenesisHash ,
121+ executor : SubscriptionTaskExecutor ,
99122 config : ArchiveConfig ,
100123 ) -> Self {
101124 let genesis_hash = hex_string ( & genesis_hash. as_ref ( ) ) ;
102125 Self {
103126 client,
104127 backend,
128+ executor,
105129 genesis_hash,
106130 storage_max_descendant_responses : config. max_descendant_responses ,
107131 storage_max_queried_items : config. max_queried_items ,
@@ -278,4 +302,59 @@ where
278302
279303 Ok ( storage_client. handle_query ( hash, items, child_trie) )
280304 }
305+
306+ fn archive_unstable_storage_diff (
307+ & self ,
308+ pending : PendingSubscriptionSink ,
309+ hash : Block :: Hash ,
310+ items : Vec < ArchiveStorageDiffItem < String > > ,
311+ previous_hash : Option < Block :: Hash > ,
312+ ) {
313+ let storage_client = ArchiveStorageDiff :: new ( self . client . clone ( ) ) ;
314+ let client = self . client . clone ( ) ;
315+
316+ log:: trace!( target: LOG_TARGET , "Storage diff subscription started" ) ;
317+
318+ let fut = async move {
319+ let Ok ( mut sink) = pending. accept ( ) . await . map ( Subscription :: from) else { return } ;
320+
321+ let previous_hash = if let Some ( previous_hash) = previous_hash {
322+ previous_hash
323+ } else {
324+ let Ok ( Some ( current_header) ) = client. header ( hash) else {
325+ let message = format ! ( "Block header is not present: {hash}" ) ;
326+ let _ = sink. send ( & ArchiveStorageDiffEvent :: err ( message) ) . await ;
327+ return
328+ } ;
329+ * current_header. parent_hash ( )
330+ } ;
331+
332+ let ( tx, mut rx) = tokio:: sync:: mpsc:: channel ( STORAGE_QUERY_BUF ) ;
333+ let storage_fut =
334+ storage_client. handle_trie_queries ( hash, items, previous_hash, tx. clone ( ) ) ;
335+
336+ // We don't care about the return value of this join:
337+ // - process_events might encounter an error (if the client disconnected)
338+ // - storage_fut might encounter an error while processing a trie queries and
339+ // the error is propagated via the sink.
340+ let _ = futures:: future:: join ( storage_fut, process_events ( & mut rx, & mut sink) ) . await ;
341+ } ;
342+
343+ self . executor . spawn ( "substrate-rpc-subscription" , Some ( "rpc" ) , fut. boxed ( ) ) ;
344+ }
345+ }
346+
347+ /// Sends all the events to the sink.
348+ async fn process_events ( rx : & mut mpsc:: Receiver < ArchiveStorageDiffEvent > , sink : & mut Subscription ) {
349+ while let Some ( event) = rx. recv ( ) . await {
350+ if event. is_done ( ) {
351+ log:: debug!( target: LOG_TARGET , "Finished processing partial trie query" ) ;
352+ } else if event. is_err ( ) {
353+ log:: debug!( target: LOG_TARGET , "Error encountered while processing partial trie query" ) ;
354+ }
355+
356+ if sink. send ( & event) . await . is_err ( ) {
357+ return
358+ }
359+ }
281360}
0 commit comments