Skip to content

Commit a35fc91

Browse files
authored
Merge pull request #808 from input-output-hk/greg/799/pruning
Add stake SQL store pruning
2 parents 7e2d358 + 2ac1fc6 commit a35fc91

File tree

4 files changed

+202
-21
lines changed

4 files changed

+202
-21
lines changed

mithril-aggregator/src/command_args.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use mithril_common::{
3434
};
3535

3636
use crate::{
37-
database::provider::StakePoolRepository,
37+
database::provider::StakePoolStore,
3838
event_store::{self, TransmitterService},
3939
http_server::routes::router,
4040
tools::{EraTools, GenesisTools, GenesisToolsDependency},
@@ -90,9 +90,9 @@ fn setup_genesis_dependencies(
9090
)?),
9191
config.store_retention_limit,
9292
));
93-
let stake_store = Arc::new(StakePoolRepository::new(Arc::new(Mutex::new(
94-
Connection::open(sqlite_db_path.clone().unwrap())?,
95-
))));
93+
let stake_store = Arc::new(StakePoolStore::new(Arc::new(Mutex::new(Connection::open(
94+
sqlite_db_path.clone().unwrap(),
95+
)?))));
9696
let single_signature_store = Arc::new(SingleSignatureStore::new(
9797
Box::new(SQLiteAdapter::new("single_signature", sqlite_db_path)?),
9898
config.store_retention_limit,
@@ -359,9 +359,9 @@ impl ServeCommand {
359359
)?),
360360
config.store_retention_limit,
361361
));
362-
let stake_store = Arc::new(StakePoolRepository::new(Arc::new(Mutex::new(
363-
Connection::open(sqlite_db_path.clone().unwrap())?,
364-
))));
362+
let stake_store = Arc::new(StakePoolStore::new(Arc::new(Mutex::new(Connection::open(
363+
sqlite_db_path.clone().unwrap(),
364+
)?))));
365365
let single_signature_store = Arc::new(SingleSignatureStore::new(
366366
Box::new(SQLiteAdapter::new(
367367
"single_signature",

mithril-aggregator/src/database/provider/stake_pool.rs

Lines changed: 181 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ use mithril_common::{
1818

1919
use mithril_common::StdError;
2020

21+
/// Delete stake pools for Epoch older than this.
22+
const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: Epoch = Epoch(2);
23+
2124
/// Stake pool as read from Chain.
22-
/// TODO remove this compile directive ↓
23-
#[allow(dead_code)]
25+
#[derive(Debug, PartialEq)]
2426
pub struct StakePool {
2527
/// Pool Id
2628
stake_pool_id: PartyId,
@@ -119,7 +121,7 @@ impl<'client> Provider<'client> for StakePoolProvider<'client> {
119121
let aliases = SourceAlias::new(&[("{:stake_pool:}", "sp")]);
120122
let projection = Self::Entity::get_projection().expand(aliases);
121123

122-
format!("select {projection} from stake_pool as sp where {condition}")
124+
format!("select {projection} from stake_pool as sp where {condition} order by epoch asc, stake desc")
123125
}
124126
}
125127

@@ -186,20 +188,62 @@ impl<'conn> Provider<'conn> for UpdateStakePoolProvider<'conn> {
186188
}
187189
}
188190

191+
/// Provider to remove old data from the stake_pool table
192+
pub struct DeleteStakePoolProvider<'conn> {
193+
connection: &'conn Connection,
194+
}
195+
196+
impl<'conn> Provider<'conn> for DeleteStakePoolProvider<'conn> {
197+
type Entity = StakePool;
198+
199+
fn get_connection(&'conn self) -> &'conn Connection {
200+
self.connection
201+
}
202+
203+
fn get_definition(&self, condition: &str) -> String {
204+
// it is important to alias the fields with the same name as the table
205+
// since the table cannot be aliased in a RETURNING statement in SQLite.
206+
let projection = Self::Entity::get_projection()
207+
.expand(SourceAlias::new(&[("{:stake_pool:}", "stake_pool")]));
208+
209+
format!("delete from stake_pool where {condition} returning {projection}")
210+
}
211+
}
212+
213+
impl<'conn> DeleteStakePoolProvider<'conn> {
214+
/// Create a new instance
215+
pub fn new(connection: &'conn Connection) -> Self {
216+
Self { connection }
217+
}
218+
219+
/// Create the SQL condition to prune data older than the given Epoch.
220+
fn get_prune_condition(&self, epoch_threshold: Epoch) -> WhereCondition {
221+
let epoch_value = Value::Integer(i64::try_from(epoch_threshold.0).unwrap());
222+
223+
WhereCondition::new("epoch < ?*", vec![epoch_value])
224+
}
225+
226+
/// Prune the stake pools data older than the given epoch.
227+
pub fn prune(&self, epoch_threshold: Epoch) -> Result<EntityCursor<StakePool>, StdError> {
228+
let filters = self.get_prune_condition(epoch_threshold);
229+
230+
self.find(filters)
231+
}
232+
}
189233
/// Service to deal with stake pools (read & write).
190-
pub struct StakePoolRepository {
234+
pub struct StakePoolStore {
191235
connection: Arc<Mutex<Connection>>,
192236
}
193237

194-
impl StakePoolRepository {
238+
impl StakePoolStore {
195239
/// Create a new StakePool service
196240
pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
197241
Self { connection }
198242
}
199243
}
200244

201245
#[async_trait]
202-
impl StakeStorer for StakePoolRepository {
246+
impl StakeStorer for StakePoolStore {
203247
async fn save_stakes(
204248
&self,
205249
epoch: Epoch,
@@ -221,6 +265,12 @@ impl StakeStorer for StakePoolRepository {
221265
.map_err(|e| AdapterError::GeneralError(format!("{e}")))?;
222266
new_stakes.insert(pool_id.to_string(), stake_pool.stake);
223267
}
268+
// Clean useless old stake distributions if needed.
269+
if epoch > STAKE_POOL_PRUNE_EPOCH_THRESHOLD {
270+
let _ = DeleteStakePoolProvider::new(connection)
271+
.prune(epoch - STAKE_POOL_PRUNE_EPOCH_THRESHOLD)
272+
.map_err(AdapterError::InitializationError)?;
273+
}
224274
connection
225275
.execute("commit transaction")
226276
.map_err(|e| AdapterError::QueryError(e.into()))?;
@@ -249,6 +299,8 @@ impl StakeStorer for StakePoolRepository {
249299

250300
#[cfg(test)]
251301
mod tests {
302+
use crate::database::migration::get_migrations;
303+
252304
use super::*;
253305

254306
#[test]
@@ -293,4 +345,127 @@ mod tests {
293345
params
294346
);
295347
}
348+
349+
#[test]
350+
fn prune() {
351+
let connection = Connection::open(":memory:").unwrap();
352+
let provider = DeleteStakePoolProvider::new(&connection);
353+
let condition = provider.get_prune_condition(Epoch(5));
354+
let (condition, params) = condition.expand();
355+
356+
assert_eq!("epoch < ?1".to_string(), condition);
357+
assert_eq!(vec![Value::Integer(5)], params);
358+
}
359+
360+
fn setup_db(connection: &Connection) -> Result<(), StdError> {
361+
let migrations = get_migrations();
362+
let migration =
363+
migrations
364+
.iter()
365+
.find(|&m| m.version == 1)
366+
.ok_or_else(|| -> StdError {
367+
"There should be a migration version 1".to_string().into()
368+
})?;
369+
let query = {
370+
// leverage the expanded parameter from this provider which is unit
371+
// tested on its own above.
372+
let update_provider = UpdateStakePoolProvider::new(connection);
373+
let (sql_values, _) = update_provider
374+
.get_update_condition("pool_id", Epoch(1), 1000)
375+
.expand();
376+
377+
connection.execute(&migration.alterations)?;
378+
379+
format!("insert into stake_pool {sql_values}")
380+
};
381+
let stake_distribution: &[(&str, i64, i64); 9] = &[
382+
("pool1", 1, 1000),
383+
("pool2", 1, 1100),
384+
("pool3", 1, 1300),
385+
("pool1", 2, 1230),
386+
("pool2", 2, 1090),
387+
("pool3", 2, 1300),
388+
("pool1", 3, 1250),
389+
("pool2", 3, 1370),
390+
("pool3", 3, 1300),
391+
];
392+
for (pool_id, epoch, stake) in stake_distribution {
393+
let mut statement = connection.prepare(&query)?;
394+
395+
statement.bind(1, *pool_id).unwrap();
396+
statement.bind(2, *epoch).unwrap();
397+
statement.bind(3, *stake).unwrap();
398+
statement.next().unwrap();
399+
}
400+
401+
Ok(())
402+
}
403+
404+
#[test]
405+
fn test_get_stake_pools() {
406+
let connection = Connection::open(":memory:").unwrap();
407+
setup_db(&connection).unwrap();
408+
409+
let provider = StakePoolProvider::new(&connection);
410+
let mut cursor = provider.get_by_epoch(&Epoch(1)).unwrap();
411+
412+
let stake_pool = cursor.next().expect("Should have a stake pool 'pool1'.");
413+
assert_eq!("pool3".to_string(), stake_pool.stake_pool_id);
414+
assert_eq!(Epoch(1), stake_pool.epoch);
415+
assert_eq!(1300, stake_pool.stake);
416+
assert_eq!(2, cursor.count());
417+
418+
let mut cursor = provider.get_by_epoch(&Epoch(3)).unwrap();
419+
420+
let stake_pool = cursor.next().expect("Should have a stake pool 'pool2'.");
421+
assert_eq!("pool2".to_string(), stake_pool.stake_pool_id);
422+
assert_eq!(Epoch(3), stake_pool.epoch);
423+
assert_eq!(1370, stake_pool.stake);
424+
assert_eq!(2, cursor.count());
425+
426+
let cursor = provider.get_by_epoch(&Epoch(5)).unwrap();
427+
assert_eq!(0, cursor.count());
428+
}
429+
430+
#[test]
431+
fn test_update_stakes() {
432+
let connection = Connection::open(":memory:").unwrap();
433+
setup_db(&connection).unwrap();
434+
435+
let provider = UpdateStakePoolProvider::new(&connection);
436+
let stake_pool = provider.persist("pool4", Epoch(3), 9999).unwrap();
437+
438+
assert_eq!("pool4".to_string(), stake_pool.stake_pool_id);
439+
assert_eq!(Epoch(3), stake_pool.epoch);
440+
assert_eq!(9999, stake_pool.stake);
441+
442+
let provider = StakePoolProvider::new(&connection);
443+
let mut cursor = provider.get_by_epoch(&Epoch(3)).unwrap();
444+
let stake_pool = cursor.next().expect("Should have a stake pool 'pool4'.");
445+
446+
assert_eq!("pool4".to_string(), stake_pool.stake_pool_id);
447+
assert_eq!(Epoch(3), stake_pool.epoch);
448+
assert_eq!(9999, stake_pool.stake);
449+
assert_eq!(3, cursor.count());
450+
}
451+
452+
#[test]
453+
fn test_prune() {
454+
let connection = Connection::open(":memory:").unwrap();
455+
setup_db(&connection).unwrap();
456+
457+
let provider = DeleteStakePoolProvider::new(&connection);
458+
let cursor = provider.prune(Epoch(2)).unwrap();
459+
460+
assert_eq!(3, cursor.count());
461+
462+
let provider = StakePoolProvider::new(&connection);
463+
let cursor = provider.get_by_epoch(&Epoch(1)).unwrap();
464+
465+
assert_eq!(0, cursor.count());
466+
467+
let cursor = provider.get_by_epoch(&Epoch(2)).unwrap();
468+
469+
assert_eq!(3, cursor.count());
470+
}
296471
}

mithril-common/src/entities/epoch.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,15 +99,15 @@ impl Sub for Epoch {
9999
type Output = Self;
100100

101101
fn sub(self, rhs: Self) -> Self::Output {
102-
Self(self.0 - rhs.0)
102+
Self(self.0.saturating_sub(rhs.0))
103103
}
104104
}
105105

106106
impl Sub<u64> for Epoch {
107107
type Output = Self;
108108

109109
fn sub(self, rhs: u64) -> Self::Output {
110-
Self(self.0 - rhs)
110+
Self(self.0.saturating_sub(rhs))
111111
}
112112
}
113113

@@ -181,9 +181,16 @@ mod tests {
181181
assert_eq!(Epoch(8), epoch);
182182
}
183183

184+
#[test]
185+
fn saturating_sub() {
186+
assert_eq!(Epoch(0), Epoch(1) - Epoch(5));
187+
assert_eq!(Epoch(0), Epoch(1) - 5_u64);
188+
}
189+
184190
#[test]
185191
fn test_previous() {
186192
assert_eq!(Epoch(2), Epoch(3).previous().unwrap());
193+
assert!(Epoch(0).previous().is_err());
187194
}
188195

189196
#[test]

mithril-common/src/sqlite/source_alias.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,11 @@ mod tests {
3131
#[test]
3232
fn simple_source_alias() {
3333
let source_alias = SourceAlias::new(&[("first", "one"), ("second", "two")]);
34-
let target = source_alias
35-
.get_iterator()
36-
.map(|(name, alias)| format!("{name} => {alias}"))
37-
.collect::<Vec<String>>()
38-
.join(", ");
34+
let mut fields = "first.one, second.two".to_string();
3935

40-
assert_eq!("first => one, second => two".to_string(), target);
36+
for (alias, source) in source_alias.get_iterator() {
37+
fields = fields.replace(alias, source);
38+
}
39+
assert_eq!("one.one, two.two".to_string(), fields);
4140
}
4241
}

0 commit comments

Comments
 (0)