@@ -135,17 +135,17 @@ impl<'client> Provider<'client> for SignerRecordProvider<'client> {
135
135
}
136
136
137
137
/// Query to insert the signer record
138
- pub struct InsertSignerRecordProvider < ' conn > {
138
+ pub struct RegisterSignerRecordProvider < ' conn > {
139
139
connection : & ' conn Connection ,
140
140
}
141
141
142
- impl < ' conn > InsertSignerRecordProvider < ' conn > {
142
+ impl < ' conn > RegisterSignerRecordProvider < ' conn > {
143
143
/// Create a new instance
144
144
pub fn new ( connection : & ' conn Connection ) -> Self {
145
145
Self { connection }
146
146
}
147
147
148
- fn get_insert_condition ( & self , signer_record : SignerRecord ) -> WhereCondition {
148
+ fn get_register_condition ( & self , signer_record : SignerRecord ) -> WhereCondition {
149
149
WhereCondition :: new (
150
150
"(signer_id, pool_ticker, created_at, updated_at, registered_at) values (?*, ?*, ?*, ?*, ?*)" ,
151
151
vec ! [
@@ -165,7 +165,7 @@ impl<'conn> InsertSignerRecordProvider<'conn> {
165
165
}
166
166
167
167
fn persist ( & self , signer_record : SignerRecord ) -> StdResult < SignerRecord > {
168
- let filters = self . get_insert_condition ( signer_record. clone ( ) ) ;
168
+ let filters = self . get_register_condition ( signer_record. clone ( ) ) ;
169
169
170
170
let entity = self . find ( filters) ?. next ( ) . unwrap_or_else ( || {
171
171
panic ! ( "No entity returned by the persister, signer_record = {signer_record:?}" )
@@ -175,7 +175,7 @@ impl<'conn> InsertSignerRecordProvider<'conn> {
175
175
}
176
176
}
177
177
178
- impl < ' conn > Provider < ' conn > for InsertSignerRecordProvider < ' conn > {
178
+ impl < ' conn > Provider < ' conn > for RegisterSignerRecordProvider < ' conn > {
179
179
type Entity = SignerRecord ;
180
180
181
181
fn get_connection ( & ' conn self ) -> & ' conn Connection {
@@ -188,22 +188,25 @@ impl<'conn> Provider<'conn> for InsertSignerRecordProvider<'conn> {
188
188
let projection =
189
189
Self :: Entity :: get_projection ( ) . expand ( SourceAlias :: new ( & [ ( "{:signer:}" , "signer" ) ] ) ) ;
190
190
191
- format ! ( "insert into signer {condition} on conflict (signer_id) do update set updated_at = excluded.updated_at returning {projection}" )
191
+ format ! (
192
+ "insert into signer {condition} on conflict (signer_id) do update set \
193
+ updated_at = excluded.updated_at, registered_at = excluded.registered_at returning {projection}"
194
+ )
192
195
}
193
196
}
194
197
195
198
/// Query to update the signer record
196
- pub struct UpdateSignerRecordProvider < ' conn > {
199
+ pub struct ImportSignerRecordProvider < ' conn > {
197
200
connection : & ' conn Connection ,
198
201
}
199
202
200
- impl < ' conn > UpdateSignerRecordProvider < ' conn > {
203
+ impl < ' conn > ImportSignerRecordProvider < ' conn > {
201
204
/// Create a new instance
202
205
pub fn new ( connection : & ' conn Connection ) -> Self {
203
206
Self { connection }
204
207
}
205
208
206
- fn get_update_condition ( & self , signer_records : Vec < SignerRecord > ) -> WhereCondition {
209
+ fn get_import_condition ( & self , signer_records : Vec < SignerRecord > ) -> WhereCondition {
207
210
let columns = "(signer_id, pool_ticker, created_at, updated_at, registered_at)" ;
208
211
let values_columns: Vec < & str > = repeat ( "(?*, ?*, ?*, ?*, ?*)" )
209
212
. take ( signer_records. len ( ) )
@@ -234,7 +237,7 @@ impl<'conn> UpdateSignerRecordProvider<'conn> {
234
237
}
235
238
236
239
fn persist ( & self , signer_record : SignerRecord ) -> StdResult < SignerRecord > {
237
- let filters = self . get_update_condition ( vec ! [ signer_record. clone( ) ] ) ;
240
+ let filters = self . get_import_condition ( vec ! [ signer_record. clone( ) ] ) ;
238
241
239
242
let entity = self . find ( filters) ?. next ( ) . unwrap_or_else ( || {
240
243
panic ! ( "No entity returned by the persister, signer_record = {signer_record:?}" )
@@ -244,13 +247,13 @@ impl<'conn> UpdateSignerRecordProvider<'conn> {
244
247
}
245
248
246
249
fn persist_many ( & self , signer_records : Vec < SignerRecord > ) -> StdResult < Vec < SignerRecord > > {
247
- let filters = self . get_update_condition ( signer_records) ;
250
+ let filters = self . get_import_condition ( signer_records) ;
248
251
249
252
Ok ( self . find ( filters) ?. collect ( ) )
250
253
}
251
254
}
252
255
253
- impl < ' conn > Provider < ' conn > for UpdateSignerRecordProvider < ' conn > {
256
+ impl < ' conn > Provider < ' conn > for ImportSignerRecordProvider < ' conn > {
254
257
type Entity = SignerRecord ;
255
258
256
259
fn get_connection ( & ' conn self ) -> & ' conn Connection {
@@ -263,7 +266,10 @@ impl<'conn> Provider<'conn> for UpdateSignerRecordProvider<'conn> {
263
266
let projection =
264
267
Self :: Entity :: get_projection ( ) . expand ( SourceAlias :: new ( & [ ( "{:signer:}" , "signer" ) ] ) ) ;
265
268
266
- format ! ( "insert into signer {condition} on conflict(signer_id) do update set pool_ticker = excluded.pool_ticker, updated_at = excluded.updated_at returning {projection}" )
269
+ format ! (
270
+ "insert into signer {condition} on conflict(signer_id) do update \
271
+ set pool_ticker = excluded.pool_ticker, updated_at = excluded.updated_at returning {projection}"
272
+ )
267
273
}
268
274
}
269
275
@@ -286,34 +292,15 @@ impl SignerStore {
286
292
287
293
Ok ( cursor. collect ( ) )
288
294
}
289
- }
290
295
291
- #[ async_trait]
292
- impl SignerRecorder for SignerStore {
293
- async fn record_signer_id ( & self , signer_id : String ) -> StdResult < ( ) > {
294
- let connection = & * self . connection . lock ( ) . await ;
295
- let provider = InsertSignerRecordProvider :: new ( connection) ;
296
- let created_at = Utc :: now ( ) ;
297
- let updated_at = created_at;
298
- let signer_record = SignerRecord {
299
- signer_id,
300
- pool_ticker : None ,
301
- created_at,
302
- updated_at,
303
- registered_at : None ,
304
- } ;
305
- provider. persist ( signer_record) ?;
306
-
307
- Ok ( ( ) )
308
- }
309
-
310
- async fn record_signer_pool_ticker (
296
+ /// Import a signer in the database, its registered_at date will be left empty
297
+ pub async fn import_signer (
311
298
& self ,
312
299
signer_id : String ,
313
300
pool_ticker : Option < String > ,
314
301
) -> StdResult < ( ) > {
315
302
let connection = & * self . connection . lock ( ) . await ;
316
- let provider = UpdateSignerRecordProvider :: new ( connection) ;
303
+ let provider = ImportSignerRecordProvider :: new ( connection) ;
317
304
let created_at = Utc :: now ( ) ;
318
305
let updated_at = created_at;
319
306
let signer_record = SignerRecord {
@@ -328,12 +315,13 @@ impl SignerRecorder for SignerStore {
328
315
Ok ( ( ) )
329
316
}
330
317
331
- async fn record_many_signers_pool_tickers (
318
+ /// Create many signers at once in the database, their registered_at date will be left empty
319
+ pub async fn import_many_signers (
332
320
& self ,
333
321
pool_ticker_by_id : HashMap < String , Option < String > > ,
334
322
) -> StdResult < ( ) > {
335
323
let connection = & * self . connection . lock ( ) . await ;
336
- let provider = UpdateSignerRecordProvider :: new ( connection) ;
324
+ let provider = ImportSignerRecordProvider :: new ( connection) ;
337
325
338
326
let created_at = Utc :: now ( ) ;
339
327
let updated_at = created_at;
@@ -354,11 +342,33 @@ impl SignerRecorder for SignerStore {
354
342
}
355
343
}
356
344
345
+ #[ async_trait]
346
+ impl SignerRecorder for SignerStore {
347
+ async fn record_signer_registration ( & self , signer_id : String ) -> StdResult < ( ) > {
348
+ let connection = & * self . connection . lock ( ) . await ;
349
+ let provider = RegisterSignerRecordProvider :: new ( connection) ;
350
+ let created_at = Utc :: now ( ) ;
351
+ let updated_at = created_at;
352
+ let registered_at = Some ( created_at) ;
353
+ let signer_record = SignerRecord {
354
+ signer_id,
355
+ pool_ticker : None ,
356
+ created_at,
357
+ updated_at,
358
+ registered_at,
359
+ } ;
360
+ provider. persist ( signer_record) ?;
361
+
362
+ Ok ( ( ) )
363
+ }
364
+ }
365
+
357
366
#[ cfg( test) ]
358
367
mod tests {
359
368
use crate :: database:: provider:: apply_all_migrations_to_db;
360
369
use chrono:: Duration ;
361
370
use mithril_common:: StdResult ;
371
+ use std:: collections:: BTreeMap ;
362
372
363
373
use super :: * ;
364
374
@@ -395,9 +405,9 @@ mod tests {
395
405
let query = {
396
406
// leverage the expanded parameter from this provider which is unit
397
407
// tested on its own above.
398
- let update_provider = UpdateSignerRecordProvider :: new ( connection) ;
408
+ let update_provider = ImportSignerRecordProvider :: new ( connection) ;
399
409
let ( sql_values, _) = update_provider
400
- . get_update_condition ( vec ! [ signer_records. first( ) . unwrap( ) . to_owned( ) ] )
410
+ . get_import_condition ( vec ! [ signer_records. first( ) . unwrap( ) . to_owned( ) ] )
401
411
. expand ( ) ;
402
412
format ! ( "insert into signer {sql_values}" )
403
413
} ;
@@ -461,8 +471,8 @@ mod tests {
461
471
fn insert_signer_record ( ) {
462
472
let signer_record = fake_signer_records ( 1 ) . first ( ) . unwrap ( ) . to_owned ( ) ;
463
473
let connection = Connection :: open ( ":memory:" ) . unwrap ( ) ;
464
- let provider = InsertSignerRecordProvider :: new ( & connection) ;
465
- let condition = provider. get_insert_condition ( signer_record. clone ( ) ) ;
474
+ let provider = RegisterSignerRecordProvider :: new ( & connection) ;
475
+ let condition = provider. get_register_condition ( signer_record. clone ( ) ) ;
466
476
let ( values, params) = condition. expand ( ) ;
467
477
468
478
assert_eq ! (
@@ -485,8 +495,8 @@ mod tests {
485
495
fn update_signer_record ( ) {
486
496
let signer_records = fake_signer_records ( 2 ) ;
487
497
let connection = Connection :: open ( ":memory:" ) . unwrap ( ) ;
488
- let provider = UpdateSignerRecordProvider :: new ( & connection) ;
489
- let condition = provider. get_update_condition ( signer_records. clone ( ) ) ;
498
+ let provider = ImportSignerRecordProvider :: new ( & connection) ;
499
+ let condition = provider. get_import_condition ( signer_records. clone ( ) ) ;
490
500
let ( values, params) = condition. expand ( ) ;
491
501
492
502
assert_eq ! (
@@ -551,7 +561,7 @@ mod tests {
551
561
let connection = Connection :: open ( ":memory:" ) . unwrap ( ) ;
552
562
setup_signer_db ( & connection, Vec :: new ( ) ) . unwrap ( ) ;
553
563
554
- let provider = InsertSignerRecordProvider :: new ( & connection) ;
564
+ let provider = RegisterSignerRecordProvider :: new ( & connection) ;
555
565
556
566
for signer_record in signer_records_fake. clone ( ) {
557
567
let signer_record_saved = provider. persist ( signer_record. clone ( ) ) . unwrap ( ) ;
@@ -572,7 +582,7 @@ mod tests {
572
582
let connection = Connection :: open ( ":memory:" ) . unwrap ( ) ;
573
583
setup_signer_db ( & connection, signer_records_fake. clone ( ) ) . unwrap ( ) ;
574
584
575
- let provider = UpdateSignerRecordProvider :: new ( & connection) ;
585
+ let provider = ImportSignerRecordProvider :: new ( & connection) ;
576
586
577
587
for signer_record in signer_records_fake. clone ( ) {
578
588
let signer_record_saved = provider. persist ( signer_record. clone ( ) ) . unwrap ( ) ;
@@ -595,7 +605,7 @@ mod tests {
595
605
let connection = Connection :: open ( ":memory:" ) . unwrap ( ) ;
596
606
setup_signer_db ( & connection, signer_records_fake. clone ( ) ) . unwrap ( ) ;
597
607
598
- let provider = UpdateSignerRecordProvider :: new ( & connection) ;
608
+ let provider = ImportSignerRecordProvider :: new ( & connection) ;
599
609
let mut saved_records = provider. persist_many ( signer_records_fake. clone ( ) ) . unwrap ( ) ;
600
610
saved_records. sort_by ( |a, b| a. signer_id . cmp ( & b. signer_id ) ) ;
601
611
assert_eq ! ( signer_records_fake, saved_records) ;
@@ -638,31 +648,90 @@ mod tests {
638
648
639
649
for signer_record in signer_records_fake. clone ( ) {
640
650
store_recorder
641
- . record_signer_id ( signer_record. signer_id . clone ( ) )
651
+ . record_signer_registration ( signer_record. signer_id . clone ( ) )
642
652
. await
643
- . expect ( "record_signer_id should not fail" ) ;
653
+ . expect ( "record_signer_registration should not fail" ) ;
644
654
let connection = & * connection. lock ( ) . await ;
645
655
let provider = SignerRecordProvider :: new ( connection) ;
646
656
let signer_records_stored: Vec < SignerRecord > = provider
647
657
. get_by_signer_id ( signer_record. signer_id )
648
658
. unwrap ( )
649
659
. collect :: < Vec < _ > > ( ) ;
650
660
assert_eq ! ( 1 , signer_records_stored. len( ) ) ;
661
+ assert ! (
662
+ signer_records_stored
663
+ . iter( )
664
+ . all( |s| s. registered_at. is_some( ) ) ,
665
+ "registering a signer should set the registration date"
666
+ )
651
667
}
668
+ }
669
+
670
+ #[ tokio:: test]
671
+ async fn test_store_import_signer ( ) {
672
+ let signer_records_fake = fake_signer_records ( 5 ) ;
673
+
674
+ let connection = Connection :: open ( ":memory:" ) . unwrap ( ) ;
675
+ setup_signer_db ( & connection, Vec :: new ( ) ) . unwrap ( ) ;
676
+
677
+ let connection = Arc :: new ( Mutex :: new ( connection) ) ;
678
+ let store = SignerStore :: new ( connection. clone ( ) ) ;
652
679
653
680
for signer_record in signer_records_fake {
654
- let pool_ticker = Some ( format ! ( "new-pool-{}" , signer_record. signer_id) ) ;
655
- store_recorder
656
- . record_signer_pool_ticker ( signer_record. signer_id . clone ( ) , pool_ticker. clone ( ) )
681
+ store
682
+ . import_signer (
683
+ signer_record. signer_id . clone ( ) ,
684
+ signer_record. pool_ticker . clone ( ) ,
685
+ )
657
686
. await
658
- . expect ( "record_signer_pool_ticker should not fail" ) ;
687
+ . expect ( "import_signer should not fail" ) ;
659
688
let connection = & * connection. lock ( ) . await ;
660
689
let provider = SignerRecordProvider :: new ( connection) ;
661
690
let signer_records_stored: Vec < SignerRecord > = provider
662
691
. get_by_signer_id ( signer_record. signer_id )
663
692
. unwrap ( )
664
693
. collect :: < Vec < _ > > ( ) ;
665
- assert_eq ! ( pool_ticker, signer_records_stored[ 0 ] . to_owned( ) . pool_ticker) ;
694
+ assert_eq ! (
695
+ signer_record. pool_ticker,
696
+ signer_records_stored[ 0 ] . to_owned( ) . pool_ticker
697
+ ) ;
698
+ assert ! (
699
+ signer_records_stored
700
+ . iter( )
701
+ . all( |s| s. registered_at. is_none( ) ) ,
702
+ "imported signer should not have a registration date"
703
+ )
666
704
}
667
705
}
706
+
707
+ #[ tokio:: test]
708
+ async fn test_store_import_many_signers ( ) {
709
+ let signers_fake: BTreeMap < _ , _ > = fake_signer_records ( 5 )
710
+ . into_iter ( )
711
+ . map ( |r| ( r. signer_id , r. pool_ticker ) )
712
+ . collect ( ) ;
713
+
714
+ let connection = Connection :: open ( ":memory:" ) . unwrap ( ) ;
715
+ setup_signer_db ( & connection, Vec :: new ( ) ) . unwrap ( ) ;
716
+ let store = SignerStore :: new ( Arc :: new ( Mutex :: new ( connection) ) ) ;
717
+
718
+ store
719
+ . import_many_signers ( signers_fake. clone ( ) . into_iter ( ) . collect ( ) )
720
+ . await
721
+ . expect ( "import_many_signers should not fail" ) ;
722
+
723
+ let signer_records_stored = store. get_all ( ) . await . unwrap ( ) ;
724
+ let signers_stored = signer_records_stored
725
+ . iter ( )
726
+ . cloned ( )
727
+ . map ( |r| ( r. signer_id , r. pool_ticker ) )
728
+ . collect ( ) ;
729
+ assert_eq ! ( signers_fake, signers_stored) ;
730
+ assert ! (
731
+ signer_records_stored
732
+ . iter( )
733
+ . all( |s| s. registered_at. is_none( ) ) ,
734
+ "imported signer should not have a registration date"
735
+ ) ;
736
+ }
668
737
}
0 commit comments