11use crate :: {
22 DataSourceError , Result ,
3- client:: {
4- self , MiniBFClient ,
5- api:: MiniBFApi ,
6- conversions:: { self , from_block_content} ,
7- } ,
3+ client:: { MiniBFClient , api:: MiniBFApi , conversions:: from_block_content} ,
84 read_mc_epoch_config,
95} ;
10- use blockfrost_openapi:: models:: block_content:: BlockContent ;
116use chrono:: { DateTime , NaiveDateTime , TimeDelta } ;
127use derive_new:: new;
138use figment:: { Figment , providers:: Env } ;
@@ -51,14 +46,9 @@ pub struct BlockDataSourceImpl {
5146impl BlockDataSourceImpl {
5247 /// Returns the latest _unstable_ Cardano block from the Db-Sync database
5348 pub async fn get_latest_block_info ( & self ) -> Result < MainchainBlock > {
54- self . client
55- . blocks_latest ( )
56- . await
57- . and_then ( conversions:: from_block_content)
58- . map_err ( |e| {
59- DataSourceError :: ExpectedDataNotFound ( format ! ( "No latest block on chain. {e}" , ) )
60- . into ( )
61- } )
49+ self . client . blocks_latest ( ) . await . and_then ( from_block_content) . map_err ( |e| {
50+ DataSourceError :: ExpectedDataNotFound ( format ! ( "No latest block on chain. {e}" , ) ) . into ( )
51+ } )
6252 }
6353
6454 /// Returns the latest _stable_ Cardano block from the Db-Sync database that is within
@@ -96,7 +86,7 @@ impl BlockDataSourceImpl {
9686 } ;
9787 let block_opt = match from_cache {
9888 Some ( block) => Some ( block) ,
99- None => Some ( conversions :: from_block_content ( self . client . blocks_by_id ( hash) . await ?) ?) ,
89+ None => Some ( from_block_content ( self . client . blocks_by_id ( hash) . await ?) ?) ,
10090 } ;
10191 Ok ( block_opt)
10292 }
@@ -185,7 +175,7 @@ impl BlockDataSourceImpl {
185175
186176 loop {
187177 let block = match self . client . blocks_by_id ( current_block_number) . await {
188- Ok ( b) => conversions :: from_block_content ( b) ?,
178+ Ok ( b) => from_block_content ( b) ?,
189179 Err ( _) => return Ok ( None ) ,
190180 } ;
191181
@@ -263,9 +253,8 @@ impl BlockDataSourceImpl {
263253 hash : McBlockHash ,
264254 reference_timestamp : NaiveDateTime ,
265255 ) -> Result < Option < MainchainBlock > > {
266- let block = Some ( conversions:: from_block_content ( self . client . blocks_by_id ( hash) . await ?) ?) ;
267- let latest_block =
268- Some ( conversions:: from_block_content ( self . client . blocks_latest ( ) . await ?) ?) ;
256+ let block = Some ( from_block_content ( self . client . blocks_by_id ( hash) . await ?) ?) ;
257+ let latest_block = Some ( from_block_content ( self . client . blocks_latest ( ) . await ?) ?) ;
269258 Ok ( block
270259 . zip ( latest_block)
271260 . filter ( |( block, latest_block) | {
@@ -276,28 +265,29 @@ impl BlockDataSourceImpl {
276265 }
277266
278267 /// Caches stable blocks for lookup by hash.
279- async fn fill_cache ( & self , _from_block : & MainchainBlock ) -> Result < ( ) > {
280- // let from_block_no = from_block.block_no;
281- // let size = u32::from(self.cache_size);
282- // let latest_block =
283- // db_model::get_latest_block_info(&self.pool)
284- // .await?
285- // .ok_or(InternalDataSourceError(
286- // "No latest block when filling the caches.".to_string(),
287- // ))?;
288- // let stable_block_num = latest_block.block_no.saturating_sub(self.security_parameter);
268+ async fn fill_cache ( & self , from_block : & MainchainBlock ) -> Result < ( ) > {
269+ let from_block_no = from_block. number ;
270+ let size = u32:: from ( self . cache_size ) ;
271+ let latest_block = from_block_content ( self . client . blocks_latest ( ) . await ?) ?;
272+ let stable_block_num = latest_block. number . saturating_sub ( self . security_parameter ) ;
289273
290- // let to_block_no = from_block_no.saturating_add(size).min(stable_block_num);
291- // let blocks = if to_block_no > from_block_no {
292- // db_model::get_blocks_by_numbers(&self.pool, from_block_no, to_block_no).await?
293- // } else {
294- // vec![from_block.clone()]
295- // };
274+ let to_block_no = from_block_no. saturating_add ( size) . min ( stable_block_num) ;
275+ let blocks = if from_block_no < to_block_no {
276+ let futures = ( from_block_no. 0 ..=to_block_no. 0 ) . map ( |block_no| async move {
277+ self . client
278+ . blocks_by_id ( McBlockNumber ( block_no) )
279+ . await
280+ . and_then ( from_block_content)
281+ } ) ;
282+ futures:: future:: try_join_all ( futures) . await ?. into_iter ( ) . collect ( )
283+ } else {
284+ vec ! [ from_block. clone( ) ]
285+ } ;
296286
297- // if let Ok(mut cache) = self.stable_blocks_cache.lock() {
298- // cache.update(blocks);
299- // debug!("Cached blocks {} to {} for by hash lookups.", from_block_no.0, to_block_no.0);
300- // }
287+ if let Ok ( mut cache) = self . stable_blocks_cache . lock ( ) {
288+ cache. update ( blocks) ;
289+ debug ! ( "Cached blocks {} to {} for by hash lookups." , from_block_no. 0 , to_block_no. 0 ) ;
290+ }
301291 Ok ( ( ) )
302292 }
303293
0 commit comments