@@ -7,7 +7,7 @@ use async_trait::async_trait;
7
7
use chrono:: Utc ;
8
8
use slog_scope:: info;
9
9
use std:: sync:: Arc ;
10
- use tokio:: { runtime :: Handle , task:: JoinHandle } ;
10
+ use tokio:: task:: JoinHandle ;
11
11
12
12
use mithril_common:: {
13
13
entities:: {
@@ -37,7 +37,7 @@ pub trait SignedEntityService: Send + Sync {
37
37
& self ,
38
38
signed_entity_type : SignedEntityType ,
39
39
certificate : & Certificate ,
40
- ) -> StdResult < ( ) > ;
40
+ ) -> StdResult < JoinHandle < StdResult < ( ) > > > ;
41
41
42
42
/// Return a list of signed snapshots order by creation date descending.
43
43
async fn get_last_signed_snapshots (
@@ -107,6 +107,55 @@ impl MithrilSignedEntityService {
107
107
}
108
108
}
109
109
110
+ async fn create_artifact_impl (
111
+ & self ,
112
+ signed_entity_type : SignedEntityType ,
113
+ certificate : & Certificate ,
114
+ ) -> StdResult < ( ) > {
115
+ info ! (
116
+ "MithrilSignedEntityService::create_artifact" ;
117
+ "signed_entity_type" => ?signed_entity_type,
118
+ "certificate_hash" => & certificate. hash
119
+ ) ;
120
+
121
+ println ! (
122
+ "MithrilSignedEntityService::create_artifact: signed_entity_type: {:?}, certificate_hash: {}" ,
123
+ signed_entity_type, certificate. hash
124
+ ) ;
125
+
126
+ let mut remaining_retries = 2 ;
127
+ let artifact = loop {
128
+ remaining_retries -= 1 ;
129
+
130
+ match self
131
+ . compute_artifact ( signed_entity_type. clone ( ) , certificate)
132
+ . await
133
+ {
134
+ Err ( error) if remaining_retries == 0 => break Err ( error) ,
135
+ Err ( _error) => ( ) ,
136
+ Ok ( artifact) => break Ok ( artifact) ,
137
+ } ;
138
+ } ?;
139
+
140
+ let signed_entity = SignedEntityRecord {
141
+ signed_entity_id : artifact. get_id ( ) ,
142
+ signed_entity_type : signed_entity_type. clone ( ) ,
143
+ certificate_id : certificate. hash . clone ( ) ,
144
+ artifact : serde_json:: to_string ( & artifact) ?,
145
+ created_at : Utc :: now ( ) ,
146
+ } ;
147
+
148
+ self . signed_entity_storer
149
+ . store_signed_entity ( & signed_entity)
150
+ . await
151
+ . with_context ( || {
152
+ format ! (
153
+ "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
154
+ )
155
+ } ) ?;
156
+ Ok ( ( ) )
157
+ }
158
+
110
159
async fn create_artifact_return_join_handle (
111
160
& self ,
112
161
signed_entity_type : SignedEntityType ,
@@ -135,7 +184,7 @@ impl MithrilSignedEntityService {
135
184
let service_clone = service. clone ( ) ;
136
185
let result = tokio:: task:: spawn ( async move {
137
186
service_clone
138
- . create_artifact ( signed_entity_type_clone, & certificate_cloned)
187
+ . create_artifact_impl ( signed_entity_type_clone, & certificate_cloned)
139
188
. await
140
189
} )
141
190
. await ;
@@ -212,49 +261,41 @@ impl SignedEntityService for MithrilSignedEntityService {
212
261
& self ,
213
262
signed_entity_type : SignedEntityType ,
214
263
certificate : & Certificate ,
215
- ) -> StdResult < ( ) > {
216
- info ! (
217
- "MithrilSignedEntityService::create_artifact" ;
218
- "signed_entity_type" => ?signed_entity_type,
219
- "certificate_hash" => & certificate. hash
220
- ) ;
221
-
222
- println ! (
223
- "MithrilSignedEntityService::create_artifact: signed_entity_type: {:?}, certificate_hash: {}" ,
224
- signed_entity_type, certificate. hash
225
- ) ;
226
-
227
- let mut remaining_retries = 2 ;
228
- let artifact = loop {
229
- remaining_retries -= 1 ;
264
+ ) -> StdResult < JoinHandle < StdResult < ( ) > > > {
265
+ if self
266
+ . signed_entity_type_lock
267
+ . is_locked ( & signed_entity_type)
268
+ . await
269
+ {
270
+ return Err ( anyhow ! (
271
+ "Signed entity type '{:?}' is already locked" ,
272
+ signed_entity_type
273
+ ) ) ;
274
+ }
230
275
231
- match self
232
- . compute_artifact ( signed_entity_type. clone ( ) , certificate)
233
- . await
234
- {
235
- Err ( error) if remaining_retries == 0 => break Err ( error) ,
236
- Err ( _error) => ( ) ,
237
- Ok ( artifact) => break Ok ( artifact) ,
238
- } ;
239
- } ?;
276
+ let service = self . clone ( ) ;
277
+ let certificate_cloned = certificate. clone ( ) ;
278
+ service
279
+ . signed_entity_type_lock
280
+ . lock ( & signed_entity_type)
281
+ . await ;
240
282
241
- let signed_entity = SignedEntityRecord {
242
- signed_entity_id : artifact. get_id ( ) ,
243
- signed_entity_type : signed_entity_type. clone ( ) ,
244
- certificate_id : certificate. hash . clone ( ) ,
245
- artifact : serde_json:: to_string ( & artifact) ?,
246
- created_at : Utc :: now ( ) ,
247
- } ;
283
+ Ok ( tokio:: task:: spawn ( async move {
284
+ let signed_entity_type_clone = signed_entity_type. clone ( ) ;
285
+ let service_clone = service. clone ( ) ;
286
+ let result = tokio:: task:: spawn ( async move {
287
+ service_clone
288
+ . create_artifact_impl ( signed_entity_type_clone, & certificate_cloned)
289
+ . await
290
+ } )
291
+ . await ;
292
+ service
293
+ . signed_entity_type_lock
294
+ . release ( signed_entity_type)
295
+ . await ;
248
296
249
- self . signed_entity_storer
250
- . store_signed_entity ( & signed_entity)
251
- . await
252
- . with_context ( || {
253
- format ! (
254
- "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'"
255
- )
256
- } ) ?;
257
- Ok ( ( ) )
297
+ result. unwrap ( )
298
+ } ) )
258
299
}
259
300
260
301
async fn get_last_signed_snapshots (
@@ -664,7 +705,7 @@ mod tests {
664
705
let error_message_str = error_message. as_str ( ) ;
665
706
666
707
artifact_builder_service
667
- . create_artifact ( signed_entity_type, & certificate)
708
+ . create_artifact_impl ( signed_entity_type, & certificate)
668
709
. await
669
710
. expect ( error_message_str) ;
670
711
}
@@ -687,13 +728,13 @@ mod tests {
687
728
let signed_entity_type_immutable =
688
729
SignedEntityType :: CardanoImmutableFilesFull ( CardanoDbBeacon :: default ( ) ) ;
689
730
signed_entity_type_service
690
- . create_artifact ( signed_entity_type_immutable, & certificate)
731
+ . create_artifact_impl ( signed_entity_type_immutable, & certificate)
691
732
. await
692
733
. unwrap ( ) ;
693
734
694
735
let signed_entity_type_msd = SignedEntityType :: MithrilStakeDistribution ( Epoch ( 1 ) ) ;
695
736
signed_entity_type_service
696
- . create_artifact ( signed_entity_type_msd, & certificate)
737
+ . create_artifact_impl ( signed_entity_type_msd, & certificate)
697
738
. await
698
739
. unwrap ( ) ;
699
740
}
0 commit comments