@@ -3,18 +3,22 @@ use crate::{
3
3
cache:: ImmutableFileDigestCacheProvider , ImmutableDigester , ImmutableDigesterError ,
4
4
ImmutableFile ,
5
5
} ,
6
- entities:: { CardanoDbBeacon , HexEncodedDigest , ImmutableFileName } ,
6
+ entities:: { CardanoDbBeacon , HexEncodedDigest , ImmutableFileName , ImmutableFileNumber } ,
7
7
logging:: LoggerExtensions ,
8
8
} ;
9
9
use async_trait:: async_trait;
10
10
use sha2:: { Digest , Sha256 } ;
11
11
use slog:: { debug, info, warn, Logger } ;
12
12
use std:: { collections:: BTreeMap , io, path:: Path , sync:: Arc } ;
13
13
14
- /// Result of a cache computation, contains the digest and the list of new entries to add
14
+ /// Result of a cache computation, contains the list of immutable digests and the list of new entries to add
15
15
/// to the [ImmutableFileDigestCacheProvider].
16
- type CacheComputationResult =
17
- Result < ( [ u8 ; 32 ] , Vec < ( ImmutableFileName , HexEncodedDigest ) > ) , io:: Error > ;
16
+ type ComputedImmutablesDigestsResult = Result < ComputedImmutablesDigests , io:: Error > ;
17
+
18
+ struct ComputedImmutablesDigests {
19
+ digests : Vec < HexEncodedDigest > ,
20
+ new_cached_entries : Vec < ( ImmutableFileName , HexEncodedDigest ) > ,
21
+ }
18
22
19
23
/// A digester working directly on a Cardano DB immutables files
20
24
pub struct CardanoImmutableDigester {
@@ -40,6 +44,36 @@ impl CardanoImmutableDigester {
40
44
logger : logger. new_with_component_name :: < Self > ( ) ,
41
45
}
42
46
}
47
+
48
+ async fn process_immutables (
49
+ & self ,
50
+ immutables : Vec < ImmutableFile > ,
51
+ ) -> Result < ComputedImmutablesDigests , ImmutableDigesterError > {
52
+ let cached_values = match self . cache_provider . as_ref ( ) {
53
+ None => BTreeMap :: from_iter ( immutables. into_iter ( ) . map ( |i| ( i, None ) ) ) ,
54
+ Some ( cache_provider) => match cache_provider. get ( immutables. clone ( ) ) . await {
55
+ Ok ( values) => values,
56
+ Err ( error) => {
57
+ warn ! (
58
+ self . logger, "Error while getting cached immutable files digests" ;
59
+ "error" => ?error
60
+ ) ;
61
+ BTreeMap :: from_iter ( immutables. into_iter ( ) . map ( |i| ( i, None ) ) )
62
+ }
63
+ } ,
64
+ } ;
65
+
66
+ // The computation of immutable files digests is done in a separate thread because it is blocking the whole task
67
+ let logger = self . logger . clone ( ) ;
68
+ let computed_digests =
69
+ tokio:: task:: spawn_blocking ( move || -> ComputedImmutablesDigestsResult {
70
+ compute_immutables_digests ( logger, cached_values)
71
+ } )
72
+ . await
73
+ . map_err ( |e| ImmutableDigesterError :: DigestComputationError ( e. into ( ) ) ) ??;
74
+
75
+ Ok ( computed_digests)
76
+ }
43
77
}
44
78
45
79
#[ async_trait]
@@ -49,99 +83,87 @@ impl ImmutableDigester for CardanoImmutableDigester {
49
83
dirpath : & Path ,
50
84
beacon : & CardanoDbBeacon ,
51
85
) -> Result < String , ImmutableDigesterError > {
52
- let up_to_file_number = beacon. immutable_file_number ;
53
- let immutables = ImmutableFile :: list_completed_in_dir ( dirpath) ?
54
- . into_iter ( )
55
- . filter ( |f| f. number <= up_to_file_number)
56
- . collect :: < Vec < _ > > ( ) ;
57
- info ! ( self . logger, ">> compute_digest" ; "beacon" => #?beacon, "nb_of_immutables" => immutables. len( ) ) ;
58
-
59
- match immutables. last ( ) {
60
- None => Err ( ImmutableDigesterError :: NotEnoughImmutable {
61
- expected_number : up_to_file_number,
62
- found_number : None ,
63
- db_dir : dirpath. to_owned ( ) ,
64
- } ) ,
65
- Some ( last_immutable_file) if last_immutable_file. number < up_to_file_number => {
66
- Err ( ImmutableDigesterError :: NotEnoughImmutable {
67
- expected_number : up_to_file_number,
68
- found_number : Some ( last_immutable_file. number ) ,
69
- db_dir : dirpath. to_owned ( ) ,
70
- } )
86
+ let immutables_to_process =
87
+ list_immutable_files_to_process ( dirpath, beacon. immutable_file_number ) ?;
88
+ info ! ( self . logger, ">> compute_digest" ; "beacon" => #?beacon, "nb_of_immutables" => immutables_to_process. len( ) ) ;
89
+ let computed_immutables_digests = self . process_immutables ( immutables_to_process) . await ?;
90
+
91
+ let digest = {
92
+ let mut hasher = Sha256 :: new ( ) ;
93
+ hasher. update ( compute_beacon_hash ( & self . cardano_network , beacon) . as_bytes ( ) ) ;
94
+ for digest in computed_immutables_digests. digests {
95
+ hasher. update ( digest) ;
71
96
}
72
- Some ( _) => {
73
- let cached_values = match self . cache_provider . as_ref ( ) {
74
- None => BTreeMap :: from_iter ( immutables. into_iter ( ) . map ( |i| ( i, None ) ) ) ,
75
- Some ( cache_provider) => match cache_provider. get ( immutables. clone ( ) ) . await {
76
- Ok ( values) => values,
77
- Err ( error) => {
78
- warn ! (
79
- self . logger, "Error while getting cached immutable files digests" ;
80
- "error" => ?error
81
- ) ;
82
- BTreeMap :: from_iter ( immutables. into_iter ( ) . map ( |i| ( i, None ) ) )
83
- }
84
- } ,
85
- } ;
86
-
87
- // digest is done in a separate thread because it is blocking the whole task
88
- let logger = self . logger . clone ( ) ;
89
- let thread_cardano_network = self . cardano_network . clone ( ) ;
90
- let thread_beacon = beacon. clone ( ) ;
91
- let ( hash, new_cache_entries) =
92
- tokio:: task:: spawn_blocking ( move || -> CacheComputationResult {
93
- compute_hash (
94
- logger,
95
- thread_cardano_network,
96
- & thread_beacon,
97
- cached_values,
98
- )
99
- } )
100
- . await
101
- . map_err ( |e| ImmutableDigesterError :: DigestComputationError ( e. into ( ) ) ) ??;
102
- let digest = hex:: encode ( hash) ;
103
-
104
- debug ! ( self . logger, "Computed digest: {digest:?}" ) ;
105
-
106
- if let Some ( cache_provider) = self . cache_provider . as_ref ( ) {
107
- if let Err ( error) = cache_provider. store ( new_cache_entries) . await {
108
- warn ! (
109
- self . logger, "Error while storing new immutable files digests to cache" ;
110
- "error" => ?error
111
- ) ;
112
- }
113
- }
97
+ let hash: [ u8 ; 32 ] = hasher. finalize ( ) . into ( ) ;
98
+
99
+ hex:: encode ( hash)
100
+ } ;
114
101
115
- Ok ( digest)
102
+ debug ! ( self . logger, "Computed digest: {digest:?}" ) ;
103
+
104
+ if let Some ( cache_provider) = self . cache_provider . as_ref ( ) {
105
+ if let Err ( error) = cache_provider
106
+ . store ( computed_immutables_digests. new_cached_entries )
107
+ . await
108
+ {
109
+ warn ! (
110
+ self . logger, "Error while storing new immutable files digests to cache" ;
111
+ "error" => ?error
112
+ ) ;
116
113
}
117
114
}
115
+
116
+ Ok ( digest)
118
117
}
119
118
}
120
119
121
- fn compute_hash (
120
+ fn list_immutable_files_to_process (
121
+ dirpath : & Path ,
122
+ up_to_file_number : ImmutableFileNumber ,
123
+ ) -> Result < Vec < ImmutableFile > , ImmutableDigesterError > {
124
+ let immutables: Vec < ImmutableFile > = ImmutableFile :: list_completed_in_dir ( dirpath) ?
125
+ . into_iter ( )
126
+ . filter ( |f| f. number <= up_to_file_number)
127
+ . collect ( ) ;
128
+
129
+ match immutables. last ( ) {
130
+ None => Err ( ImmutableDigesterError :: NotEnoughImmutable {
131
+ expected_number : up_to_file_number,
132
+ found_number : None ,
133
+ db_dir : dirpath. to_owned ( ) ,
134
+ } ) ,
135
+ Some ( last_immutable_file) if last_immutable_file. number < up_to_file_number => {
136
+ Err ( ImmutableDigesterError :: NotEnoughImmutable {
137
+ expected_number : up_to_file_number,
138
+ found_number : Some ( last_immutable_file. number ) ,
139
+ db_dir : dirpath. to_owned ( ) ,
140
+ } )
141
+ }
142
+ Some ( _) => Ok ( immutables) ,
143
+ }
144
+ }
145
+
146
+ fn compute_immutables_digests (
122
147
logger : Logger ,
123
- cardano_network : String ,
124
- beacon : & CardanoDbBeacon ,
125
148
entries : BTreeMap < ImmutableFile , Option < HexEncodedDigest > > ,
126
- ) -> CacheComputationResult {
127
- let mut hasher = Sha256 :: new ( ) ;
149
+ ) -> ComputedImmutablesDigestsResult {
128
150
let mut new_cached_entries = Vec :: new ( ) ;
129
151
let mut progress = Progress {
130
152
index : 0 ,
131
153
total : entries. len ( ) ,
132
154
} ;
133
155
134
- hasher . update ( compute_beacon_hash ( & cardano_network , beacon ) . as_bytes ( ) ) ;
156
+ let mut digests = Vec :: with_capacity ( entries . len ( ) ) ;
135
157
136
158
for ( ix, ( entry, cache) ) in entries. iter ( ) . enumerate ( ) {
137
159
match cache {
138
160
None => {
139
161
let data = hex:: encode ( entry. compute_raw_hash :: < Sha256 > ( ) ?) ;
140
- hasher . update ( & data) ;
162
+ digests . push ( data. clone ( ) ) ;
141
163
new_cached_entries. push ( ( entry. filename . clone ( ) , data) ) ;
142
164
}
143
165
Some ( digest) => {
144
- hasher . update ( digest) ;
166
+ digests . push ( digest. to_string ( ) ) ;
145
167
}
146
168
} ;
147
169
@@ -150,7 +172,10 @@ fn compute_hash(
150
172
}
151
173
}
152
174
153
- Ok ( ( hasher. finalize ( ) . into ( ) , new_cached_entries) )
175
+ Ok ( ComputedImmutablesDigests {
176
+ digests,
177
+ new_cached_entries,
178
+ } )
154
179
}
155
180
156
181
fn compute_beacon_hash ( network : & str , cardano_db_beacon : & CardanoDbBeacon ) -> String {
0 commit comments