Skip to content

Commit 59b5f31

Browse files
committed
Pruning of old vkeys is now done manually instead of when saving
This in order to prune only when needed, reducing the risk of doing it on data that are not yet to be pruned. The manual call will be likely done when a signer registration round open.
1 parent 4094a14 commit 59b5f31

File tree

5 files changed

+79
-103
lines changed

5 files changed

+79
-103
lines changed

mithril-aggregator/src/database/provider/signer_registration.rs

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -394,17 +394,12 @@ impl<'conn> DeleteSignerRegistrationRecordProvider<'conn> {
394394
/// Service to deal with signer_registration (read & write).
395395
pub struct SignerRegistrationStore {
396396
connection: Arc<Mutex<Connection>>,
397-
/// Number of epoch before previous records will be deleted at the next save
398-
epoch_retention_limit: Option<u64>,
399397
}
400398

401399
impl SignerRegistrationStore {
402400
/// Create a new [SignerRegistrationStore] service
403-
pub fn new(connection: Arc<Mutex<Connection>>, epoch_retention_limit: Option<u64>) -> Self {
404-
Self {
405-
connection,
406-
epoch_retention_limit,
407-
}
401+
pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
402+
Self { connection }
408403
}
409404
}
410405

@@ -428,17 +423,6 @@ impl VerificationKeyStorer for SignerRegistrationStore {
428423
))
429424
.map_err(|e| AdapterError::GeneralError(format!("{e}")))?;
430425

431-
if let Some(threshold) = self.epoch_retention_limit {
432-
// Note: this means that if called with an epoch in the future this could remove all
433-
// current records, the caller should check that the given epoch is the current one
434-
// or we must get it to do the work ourself here.
435-
let _deleted_records = DeleteSignerRegistrationRecordProvider::new(connection)
436-
// we want to prune including the given epoch (+1)
437-
.prune(epoch - threshold + 1)
438-
.map_err(|e| AdapterError::QueryError(e))?
439-
.collect::<Vec<_>>();
440-
}
441-
442426
match existing_record {
443427
None => Ok(None),
444428
Some(previous_record) => Ok(Some(previous_record.into())),
@@ -463,6 +447,17 @@ impl VerificationKeyStorer for SignerRegistrationStore {
463447
false => Ok(Some(signer_with_stakes)),
464448
}
465449
}
450+
451+
async fn prune_verification_keys(&self, max_epoch_to_prune: Epoch) -> Result<(), StoreError> {
452+
let connection = &*self.connection.lock().await;
453+
let _deleted_records = DeleteSignerRegistrationRecordProvider::new(connection)
454+
// we want to prune including the given epoch (+1)
455+
.prune(max_epoch_to_prune + 1)
456+
.map_err(|e| AdapterError::QueryError(e))?
457+
.collect::<Vec<_>>();
458+
459+
Ok(())
460+
}
466461
}
467462

