Skip to content

Commit 5741602

Browse files
committed
add stake pool autoprune
1 parent f8dbd4e commit 5741602

File tree

5 files changed

+79
-18
lines changed

5 files changed

+79
-18
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/src/command_args.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use mithril_common::{
3434
};
3535

3636
use crate::{
37-
database::provider::StakePoolRepository,
37+
database::provider::StakePoolStore,
3838
event_store::{self, TransmitterService},
3939
http_server::routes::router,
4040
tools::{EraTools, GenesisTools, GenesisToolsDependency},
@@ -90,9 +90,9 @@ fn setup_genesis_dependencies(
9090
)?),
9191
config.store_retention_limit,
9292
));
93-
let stake_store = Arc::new(StakePoolRepository::new(Arc::new(Mutex::new(
94-
Connection::open(sqlite_db_path.clone().unwrap())?,
95-
))));
93+
let stake_store = Arc::new(StakePoolStore::new(Arc::new(Mutex::new(Connection::open(
94+
sqlite_db_path.clone().unwrap(),
95+
)?))));
9696
let single_signature_store = Arc::new(SingleSignatureStore::new(
9797
Box::new(SQLiteAdapter::new("single_signature", sqlite_db_path)?),
9898
config.store_retention_limit,
@@ -359,9 +359,9 @@ impl ServeCommand {
359359
)?),
360360
config.store_retention_limit,
361361
));
362-
let stake_store = Arc::new(StakePoolRepository::new(Arc::new(Mutex::new(
363-
Connection::open(sqlite_db_path.clone().unwrap())?,
364-
))));
362+
let stake_store = Arc::new(StakePoolStore::new(Arc::new(Mutex::new(Connection::open(
363+
sqlite_db_path.clone().unwrap(),
364+
)?))));
365365
let single_signature_store = Arc::new(SingleSignatureStore::new(
366366
Box::new(SQLiteAdapter::new(
367367
"single_signature",

mithril-aggregator/src/database/provider/stake_pool.rs

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ use mithril_common::{
1818

1919
use mithril_common::StdError;
2020

21+
/// Delete stake pools for Epoch older than this.
22+
const STAKE_POOL_PRUNE_EPOCH_THRESHOLD: u64 = 2;
23+
2124
/// Stake pool as read from Chain.
2225
/// TODO remove this compile directive ↓
2326
#[allow(dead_code)]
@@ -186,20 +189,62 @@ impl<'conn> Provider<'conn> for UpdateStakePoolProvider<'conn> {
186189
}
187190
}
188191

192+
/// Provider to remove old data from the stake_pool table
193+
pub struct DeleteStakePoolProvider<'conn> {
194+
connection: &'conn Connection,
195+
}
196+
197+
impl<'conn> Provider<'conn> for DeleteStakePoolProvider<'conn> {
198+
type Entity = StakePool;
199+
200+
fn get_connection(&'conn self) -> &'conn Connection {
201+
self.connection
202+
}
203+
204+
fn get_definition(&self, condition: &str) -> String {
205+
// it is important to alias the fields with the same name as the table
206+
// since the table cannot be aliased in a RETURNING statement in SQLite.
207+
let projection = Self::Entity::get_projection()
208+
.expand(SourceAlias::new(&[("{:stake_pool:}", "stake_pool")]));
209+
210+
format!("delete from stake_pool where {condition} returning {projection}")
211+
}
212+
}
213+
214+
impl<'conn> DeleteStakePoolProvider<'conn> {
215+
/// Create a new instance
216+
pub fn new(connection: &'conn Connection) -> Self {
217+
Self { connection }
218+
}
219+
220+
/// Create the SQL condition to prune data older than the given Epoch.
221+
fn get_prune_condition(&self, epoch_threshold: Epoch) -> WhereCondition {
222+
let epoch_value = Value::Integer(i64::try_from(epoch_threshold.0).unwrap());
223+
224+
WhereCondition::new("epoch < ?*", vec![epoch_value])
225+
}
226+
227+
/// Prune the stake pools data older than the given epoch.
228+
pub fn prune(&self, epoch_threshold: Epoch) -> Result<EntityCursor<StakePool>, StdError> {
229+
let filters = self.get_prune_condition(epoch_threshold);
230+
231+
self.find(filters)
232+
}
233+
}
189234
/// Service to deal with stake pools (read & write).
190-
pub struct StakePoolRepository {
235+
pub struct StakePoolStore {
191236
connection: Arc<Mutex<Connection>>,
192237
}
193238

194-
impl StakePoolRepository {
239+
impl StakePoolStore {
195240
/// Create a new StakePool service
196241
pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
197242
Self { connection }
198243
}
199244
}
200245

201246
#[async_trait]
202-
impl StakeStorer for StakePoolRepository {
247+
impl StakeStorer for StakePoolStore {
203248
async fn save_stakes(
204249
&self,
205250
epoch: Epoch,
@@ -221,6 +266,12 @@ impl StakeStorer for StakePoolRepository {
221266
.map_err(|e| AdapterError::GeneralError(format!("{e}")))?;
222267
new_stakes.insert(pool_id.to_string(), stake_pool.stake);
223268
}
269+
// Clean useless old stake distributions if needed.
270+
if epoch.0 > STAKE_POOL_PRUNE_EPOCH_THRESHOLD {
271+
let _ = DeleteStakePoolProvider::new(connection)
272+
.prune(Epoch(epoch.0 - STAKE_POOL_PRUNE_EPOCH_THRESHOLD))
273+
.map_err(AdapterError::InitializationError)?;
274+
}
224275
connection
225276
.execute("commit transaction")
226277
.map_err(|e| AdapterError::QueryError(e.into()))?;
@@ -293,4 +344,15 @@ mod tests {
293344
params
294345
);
295346
}
347+
348+
#[test]
349+
fn prune() {
350+
let connection = Connection::open(":memory:").unwrap();
351+
let provider = DeleteStakePoolProvider::new(&connection);
352+
let condition = provider.get_prune_condition(Epoch(5));
353+
let (condition, params) = condition.expand();
354+
355+
assert_eq!("epoch < ?1".to_string(), condition);
356+
assert_eq!(vec![Value::Integer(5)], params);
357+
}
296358
}

mithril-common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-common"
3-
version = "0.2.25"
3+
version = "0.2.26"
44
authors = { workspace = true }
55
edition = { workspace = true }
66
documentation = { workspace = true }

mithril-common/src/sqlite/source_alias.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,11 @@ mod tests {
3131
#[test]
3232
fn simple_source_alias() {
3333
let source_alias = SourceAlias::new(&[("first", "one"), ("second", "two")]);
34-
let target = source_alias
35-
.get_iterator()
36-
.map(|(name, alias)| format!("{name} => {alias}"))
37-
.collect::<Vec<String>>()
38-
.join(", ");
34+
let mut fields = "first.one, second.two".to_string();
3935

40-
assert_eq!("first => one, second => two".to_string(), target);
36+
for (alias, source) in source_alias.get_iterator() {
37+
fields = fields.replace(alias, source);
38+
}
39+
assert_eq!("one.one, two.two".to_string(), fields);
4140
}
4241
}

0 commit comments

Comments
 (0)