2020
2121use crate :: {
2222 archive:: {
23- archive_storage:: { ArchiveStorage , ArchiveStorageDiff } ,
24- error:: Error as ArchiveError ,
25- ArchiveApiServer ,
23+ archive_storage:: ArchiveStorageDiff , error:: Error as ArchiveError , ArchiveApiServer ,
2624 } ,
27- common:: events:: {
28- ArchiveStorageDiffEvent , ArchiveStorageDiffItem , ArchiveStorageResult ,
29- PaginatedStorageQuery ,
25+ common:: {
26+ events:: {
27+ ArchiveStorageDiffEvent , ArchiveStorageDiffItem , ArchiveStorageEvent , StorageQuery ,
28+ } ,
29+ storage:: { QueryResult , StorageSubscriptionClient } ,
3030 } ,
3131 hex_string, MethodResult , SubscriptionTaskExecutor ,
3232} ;
@@ -57,42 +57,12 @@ use tokio::sync::mpsc;
5757
5858pub ( crate ) const LOG_TARGET : & str = "rpc-spec-v2::archive" ;
5959
60- /// The configuration of [`Archive`].
61- pub struct ArchiveConfig {
62- /// The maximum number of items the `archive_storage` can return for a descendant query before
63- /// pagination is required.
64- pub max_descendant_responses : usize ,
65- /// The maximum number of queried items allowed for the `archive_storage` at a time.
66- pub max_queried_items : usize ,
67- }
68-
69- /// The maximum number of items the `archive_storage` can return for a descendant query before
70- /// pagination is required.
71- ///
72- /// Note: this is identical to the `chainHead` value.
73- const MAX_DESCENDANT_RESPONSES : usize = 5 ;
74-
75- /// The maximum number of queried items allowed for the `archive_storage` at a time.
76- ///
77- /// Note: A queried item can also be a descendant query which can return up to
78- /// `MAX_DESCENDANT_RESPONSES`.
79- const MAX_QUERIED_ITEMS : usize = 8 ;
80-
8160/// The buffer capacity for each storage query.
8261///
8362/// This is small because the underlying JSON-RPC server has
8463/// its down buffer capacity per connection as well.
8564const STORAGE_QUERY_BUF : usize = 16 ;
8665
87- impl Default for ArchiveConfig {
88- fn default ( ) -> Self {
89- Self {
90- max_descendant_responses : MAX_DESCENDANT_RESPONSES ,
91- max_queried_items : MAX_QUERIED_ITEMS ,
92- }
93- }
94- }
95-
9666/// An API for archive RPC calls.
9767pub struct Archive < BE : Backend < Block > , Block : BlockT , Client > {
9868 /// Substrate client.
@@ -103,11 +73,6 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
10373 executor : SubscriptionTaskExecutor ,
10474 /// The hexadecimal encoded hash of the genesis block.
10575 genesis_hash : String ,
106- /// The maximum number of items the `archive_storage` can return for a descendant query before
107- /// pagination is required.
108- storage_max_descendant_responses : usize ,
109- /// The maximum number of queried items allowed for the `archive_storage` at a time.
110- storage_max_queried_items : usize ,
11176 /// Phantom member to pin the block type.
11277 _phantom : PhantomData < Block > ,
11378}
@@ -119,18 +84,9 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
11984 backend : Arc < BE > ,
12085 genesis_hash : GenesisHash ,
12186 executor : SubscriptionTaskExecutor ,
122- config : ArchiveConfig ,
12387 ) -> Self {
12488 let genesis_hash = hex_string ( & genesis_hash. as_ref ( ) ) ;
125- Self {
126- client,
127- backend,
128- executor,
129- genesis_hash,
130- storage_max_descendant_responses : config. max_descendant_responses ,
131- storage_max_queried_items : config. max_queried_items ,
132- _phantom : PhantomData ,
133- }
89+ Self { client, backend, executor, genesis_hash, _phantom : PhantomData }
13490 }
13591}
13692
@@ -260,47 +216,53 @@ where
260216
261217 fn archive_unstable_storage (
262218 & self ,
219+ pending : PendingSubscriptionSink ,
263220 hash : Block :: Hash ,
264- items : Vec < PaginatedStorageQuery < String > > ,
221+ items : Vec < StorageQuery < String > > ,
265222 child_trie : Option < String > ,
266- ) -> RpcResult < ArchiveStorageResult > {
267- let items = items
268- . into_iter ( )
269- . map ( |query| {
270- let key = StorageKey ( parse_hex_param ( query. key ) ?) ;
271- let pagination_start_key = query
272- . pagination_start_key
273- . map ( |key| parse_hex_param ( key) . map ( |key| StorageKey ( key) ) )
274- . transpose ( ) ?;
275-
276- // Paginated start key is only supported
277- if pagination_start_key. is_some ( ) && !query. query_type . is_descendant_query ( ) {
278- return Err ( ArchiveError :: InvalidParam (
279- "Pagination start key is only supported for descendants queries"
280- . to_string ( ) ,
281- ) )
282- }
223+ ) {
224+ let mut storage_client =
225+ StorageSubscriptionClient :: < Client , Block , BE > :: new ( self . client . clone ( ) ) ;
226+
227+ let fut = async move {
228+ let Ok ( mut sink) = pending. accept ( ) . await . map ( Subscription :: from) else { return } ;
283229
284- Ok ( PaginatedStorageQuery {
285- key,
286- query_type : query. query_type ,
287- pagination_start_key,
230+ let items = match items
231+ . into_iter ( )
232+ . map ( |query| {
233+ let key = StorageKey ( parse_hex_param ( query. key ) ?) ;
234+ Ok ( StorageQuery { key, query_type : query. query_type } )
288235 } )
289- } )
290- . collect :: < Result < Vec < _ > , ArchiveError > > ( ) ?;
236+ . collect :: < Result < Vec < _ > , ArchiveError > > ( )
237+ {
238+ Ok ( items) => items,
239+ Err ( error) => {
240+ let _ = sink. send ( & ArchiveStorageEvent :: err ( error. to_string ( ) ) ) ;
241+ return
242+ } ,
243+ } ;
291244
292- let child_trie = child_trie
293- . map ( |child_trie| parse_hex_param ( child_trie) )
294- . transpose ( ) ?
295- . map ( ChildInfo :: new_default_from_vec) ;
245+ let child_trie = child_trie. map ( |child_trie| parse_hex_param ( child_trie) ) . transpose ( ) ;
246+ let child_trie = match child_trie {
247+ Ok ( child_trie) => child_trie. map ( ChildInfo :: new_default_from_vec) ,
248+ Err ( error) => {
249+ let _ = sink. send ( & ArchiveStorageEvent :: err ( error. to_string ( ) ) ) ;
250+ return
251+ } ,
252+ } ;
296253
297- let storage_client = ArchiveStorage :: new (
298- self . client . clone ( ) ,
299- self . storage_max_descendant_responses ,
300- self . storage_max_queried_items ,
301- ) ;
254+ let ( tx, mut rx) = tokio:: sync:: mpsc:: channel ( STORAGE_QUERY_BUF ) ;
255+ let storage_fut = storage_client. generate_events ( hash, items, child_trie, tx) ;
302256
303- Ok ( storage_client. handle_query ( hash, items, child_trie) )
257+ // We don't care about the return value of this join:
258+ // - process_events might encounter an error (if the client disconnected)
259+ // - storage_fut might encounter an error while processing a trie queries and
260+ // the error is propagated via the sink.
261+ let _ = futures:: future:: join ( storage_fut, process_storage_events ( & mut rx, & mut sink) )
262+ . await ;
263+ } ;
264+
265+ self . executor . spawn ( "substrate-rpc-subscription" , Some ( "rpc" ) , fut. boxed ( ) ) ;
304266 }
305267
306268 fn archive_unstable_storage_diff (
@@ -337,24 +299,74 @@ where
337299 // - process_events might encounter an error (if the client disconnected)
338300 // - storage_fut might encounter an error while processing a trie queries and
339301 // the error is propagated via the sink.
340- let _ = futures:: future:: join ( storage_fut, process_events ( & mut rx, & mut sink) ) . await ;
302+ let _ =
303+ futures:: future:: join ( storage_fut, process_storage_diff_events ( & mut rx, & mut sink) )
304+ . await ;
341305 } ;
342306
343307 self . executor . spawn ( "substrate-rpc-subscription" , Some ( "rpc" ) , fut. boxed ( ) ) ;
344308 }
345309}
346310
347- /// Sends all the events to the sink.
348- async fn process_events ( rx : & mut mpsc:: Receiver < ArchiveStorageDiffEvent > , sink : & mut Subscription ) {
349- while let Some ( event) = rx. recv ( ) . await {
350- if event. is_done ( ) {
351- log:: debug!( target: LOG_TARGET , "Finished processing partial trie query" ) ;
352- } else if event. is_err ( ) {
353- log:: debug!( target: LOG_TARGET , "Error encountered while processing partial trie query" ) ;
311+ /// Sends all the events of the storage_diff method to the sink.
312+ async fn process_storage_diff_events (
313+ rx : & mut mpsc:: Receiver < ArchiveStorageDiffEvent > ,
314+ sink : & mut Subscription ,
315+ ) {
316+ loop {
317+ tokio:: select! {
318+ _ = sink. closed( ) => {
319+ return
320+ } ,
321+
322+ maybe_event = rx. recv( ) => {
323+ let Some ( event) = maybe_event else {
324+ break ;
325+ } ;
326+
327+ if event. is_done( ) {
328+ log:: debug!( target: LOG_TARGET , "Finished processing partial trie query" ) ;
329+ } else if event. is_err( ) {
330+ log:: debug!( target: LOG_TARGET , "Error encountered while processing partial trie query" ) ;
331+ }
332+
333+ if sink. send( & event) . await . is_err( ) {
334+ return
335+ }
336+ }
354337 }
338+ }
339+ }
340+
341+ /// Sends all the events of the storage method to the sink.
342+ async fn process_storage_events ( rx : & mut mpsc:: Receiver < QueryResult > , sink : & mut Subscription ) {
343+ loop {
344+ tokio:: select! {
345+ _ = sink. closed( ) => {
346+ break
347+ }
348+
349+ maybe_storage = rx. recv( ) => {
350+ let Some ( event) = maybe_storage else {
351+ break ;
352+ } ;
353+
354+ match event {
355+ Ok ( None ) => continue ,
356+
357+ Ok ( Some ( event) ) =>
358+ if sink. send( & ArchiveStorageEvent :: result( event) ) . await . is_err( ) {
359+ return
360+ } ,
355361
356- if sink. send ( & event) . await . is_err ( ) {
357- return
362+ Err ( error) => {
363+ let _ = sink. send( & ArchiveStorageEvent :: err( error) ) . await ;
364+ return
365+ }
366+ }
367+ }
358368 }
359369 }
370+
371+ let _ = sink. send ( & ArchiveStorageEvent :: StorageDone ) . await ;
360372}
0 commit comments