468463
#[cfg(test)]
@@ -805,7 +800,6 @@ mod tests {
805800

806801
pub fn init_signer_registration_store(
807802
initial_data: Vec<(Epoch, HashMap<PartyId, SignerWithStake>)>,
808-
retention_limit: Option<usize>,
809803
) -> Arc<dyn VerificationKeyStorer> {
810804
let connection = Connection::open(":memory:").unwrap();
811805
let initial_data: Vec<(Epoch, Vec<SignerWithStake>)> = initial_data
@@ -815,10 +809,9 @@ mod tests {
815809

816810
setup_signer_registration_db(&connection, initial_data).unwrap();
817811

818-
Arc::new(SignerRegistrationStore::new(
819-
Arc::new(Mutex::new(connection)),
820-
retention_limit.map(|threshold| threshold as u64),
821-
))
812+
Arc::new(SignerRegistrationStore::new(Arc::new(Mutex::new(
813+
connection,
814+
))))
822815
}
823816

824817
test_verification_key_storer!(

mithril-aggregator/src/dependency_injection/builder.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,6 @@ impl DependenciesBuilder {
404404
async fn build_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
405405
Ok(Arc::new(SignerRegistrationStore::new(
406406
self.get_sqlite_connection().await?,
407-
self.configuration.store_retention_limit.map(|l| l as u64),
408407
)))
409408
}
410409

mithril-aggregator/src/multi_signer.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -595,10 +595,9 @@ mod tests {
595595

596596
async fn setup_multi_signer() -> MultiSignerImpl {
597597
let beacon = fake_data::beacon();
598-
let verification_key_store = VerificationKeyStore::new(
599-
Box::new(MemoryAdapter::<Epoch, HashMap<PartyId, SignerWithStake>>::new(None).unwrap()),
600-
None,
601-
);
598+
let verification_key_store = VerificationKeyStore::new(Box::new(
599+
MemoryAdapter::<Epoch, HashMap<PartyId, SignerWithStake>>::new(None).unwrap(),
600+
));
602601
let stake_store = StakeStore::new(
603602
Box::new(MemoryAdapter::<Epoch, StakeDistribution>::new(None).unwrap()),
604603
None,

mithril-aggregator/src/signer_registerer.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,9 @@ mod tests {
295295
#[tokio::test]
296296
async fn can_register_signer_if_registration_round_is_opened_with_operational_certificate() {
297297
let chain_observer = FakeObserver::default();
298-
let verification_key_store = Arc::new(VerificationKeyStore::new(
299-
Box::new(MemoryAdapter::<Epoch, HashMap<PartyId, SignerWithStake>>::new(None).unwrap()),
300-
None,
301-
));
298+
let verification_key_store = Arc::new(VerificationKeyStore::new(Box::new(
299+
MemoryAdapter::<Epoch, HashMap<PartyId, SignerWithStake>>::new(None).unwrap(),
300+
)));
302301
let mut signer_recorder = MockSignerRecorder::new();
303302
signer_recorder
304303
.expect_record_signer_id()
@@ -341,10 +340,9 @@ mod tests {
341340
#[tokio::test]
342341
async fn can_register_signer_if_registration_round_is_opened_without_operational_certificate() {
343342
let chain_observer = FakeObserver::default();
344-
let verification_key_store = Arc::new(VerificationKeyStore::new(
345-
Box::new(MemoryAdapter::<Epoch, HashMap<PartyId, SignerWithStake>>::new(None).unwrap()),
346-
None,
347-
));
343+
let verification_key_store = Arc::new(VerificationKeyStore::new(Box::new(
344+
MemoryAdapter::<Epoch, HashMap<PartyId, SignerWithStake>>::new(None).unwrap(),
345+
)));
348346
let mut signer_recorder = MockSignerRecorder::new();
349347
signer_recorder
350348
.expect_record_signer_id()
@@ -390,10 +388,9 @@ mod tests {
390388
#[tokio::test]
391389
async fn cant_register_signer_if_registration_round_is_not_opened() {
392390
let chain_observer = FakeObserver::default();
393-
let verification_key_store = Arc::new(VerificationKeyStore::new(
394-
Box::new(MemoryAdapter::<Epoch, HashMap<PartyId, SignerWithStake>>::new(None).unwrap()),
395-
None,
396-
));
391+
let verification_key_store = Arc::new(VerificationKeyStore::new(Box::new(
392+
MemoryAdapter::<Epoch, HashMap<PartyId, SignerWithStake>>::new(None).unwrap(),
393+
)));
397394
let signer_recorder = MockSignerRecorder::new();
398395
let signer_registerer = MithrilSignerRegisterer::new(
399396
Arc::new(chain_observer),

mithril-aggregator/src/store/verification_key_store.rs

Lines changed: 51 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use async_trait::async_trait;
2-
use mithril_common::store::StorePruner;
32
use std::collections::HashMap;
43
use tokio::sync::RwLock;
54

@@ -24,40 +23,25 @@ pub trait VerificationKeyStorer: Sync + Send {
2423
&self,
2524
epoch: Epoch,
2625
) -> Result<Option<HashMap<PartyId, Signer>>, StoreError>;
26+
27+
/// Prune all verification keys that are at or below the given epoch.
28+
async fn prune_verification_keys(&self, max_epoch_to_prune: Epoch) -> Result<(), StoreError>;
2729
}
2830

2931
/// Store for the `VerificationKey`.
3032
pub struct VerificationKeyStore {
3133
adapter: RwLock<Adapter>,
32-
retention_limit: Option<usize>,
3334
}
3435

3536
impl VerificationKeyStore {
3637
/// Create a new instance.
37-
pub fn new(adapter: Adapter, retention_limit: Option<usize>) -> Self {
38+
pub fn new(adapter: Adapter) -> Self {
3839
Self {
3940
adapter: RwLock::new(adapter),
40-
retention_limit,
4141
}
4242
}
4343
}
4444

45-
#[async_trait]
46-
impl StorePruner for VerificationKeyStore {
47-
type Key = Epoch;
48-
type Record = HashMap<PartyId, SignerWithStake>;
49-
50-
fn get_adapter(
51-
&self,
52-
) -> &RwLock<Box<dyn StoreAdapter<Key = Self::Key, Record = Self::Record>>> {
53-
&self.adapter
54-
}
55-
56-
fn get_max_records(&self) -> Option<usize> {
57-
self.retention_limit
58-
}
59-
}
60-
6145
#[async_trait]
6246
impl VerificationKeyStorer for VerificationKeyStore {
6347
async fn save_verification_key(
@@ -75,7 +59,6 @@ impl VerificationKeyStorer for VerificationKeyStore {
7559
.await
7660
.store_record(&epoch, &signers)
7761
.await?;
78-
self.prune().await?;
7962

8063
Ok(prev_signer)
8164
}
@@ -87,6 +70,21 @@ impl VerificationKeyStorer for VerificationKeyStore {
8770
let record = self.adapter.read().await.get_record(&epoch).await?;
8871
Ok(record.map(|h| h.into_iter().map(|(k, v)| (k, v.into())).collect()))
8972
}
73+
74+
async fn prune_verification_keys(&self, max_epoch_to_prune: Epoch) -> Result<(), StoreError> {
75+
let mut adapter = self.adapter.write().await;
76+
77+
for (epoch, _record) in adapter
78+
.get_last_n_records(usize::MAX)
79+
.await?
80+
.into_iter()
81+
.filter(|(e, _)| e <= &max_epoch_to_prune)
82+
{
83+
adapter.remove(&epoch).await?;
84+
}
85+
86+
Ok(())
87+
}
9088
}
9189

9290
/// Macro that generate tests that a [VerificationKeyStorer] must pass
@@ -118,8 +116,8 @@ macro_rules! test_verification_key_storer {
118116
}
119117

120118
#[tokio::test]
121-
async fn check_retention_limit() {
122-
test_suite::check_retention_limit(&$store_builder).await;
119+
async fn can_prune_keys_from_given_epoch_retention_limit() {
120+
test_suite::can_prune_keys_from_given_epoch_retention_limit(&$store_builder).await;
123121
}
124122
}
125123
};
@@ -139,11 +137,8 @@ pub mod test_suite {
139137

140138
/// A builder of [VerificationKeyStorer], the arguments are:
141139
/// * initial_data
142-
/// * optional retention limit
143-
type StoreBuilder = dyn Fn(
144-
Vec<(Epoch, HashMap<PartyId, SignerWithStake>)>,
145-
Option<usize>,
146-
) -> Arc<dyn VerificationKeyStorer>;
140+
type StoreBuilder =
141+
dyn Fn(Vec<(Epoch, HashMap<PartyId, SignerWithStake>)>) -> Arc<dyn VerificationKeyStorer>;
147142

148143
fn build_signers(
149144
nb_epoch: u64,
@@ -177,7 +172,7 @@ pub mod test_suite {
177172

178173
pub async fn save_key_in_empty_store(store_builder: &StoreBuilder) {
179174
let signers = build_signers(0, 0);
180-
let store = store_builder(signers, None);
175+
let store = store_builder(signers);
181176
let res = store
182177
.save_verification_key(
183178
Epoch(0),
@@ -198,7 +193,7 @@ pub mod test_suite {
198193

199194
pub async fn update_signer_in_store(store_builder: &StoreBuilder) {
200195
let signers = build_signers(1, 1);
201-
let store = store_builder(signers, None);
196+
let store = store_builder(signers);
202197
let res = store
203198
.save_verification_key(
204199
Epoch(1),
@@ -229,23 +224,23 @@ pub mod test_suite {
229224

230225
pub async fn get_verification_keys_for_empty_epoch(store_builder: &StoreBuilder) {
231226
let signers = build_signers(2, 1);
232-
let store = store_builder(signers, None);
227+
let store = store_builder(signers);
233228
let res = store.get_verification_keys(Epoch(0)).await.unwrap();
234229

235230
assert!(res.is_none());
236231
}
237232

238233
pub async fn get_verification_keys_for_existing_epoch(store_builder: &StoreBuilder) {
239234
let signers = build_signers(2, 2);
235+
let store = store_builder(signers.clone());
236+
240237
let expected_signers: Option<BTreeMap<PartyId, Signer>> = signers
241-
.iter()
238+
.into_iter()
242239
.filter(|(e, _)| e == 1)
243-
.cloned()
244240
.map(|(_, signers)| {
245241
BTreeMap::from_iter(signers.into_iter().map(|(p, s)| (p, s.into())))
246242
})
247243
.next();
248-
let store = store_builder(signers, None);
249244
let res = store
250245
.get_verification_keys(Epoch(1))
251246
.await
@@ -255,30 +250,27 @@ pub mod test_suite {
255250
assert_eq!(expected_signers, res);
256251
}
257252

258-
pub async fn check_retention_limit(store_builder: &StoreBuilder) {
259-
let signers = build_signers(2, 2);
260-
let store = store_builder(signers, Some(2));
261-
assert!(store
262-
.get_verification_keys(Epoch(1))
263-
.await
264-
.unwrap()
265-
.is_some());
266-
let _ = store
267-
.save_verification_key(
268-
Epoch(3),
269-
SignerWithStake {
270-
party_id: "party_id".to_string(),
271-
verification_key: "whatever".to_string(),
272-
verification_key_signature: None,
273-
operational_certificate: None,
274-
kes_period: None,
275-
stake: 10,
276-
},
277-
)
278-
.await
279-
.unwrap();
280-
let first_epoch_keys = store.get_verification_keys(Epoch(1)).await.unwrap();
281-
assert_eq!(None, first_epoch_keys);
253+
pub async fn can_prune_keys_from_given_epoch_retention_limit(store_builder: &StoreBuilder) {
254+
let signers = build_signers(6, 2);
255+
let store = store_builder(signers);
256+
257+
for epoch in 1..6 {
258+
assert!(
259+
store
260+
.get_verification_keys(Epoch(epoch))
261+
.await
262+
.unwrap()
263+
.is_some(),
264+
"Keys should exist before pruning"
265+
);
266+
store
267+
.prune_verification_keys(Epoch(epoch))
268+
.await
269+
.expect("Pruning should not fail");
270+
271+
let pruned_epoch_keys = store.get_verification_keys(Epoch(epoch)).await.unwrap();
272+
assert_eq!(None, pruned_epoch_keys);
273+
}
282274
}
283275
}
284276

@@ -294,7 +286,6 @@ mod tests {
294286

295287
pub fn init_store(
296288
initial_data: Vec<(Epoch, HashMap<PartyId, SignerWithStake>)>,
297-
retention_limit: Option<usize>,
298289
) -> Arc<dyn VerificationKeyStorer> {
299290
let values = if initial_data.is_empty() {
300291
None
@@ -305,10 +296,7 @@ mod tests {
305296
let adapter: MemoryAdapter<Epoch, HashMap<PartyId, SignerWithStake>> =
306297
MemoryAdapter::new(values).unwrap();
307298

308-
Arc::new(VerificationKeyStore::new(
309-
Box::new(adapter),
310-
retention_limit,
311-
))
299+
Arc::new(VerificationKeyStore::new(Box::new(adapter)))
312300
}
313301

314302
test_verification_key_storer!(

0 commit comments

Comments
 (0)