@@ -101,24 +101,25 @@ impl CardanoTransactionsImporter {
101
101
) ;
102
102
return Ok ( ( ) ) ;
103
103
}
104
-
105
- // todo: temp algorithm, should be optimized to avoid loading all blocks & transactions
106
- // at once in memory (probably using iterators)
107
- let scanned_blocks = self . block_scanner . scan ( & self . dirpath , from, until) . await ?;
108
- let parsed_transactions: Vec < CardanoTransaction > = scanned_blocks
109
- . into_iter ( )
110
- . flat_map ( |b| b. into_transactions ( ) )
111
- . collect ( ) ;
112
104
debug ! (
113
105
self . logger,
114
- "TransactionsImporter retrieved '{}' Cardano transactions between immutables '{}' and '{until}'" ,
115
- parsed_transactions. len( ) ,
106
+ "TransactionsImporter will retrieve Cardano transactions between immutables '{}' and '{until}'" ,
116
107
from. unwrap_or( 0 )
117
108
) ;
118
109
119
- self . transaction_store
120
- . store_transactions ( parsed_transactions)
121
- . await ?;
110
+ let mut streamer = self . block_scanner . scan ( & self . dirpath , from, until) . await ?;
111
+
112
+ while let Some ( blocks) = streamer. poll_next ( ) . await ? {
113
+ let parsed_transactions: Vec < CardanoTransaction > = blocks
114
+ . into_iter ( )
115
+ . flat_map ( |b| b. into_transactions ( ) )
116
+ . collect ( ) ;
117
+
118
+ self . transaction_store
119
+ . store_transactions ( parsed_transactions)
120
+ . await ?;
121
+ }
122
+
122
123
Ok ( ( ) )
123
124
}
124
125
@@ -180,7 +181,9 @@ impl TransactionsImporter for CardanoTransactionsImporter {
180
181
mod tests {
181
182
use mockall:: mock;
182
183
183
- use mithril_common:: cardano_block_scanner:: { DumbBlockScanner , ScannedBlock } ;
184
+ use mithril_common:: cardano_block_scanner:: {
185
+ BlockStreamer , DumbBlockScanner , DumbBlockStreamer , ScannedBlock ,
186
+ } ;
184
187
use mithril_common:: crypto_helper:: MKTree ;
185
188
use mithril_common:: entities:: { BlockNumber , BlockRangesSequence } ;
186
189
@@ -199,7 +202,7 @@ mod tests {
199
202
dirpath: & Path ,
200
203
from_immutable: Option <ImmutableFileNumber >,
201
204
until_immutable: ImmutableFileNumber ,
202
- ) -> StdResult <Vec < ScannedBlock >>;
205
+ ) -> StdResult <Box <dyn BlockStreamer >>;
203
206
}
204
207
}
205
208
@@ -267,7 +270,7 @@ mod tests {
267
270
scanner_mock
268
271
. expect_scan ( )
269
272
. withf ( move |_, from, until| from. is_none ( ) && until == & up_to_beacon)
270
- . return_once ( move |_, _, _| Ok ( blocks) ) ;
273
+ . return_once ( move |_, _, _| Ok ( Box :: new ( DumbBlockStreamer :: new ( vec ! [ blocks] ) ) ) ) ;
271
274
CardanoTransactionsImporter :: new_for_test ( Arc :: new ( scanner_mock) , repository. clone ( ) )
272
275
} ;
273
276
@@ -429,7 +432,9 @@ mod tests {
429
432
scanner_mock
430
433
. expect_scan ( )
431
434
. withf ( move |_, from, until| from == & Some ( 12 ) && until == & up_to_beacon)
432
- . return_once ( move |_, _, _| Ok ( scanned_blocks) )
435
+ . return_once ( move |_, _, _| {
436
+ Ok ( Box :: new ( DumbBlockStreamer :: new ( vec ! [ scanned_blocks] ) ) )
437
+ } )
433
438
. once ( ) ;
434
439
CardanoTransactionsImporter :: new_for_test ( Arc :: new ( scanner_mock) , repository. clone ( ) )
435
440
} ;
@@ -607,11 +612,9 @@ mod tests {
607
612
let transactions = into_transactions ( & blocks) ;
608
613
let importer = {
609
614
let connection = cardano_tx_db_connection ( ) . unwrap ( ) ;
610
- let mut scanner = MockBlockScannerImpl :: new ( ) ;
611
- scanner. expect_scan ( ) . return_once ( move |_, _, _| Ok ( blocks) ) ;
612
615
613
616
CardanoTransactionsImporter :: new_for_test (
614
- Arc :: new ( scanner ) ,
617
+ Arc :: new ( DumbBlockScanner :: new ( blocks . clone ( ) ) ) ,
615
618
Arc :: new ( CardanoTransactionRepository :: new ( Arc :: new ( connection) ) ) ,
616
619
)
617
620
} ;
0 commit comments