|
1 | 1 | use async_trait::async_trait; |
2 | 2 | use chrono::{DateTime, Utc}; |
3 | 3 | use sqlite::{Connection, Value}; |
4 | | -use std::{ |
5 | | - collections::{BTreeMap, HashMap}, |
6 | | - sync::Arc, |
7 | | -}; |
| 4 | +use std::{collections::HashMap, sync::Arc}; |
8 | 5 | use tokio::sync::Mutex; |
9 | 6 |
|
10 | 7 | use mithril_common::{ |
11 | 8 | crypto_helper::KESPeriod, |
12 | 9 | entities::{ |
13 | 10 | Epoch, HexEncodedOpCert, HexEncodedVerificationKey, HexEncodedVerificationKeySignature, |
14 | | - PartyId, SignerWithStake, Stake, |
| 11 | + PartyId, Signer, SignerWithStake, Stake, |
15 | 12 | }, |
16 | 13 | sqlite::{ |
17 | 14 | EntityCursor, HydrationError, Projection, Provider, SourceAlias, SqLiteEntity, |
18 | 15 | WhereCondition, |
19 | 16 | }, |
20 | | - store::adapter::{AdapterError, StoreAdapter}, |
| 17 | + store::{adapter::AdapterError, StoreError}, |
21 | 18 | StdError, |
22 | 19 | }; |
23 | 20 |
|
| 21 | +use crate::VerificationKeyStorer; |
| 22 | + |
24 | 23 | /// SignerRegistration record is the representation of a stored signer_registration. |
25 | 24 | #[derive(Debug, PartialEq, Clone)] |
26 | 25 | pub struct SignerRegistrationRecord { |
@@ -64,9 +63,21 @@ impl SignerRegistrationRecord { |
64 | 63 | } |
65 | 64 | } |
66 | 65 |
|
| 66 | +impl From<SignerRegistrationRecord> for Signer { |
| 67 | + fn from(other: SignerRegistrationRecord) -> Self { |
| 68 | + Self { |
| 69 | + party_id: other.signer_id, |
| 70 | + verification_key: other.verification_key, |
| 71 | + verification_key_signature: other.verification_key_signature, |
| 72 | + operational_certificate: other.operational_certificate, |
| 73 | + kes_period: other.kes_period, |
| 74 | + } |
| 75 | + } |
| 76 | +} |
| 77 | + |
67 | 78 | impl From<SignerRegistrationRecord> for SignerWithStake { |
68 | | - fn from(other: SignerRegistrationRecord) -> SignerWithStake { |
69 | | - SignerWithStake { |
| 79 | + fn from(other: SignerRegistrationRecord) -> Self { |
| 80 | + Self { |
70 | 81 | party_id: other.signer_id, |
71 | 82 | verification_key: other.verification_key, |
72 | 83 | verification_key_signature: other.verification_key_signature, |
@@ -381,144 +392,82 @@ impl<'conn> DeleteSignerRegistrationRecordProvider<'conn> { |
381 | 392 | } |
382 | 393 |
|
383 | 394 | /// Service to deal with signer_registration (read & write). |
384 | | -pub struct SignerRegistrationStoreAdapter { |
| 395 | +pub struct SignerRegistrationStore { |
385 | 396 | connection: Arc<Mutex<Connection>>, |
386 | 397 | } |
387 | 398 |
|
388 | | -impl SignerRegistrationStoreAdapter { |
389 | | - /// Create a new SignerRegistrationStoreAdapter service |
| 399 | +impl SignerRegistrationStore { |
| 400 | + /// Create a new [SignerRegistrationStore] service |
390 | 401 | pub fn new(connection: Arc<Mutex<Connection>>) -> Self { |
391 | 402 | Self { connection } |
392 | 403 | } |
393 | 404 | } |
394 | 405 |
|
395 | 406 | #[async_trait] |
396 | | -impl StoreAdapter for SignerRegistrationStoreAdapter { |
397 | | - type Key = Epoch; |
398 | | - type Record = HashMap<PartyId, SignerWithStake>; |
399 | | - |
400 | | - async fn store_record( |
401 | | - &mut self, |
402 | | - key: &Self::Key, |
403 | | - record: &Self::Record, |
404 | | - ) -> Result<(), AdapterError> { |
| 407 | +impl VerificationKeyStorer for SignerRegistrationStore { |
| 408 | + async fn save_verification_key( |
| 409 | + &self, |
| 410 | + epoch: Epoch, |
| 411 | + signer: SignerWithStake, |
| 412 | + ) -> Result<Option<SignerWithStake>, StoreError> { |
405 | 413 | let connection = &*self.connection.lock().await; |
406 | 414 | let provider = InsertOrReplaceSignerRegistrationRecordProvider::new(connection); |
407 | | - connection |
408 | | - .execute("begin transaction") |
409 | | - .map_err(|e| AdapterError::QueryError(e.into()))?; |
410 | | - |
411 | | - for signer_with_stake in record.values() { |
412 | | - let _signer_registration_record = provider |
413 | | - .persist(SignerRegistrationRecord::from_signer_with_stake( |
414 | | - signer_with_stake.to_owned(), |
415 | | - *key, |
416 | | - )) |
417 | | - .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; |
418 | | - } |
419 | | - |
420 | | - connection |
421 | | - .execute("commit transaction") |
422 | | - .map_err(|e| AdapterError::QueryError(e.into()))?; |
423 | | - |
424 | | - Ok(()) |
425 | | - } |
426 | | - |
427 | | - async fn get_record(&self, key: &Self::Key) -> Result<Option<Self::Record>, AdapterError> { |
428 | | - let connection = &*self.connection.lock().await; |
429 | | - let provider = SignerRegistrationRecordProvider::new(connection); |
430 | | - let cursor = provider |
431 | | - .get_by_epoch(key) |
| 415 | + let existing_record = SignerRegistrationRecordProvider::new(connection) |
| 416 | + .get_by_signer_id_and_epoch(signer.party_id.clone(), &epoch) |
| 417 | + .map_err(|e| AdapterError::QueryError(e))? |
| 418 | + .next(); |
| 419 | + |
| 420 | + let _updated_record = provider |
| 421 | + .persist(SignerRegistrationRecord::from_signer_with_stake( |
| 422 | + signer, epoch, |
| 423 | + )) |
432 | 424 | .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; |
433 | | - let mut signer_with_stakes = HashMap::new(); |
434 | | - for signer_registration_record in cursor { |
435 | | - signer_with_stakes.insert( |
436 | | - signer_registration_record.signer_id.to_string(), |
437 | | - signer_registration_record.into(), |
438 | | - ); |
439 | | - } |
440 | | - if signer_with_stakes.is_empty() { |
441 | | - Ok(None) |
442 | | - } else { |
443 | | - Ok(Some(signer_with_stakes)) |
444 | | - } |
445 | | - } |
446 | 425 |
|
447 | | - async fn record_exists(&self, key: &Self::Key) -> Result<bool, AdapterError> { |
448 | | - Ok(self.get_record(key).await?.is_some()) |
| 426 | + match existing_record { |
| 427 | + None => Ok(None), |
| 428 | + Some(previous_record) => Ok(Some(previous_record.into())), |
| 429 | + } |
449 | 430 | } |
450 | 431 |
|
451 | | - async fn get_last_n_records( |
| 432 | + async fn get_verification_keys( |
452 | 433 | &self, |
453 | | - how_many: usize, |
454 | | - ) -> Result<Vec<(Self::Key, Self::Record)>, AdapterError> { |
| 434 | + epoch: Epoch, |
| 435 | + ) -> Result<Option<HashMap<PartyId, Signer>>, StoreError> { |
455 | 436 | let connection = &*self.connection.lock().await; |
456 | 437 | let provider = SignerRegistrationRecordProvider::new(connection); |
457 | 438 | let cursor = provider |
458 | | - .get_all() |
459 | | - .map_err(|e| AdapterError::GeneralError(format!("{e}")))? |
460 | | - .collect::<Vec<_>>() |
461 | | - .into_iter() |
462 | | - .rev(); |
463 | | - let signer_with_stake_by_epoch: BTreeMap<Self::Key, Self::Record> = cursor.fold( |
464 | | - BTreeMap::<Self::Key, Self::Record>::new(), |
465 | | - |mut acc, signer_registration_record| { |
466 | | - let epoch = signer_registration_record.epoch_setting_id; |
467 | | - let mut signer_with_stakes: Self::Record = |
468 | | - if let Some(signer_with_stakes) = acc.get_mut(&epoch) { |
469 | | - signer_with_stakes.to_owned() |
470 | | - } else { |
471 | | - HashMap::new() |
472 | | - }; |
473 | | - signer_with_stakes.insert( |
474 | | - signer_registration_record.signer_id.to_string(), |
475 | | - signer_registration_record.into(), |
476 | | - ); |
477 | | - acc.insert(epoch, signer_with_stakes); |
478 | | - acc |
479 | | - }, |
480 | | - ); |
481 | | - Ok(signer_with_stake_by_epoch |
482 | | - .into_iter() |
483 | | - .rev() |
484 | | - .take(how_many) |
485 | | - .collect()) |
486 | | - } |
487 | | - |
488 | | - async fn remove(&mut self, key: &Self::Key) -> Result<Option<Self::Record>, AdapterError> { |
489 | | - let connection = &*self.connection.lock().await; |
490 | | - let provider = DeleteSignerRegistrationRecordProvider::new(connection); |
491 | | - let cursor = provider |
492 | | - .delete(*key) |
| 439 | + .get_by_epoch(&epoch) |
493 | 440 | .map_err(|e| AdapterError::GeneralError(format!("{e}")))?; |
494 | | - let mut signer_with_stakes = HashMap::new(); |
495 | | - for signer_registration_record in cursor { |
496 | | - signer_with_stakes.insert( |
497 | | - signer_registration_record.signer_id.to_string(), |
498 | | - signer_registration_record.into(), |
499 | | - ); |
500 | | - } |
501 | 441 |
|
502 | | - if signer_with_stakes.is_empty() { |
503 | | - Ok(None) |
504 | | - } else { |
505 | | - Ok(Some(signer_with_stakes)) |
| 442 | + let signer_with_stakes: HashMap<PartyId, Signer> = |
| 443 | + HashMap::from_iter(cursor.map(|record| (record.signer_id.to_owned(), record.into()))); |
| 444 | + |
| 445 | + match signer_with_stakes.is_empty() { |
| 446 | + true => Ok(None), |
| 447 | + false => Ok(Some(signer_with_stakes)), |
506 | 448 | } |
507 | 449 | } |
508 | 450 |
|
509 | | - async fn get_iter(&self) -> Result<Box<dyn Iterator<Item = Self::Record> + '_>, AdapterError> { |
510 | | - let records = self.get_last_n_records(usize::MAX).await?; |
511 | | - Ok(Box::new(records.into_iter().map(|(_k, v)| v))) |
| 451 | + async fn prune_verification_keys(&self, max_epoch_to_prune: Epoch) -> Result<(), StoreError> { |
| 452 | + let connection = &*self.connection.lock().await; |
| 453 | + let _deleted_records = DeleteSignerRegistrationRecordProvider::new(connection) |
| 454 | + // we want to prune including the given epoch (+1) |
| 455 | + .prune(max_epoch_to_prune + 1) |
| 456 | + .map_err(|e| AdapterError::QueryError(e))? |
| 457 | + .collect::<Vec<_>>(); |
| 458 | + |
| 459 | + Ok(()) |
512 | 460 | } |
513 | 461 | } |
514 | 462 |
|
515 | 463 | #[cfg(test)] |
516 | 464 | mod tests { |
517 | | - use std::collections::{BTreeMap, HashMap}; |
| 465 | + use std::collections::HashMap; |
518 | 466 |
|
519 | 467 | use mithril_common::test_utils::MithrilFixtureBuilder; |
520 | 468 |
|
521 | 469 | use crate::database::provider::{apply_all_migrations_to_db, disable_foreign_key_support}; |
| 470 | + use crate::store::test_verification_key_storer; |
522 | 471 |
|
523 | 472 | use super::*; |
524 | 473 |
|
@@ -849,63 +798,24 @@ mod tests { |
849 | 798 | } |
850 | 799 | } |
851 | 800 |
|
852 | | - #[tokio::test] |
853 | | - async fn test_store_adapter() { |
854 | | - let fixture = MithrilFixtureBuilder::default().with_signers(5).build(); |
855 | | - let signer_with_stakes = fixture.signers_with_stake(); |
856 | | - let signer_with_stakes_by_epoch: Vec<(Epoch, HashMap<PartyId, SignerWithStake>)> = (0..5) |
857 | | - .map(|e| { |
858 | | - ( |
859 | | - Epoch(e), |
860 | | - signer_with_stakes |
861 | | - .clone() |
862 | | - .into_iter() |
863 | | - .map(|s| (s.party_id.to_owned(), s)) |
864 | | - .collect(), |
865 | | - ) |
866 | | - }) |
867 | | - .collect(); |
868 | | - |
| 801 | + pub fn init_signer_registration_store( |
| 802 | + initial_data: Vec<(Epoch, HashMap<PartyId, SignerWithStake>)>, |
| 803 | + ) -> Arc<dyn VerificationKeyStorer> { |
869 | 804 | let connection = Connection::open(":memory:").unwrap(); |
870 | | - setup_signer_registration_db(&connection, Vec::new()).unwrap(); |
871 | | - |
872 | | - let mut signer_registration_store_adapter = |
873 | | - SignerRegistrationStoreAdapter::new(Arc::new(Mutex::new(connection))); |
| 805 | + let initial_data: Vec<(Epoch, Vec<SignerWithStake>)> = initial_data |
| 806 | + .into_iter() |
| 807 | + .map(|(e, signers)| (e, signers.into_values().collect::<Vec<_>>())) |
| 808 | + .collect(); |
874 | 809 |
|
875 | | - for (epoch, signer_with_stakes) in &signer_with_stakes_by_epoch { |
876 | | - assert!(signer_registration_store_adapter |
877 | | - .store_record(epoch, signer_with_stakes) |
878 | | - .await |
879 | | - .is_ok()); |
880 | | - } |
| 810 | + setup_signer_registration_db(&connection, initial_data).unwrap(); |
881 | 811 |
|
882 | | - for (epoch, signer_with_stakes) in &signer_with_stakes_by_epoch { |
883 | | - assert!(signer_registration_store_adapter |
884 | | - .record_exists(epoch) |
885 | | - .await |
886 | | - .unwrap()); |
887 | | - assert_eq!( |
888 | | - Some(signer_with_stakes.to_owned()), |
889 | | - signer_registration_store_adapter |
890 | | - .get_record(epoch) |
891 | | - .await |
892 | | - .unwrap() |
893 | | - ); |
894 | | - } |
895 | | - assert_eq!( |
896 | | - signer_with_stakes_by_epoch |
897 | | - .clone() |
898 | | - .into_iter() |
899 | | - .map(|(k, v)| (k, BTreeMap::from_iter(v.into_iter()))) |
900 | | - .collect::<Vec<(Epoch, BTreeMap<PartyId, SignerWithStake>)>>(), |
901 | | - signer_registration_store_adapter |
902 | | - .get_last_n_records(signer_with_stakes_by_epoch.len()) |
903 | | - .await |
904 | | - .unwrap() |
905 | | - .into_iter() |
906 | | - .rev() |
907 | | - .map(|(k, v)| (k, BTreeMap::from_iter(v.into_iter()))) |
908 | | - .collect::<Vec<(Epoch, BTreeMap<PartyId, SignerWithStake>)>>() |
909 | | - ) |
| 812 | + Arc::new(SignerRegistrationStore::new(Arc::new(Mutex::new( |
| 813 | + connection, |
| 814 | + )))) |
910 | 815 | } |
| 816 | + |
| 817 | + test_verification_key_storer!( |
| 818 | + test_signer_registration_store => |
| 819 | + crate::database::provider::signer_registration::tests::init_signer_registration_store |
| 820 | + ); |
911 | 821 | } |
0 commit comments