@@ -2,99 +2,16 @@ use std::path::Path;
2
2
use std:: sync:: Arc ;
3
3
4
4
use async_trait:: async_trait;
5
- use slog:: { warn , Logger } ;
5
+ use slog:: Logger ;
6
6
use tokio:: sync:: Mutex ;
7
7
8
- use crate :: cardano_block_scanner:: { BlockScanner , BlockStreamer , ImmutableBlockStreamer } ;
8
+ use crate :: cardano_block_scanner:: { BlockScanner , BlockStreamer } ;
9
9
use crate :: chain_reader:: ChainBlockReader ;
10
- use crate :: digesters:: ImmutableFile ;
11
- use crate :: entities:: { BlockNumber , ChainPoint , ImmutableFileNumber } ;
10
+ use crate :: entities:: { BlockNumber , ChainPoint } ;
12
11
use crate :: StdResult ;
13
12
14
13
use super :: ChainReaderBlockStreamer ;
15
14
16
- /// Trait to find the lower bound that should be used by the [block scanner][CardanoBlockScanner] when
17
- /// scanning.
18
- #[ cfg_attr( test, mockall:: automock) ]
19
- #[ async_trait]
20
- pub trait ImmutableLowerBoundFinder : Send + Sync {
21
- /// Find the lowest immutable file number that should be scanned by the block scanner.
22
- async fn find_lower_bound ( & self ) -> StdResult < Option < ImmutableFileNumber > > ;
23
- }
24
-
25
- /// Cardano immutable block scanner
26
- ///
27
- /// This scanner reads the immutable files in the given directory and returns the blocks.
28
- ///
29
- /// Both the lower and upper bounds of the [BlockScanner] are ignored, instead:
30
- /// * for the lower bound: the result of the [ImmutableLowerBoundFinder] is used.
31
- /// * for the upper bound: the latest completed immutable file is used.
32
- pub struct CardanoImmutableBlockScanner {
33
- logger : Logger ,
34
- /// When set to true, no error is returned in case of unparsable block, and an error log is written instead.
35
- /// This can occur when the crate 'pallas-hardano' doesn't support some non final encoding for a Cardano era.
36
- /// This situation should only happen on the test networks and not on the mainnet.
37
- allow_unparsable_block : bool ,
38
- lower_bound_finder : Arc < dyn ImmutableLowerBoundFinder > ,
39
- rescan_offset : Option < usize > ,
40
- }
41
-
42
- impl CardanoImmutableBlockScanner {
43
- /// Factory
44
- pub fn new (
45
- logger : Logger ,
46
- allow_unparsable_block : bool ,
47
- lower_bound_finder : Arc < dyn ImmutableLowerBoundFinder > ,
48
- rescan_offset : Option < usize > ,
49
- ) -> Self {
50
- if allow_unparsable_block {
51
- warn ! (
52
- logger,
53
- "The 'allow_unparsable_block' option is activated. This option should only be used on test networks." )
54
- }
55
-
56
- Self {
57
- logger,
58
- allow_unparsable_block,
59
- lower_bound_finder,
60
- rescan_offset,
61
- }
62
- }
63
-
64
- async fn get_lower_bound ( & self ) -> StdResult < Option < ImmutableFileNumber > > {
65
- let highest = self . lower_bound_finder . find_lower_bound ( ) . await ?;
66
- let rescan_offset = self . rescan_offset . unwrap_or ( 0 ) ;
67
- let highest = highest. map ( |h| ( h + 1 ) . saturating_sub ( rescan_offset as u64 ) ) ;
68
- Ok ( highest)
69
- }
70
- }
71
-
72
- #[ async_trait]
73
- impl BlockScanner for CardanoImmutableBlockScanner {
74
- async fn scan (
75
- & self ,
76
- dirpath : & Path ,
77
- _from : Option < ChainPoint > ,
78
- _until : BlockNumber ,
79
- ) -> StdResult < Box < dyn BlockStreamer > > {
80
- let lower_bound = self . get_lower_bound ( ) . await ?;
81
- let is_in_bounds = |number : ImmutableFileNumber | match lower_bound {
82
- Some ( from) => from <= number,
83
- None => true ,
84
- } ;
85
- let immutable_chunks = ImmutableFile :: list_completed_in_dir ( dirpath) ?
86
- . into_iter ( )
87
- . filter ( |f| is_in_bounds ( f. number ) && f. filename . contains ( "chunk" ) )
88
- . collect :: < Vec < _ > > ( ) ;
89
-
90
- Ok ( Box :: new ( ImmutableBlockStreamer :: new (
91
- immutable_chunks,
92
- self . allow_unparsable_block ,
93
- self . logger . clone ( ) ,
94
- ) ) )
95
- }
96
- }
97
-
98
15
/// Cardano block scanner
99
16
///
100
17
/// This scanner reads the blocks with a chain block reader
@@ -139,211 +56,3 @@ impl BlockScanner for CardanoBlockScanner {
139
56
) )
140
57
}
141
58
}
142
-
143
- #[ cfg( test) ]
144
- mod tests {
145
- use crate :: cardano_block_scanner:: BlockStreamerTestExtensions ;
146
- use crate :: test_utils:: { TempDir , TestLogger } ;
147
-
148
- use super :: * ;
149
-
150
- fn get_number_of_immutable_chunk_in_dir ( dir : & Path ) -> usize {
151
- ImmutableFile :: list_completed_in_dir ( dir)
152
- . unwrap ( )
153
- . into_iter ( )
154
- . map ( |i| i. filename . contains ( "chunk" ) )
155
- . len ( )
156
- }
157
-
158
- fn lower_bound_finder < F > ( finder_mock_config : F ) -> Arc < dyn ImmutableLowerBoundFinder >
159
- where
160
- F : FnOnce ( & mut MockImmutableLowerBoundFinder ) ,
161
- {
162
- let mut mock = MockImmutableLowerBoundFinder :: new ( ) ;
163
- finder_mock_config ( & mut mock) ;
164
- Arc :: new ( mock)
165
- }
166
-
167
- #[ tokio:: test]
168
- async fn test_scan_without_lower_bound_ignore_upper_bound ( ) {
169
- let db_path = Path :: new ( "../mithril-test-lab/test_data/immutable/" ) ;
170
- assert ! ( get_number_of_immutable_chunk_in_dir( db_path) >= 3 ) ;
171
-
172
- let lower_bound_finder = lower_bound_finder ( |mock| {
173
- mock. expect_find_lower_bound ( ) . returning ( || Ok ( None ) ) ;
174
- } ) ;
175
- let cardano_transaction_parser = CardanoImmutableBlockScanner :: new (
176
- TestLogger :: stdout ( ) ,
177
- false ,
178
- lower_bound_finder,
179
- None ,
180
- ) ;
181
-
182
- for until_block_number in [ 1 , 10000 ] {
183
- let mut streamer = cardano_transaction_parser
184
- . scan ( db_path, None , until_block_number)
185
- . await
186
- . unwrap ( ) ;
187
- let immutable_blocks = streamer. poll_all ( ) . await . unwrap ( ) ;
188
-
189
- let max_immutable_file_number = immutable_blocks
190
- . iter ( )
191
- . map ( |b| b. immutable_file_number )
192
- . max ( ) ;
193
- // The max highest completed immutable file number is 2
194
- assert_eq ! (
195
- max_immutable_file_number,
196
- Some ( 2 ) ,
197
- "until_chain_point: {until_block_number:?}" ,
198
- ) ;
199
- }
200
- }
201
-
202
- #[ tokio:: test]
203
- async fn test_scan_with_lower_bound_ignore_upper_bound ( ) {
204
- let db_path = Path :: new ( "../mithril-test-lab/test_data/immutable/" ) ;
205
- assert ! ( get_number_of_immutable_chunk_in_dir( db_path) >= 3 ) ;
206
-
207
- let lower_bound_finder = lower_bound_finder ( |mock| {
208
- mock. expect_find_lower_bound ( ) . returning ( || Ok ( Some ( 0 ) ) ) ;
209
- } ) ;
210
- let cardano_transaction_parser = CardanoImmutableBlockScanner :: new (
211
- TestLogger :: stdout ( ) ,
212
- false ,
213
- lower_bound_finder,
214
- None ,
215
- ) ;
216
-
217
- for until_block_number in [ 1 , 10000 ] {
218
- let mut streamer = cardano_transaction_parser
219
- . scan ( db_path, Some ( ChainPoint :: dummy ( ) ) , until_block_number)
220
- . await
221
- . unwrap ( ) ;
222
- let immutable_blocks = streamer. poll_all ( ) . await . unwrap ( ) ;
223
-
224
- let max_immutable_file_number = immutable_blocks
225
- . iter ( )
226
- . map ( |b| b. immutable_file_number )
227
- . max ( ) ;
228
- // The max highest completed immutable file number is 2
229
- assert_eq ! (
230
- max_immutable_file_number,
231
- Some ( 2 ) ,
232
- "until_chain_point: {until_block_number:?}" ,
233
- ) ;
234
- }
235
- }
236
-
237
- #[ tokio:: test]
238
- async fn test_scan_ignore_given_lower_bound_instead_find_it_using_an_external_service ( ) {
239
- let db_path = Path :: new ( "../mithril-test-lab/test_data/immutable/" ) ;
240
- assert ! ( get_number_of_immutable_chunk_in_dir( db_path) >= 3 ) ;
241
-
242
- let test_cases = [
243
- ( None , 0 ) ,
244
- // When a lowest immutable file number is found we start from the next immutable (i + 1)
245
- ( Some ( 1 ) , 2 ) ,
246
- ] ;
247
- for ( lowest_found_immutable, expected) in test_cases {
248
- let lower_bound_finder = lower_bound_finder ( |mock| {
249
- mock. expect_find_lower_bound ( )
250
- . return_once ( move || Ok ( lowest_found_immutable) ) ;
251
- } ) ;
252
-
253
- let from = ChainPoint :: dummy ( ) ;
254
- let cardano_transaction_parser = CardanoImmutableBlockScanner :: new (
255
- TestLogger :: stdout ( ) ,
256
- false ,
257
- lower_bound_finder,
258
- None ,
259
- ) ;
260
-
261
- let mut streamer = cardano_transaction_parser
262
- . scan ( db_path, Some ( from) , 10000000 )
263
- . await
264
- . unwrap ( ) ;
265
- let immutable_blocks = streamer. poll_all ( ) . await . unwrap ( ) ;
266
-
267
- let min_immutable_file_number = immutable_blocks
268
- . iter ( )
269
- . map ( |b| b. immutable_file_number )
270
- . min ( ) ;
271
- assert_eq ! ( min_immutable_file_number, Some ( expected) ) ;
272
- }
273
- }
274
-
275
- #[ tokio:: test]
276
- async fn test_instantiate_parser_with_allow_unparsable_block_should_log_warning ( ) {
277
- let temp_dir = TempDir :: create (
278
- "cardano_transaction_parser" ,
279
- "test_instantiate_parser_with_allow_unparsable_block_should_log_warning" ,
280
- ) ;
281
- let log_path = temp_dir. join ( "test.log" ) ;
282
-
283
- // We create a block to drop the logger and force a flush before we read the log file.
284
- {
285
- let _ = CardanoImmutableBlockScanner :: new (
286
- TestLogger :: file ( & log_path) ,
287
- true ,
288
- lower_bound_finder ( |_| { } ) ,
289
- None ,
290
- ) ;
291
- }
292
-
293
- let log_file = std:: fs:: read_to_string ( & log_path) . unwrap ( ) ;
294
- assert ! ( log_file. contains( "The 'allow_unparsable_block' option is activated. This option should only be used on test networks." ) ) ;
295
- }
296
-
297
- #[ tokio:: test]
298
- async fn test_instantiate_parser_without_allow_unparsable_block_should_not_log_warning ( ) {
299
- let temp_dir = TempDir :: create (
300
- "cardano_transaction_parser" ,
301
- "test_instantiate_parser_without_allow_unparsable_block_should_not_log_warning" ,
302
- ) ;
303
- let log_path = temp_dir. join ( "test.log" ) ;
304
-
305
- // We create a block to drop the logger and force a flush before we read the log file.
306
- {
307
- let _ = CardanoImmutableBlockScanner :: new (
308
- TestLogger :: file ( & log_path) ,
309
- false ,
310
- lower_bound_finder ( |_| { } ) ,
311
- None ,
312
- ) ;
313
- }
314
-
315
- let log_file = std:: fs:: read_to_string ( & log_path) . unwrap ( ) ;
316
- assert ! ( !log_file. contains( "The 'allow_unparsable_block' option is activated. This option should only be used on test networks." ) ) ;
317
- }
318
-
319
- #[ tokio:: test]
320
- async fn change_parsed_lower_bound_when_rescan_limit_is_set ( ) {
321
- fn scanner_with_offset (
322
- highest_stored_immutable : ImmutableFileNumber ,
323
- rescan_offset : ImmutableFileNumber ,
324
- ) -> CardanoImmutableBlockScanner {
325
- let mut store = MockImmutableLowerBoundFinder :: new ( ) ;
326
- store
327
- . expect_find_lower_bound ( )
328
- . returning ( move || Ok ( Some ( highest_stored_immutable) ) ) ;
329
-
330
- CardanoImmutableBlockScanner :: new (
331
- TestLogger :: stdout ( ) ,
332
- false ,
333
- Arc :: new ( store) ,
334
- Some ( rescan_offset as usize ) ,
335
- )
336
- }
337
- let scanner = scanner_with_offset ( 8 , 3 ) ;
338
-
339
- let from = scanner. get_lower_bound ( ) . await . unwrap ( ) ;
340
- // Expected should be: highest_stored_beacon + 1 - rescan_offset
341
- assert_eq ! ( Some ( 6 ) , from) ;
342
-
343
- let scanner = scanner_with_offset ( 5 , 10 ) ;
344
-
345
- let from = scanner. get_lower_bound ( ) . await . unwrap ( ) ;
346
- // If sub overflow it should be 0
347
- assert_eq ! ( Some ( 0 ) , from) ;
348
- }
349
- }
0 commit comments