Skip to content

Commit 87b20ba

Browse files
committed
feat(aggregator): add vacuum dedicated method to UpkeepService
1 parent f03de22 commit 87b20ba

File tree

1 file changed

+101
-8
lines changed

1 file changed

+101
-8
lines changed

mithril-aggregator/src/services/upkeep.rs

Lines changed: 101 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ use slog::{info, Logger};
2424
pub trait UpkeepService: Send + Sync {
2525
/// Run the upkeep service.
2626
async fn run(&self, epoch: Epoch) -> StdResult<()>;
27+
28+
/// Vacuum database.
29+
async fn vacuum(&self) -> StdResult<()>;
2730
}
2831

2932
/// Define the task responsible for pruning a datasource below a certain epoch threshold.
@@ -124,6 +127,34 @@ impl AggregatorUpkeepService {
124127
.await
125128
.with_context(|| "Database Upkeep thread crashed")?
126129
}
130+
131+
async fn vacuum_main_database(&self) -> StdResult<()> {
132+
if self.signed_entity_type_lock.has_locked_entities().await {
133+
info!(
134+
self.logger,
135+
"Some entities are locked - Skipping main database vacuum"
136+
);
137+
return Ok(());
138+
}
139+
140+
let main_db_connection = self.main_db_connection.clone();
141+
let db_upkeep_logger = self.logger.clone();
142+
143+
// Run the database upkeep tasks in another thread to avoid blocking the tokio runtime
144+
let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
145+
info!(db_upkeep_logger, "Vacuum main database");
146+
SqliteCleaner::new(&main_db_connection)
147+
.with_logger(db_upkeep_logger.clone())
148+
.with_tasks(&[SqliteCleaningTask::Vacuum])
149+
.run()?;
150+
151+
Ok(())
152+
});
153+
154+
db_upkeep_thread
155+
.await
156+
.with_context(|| "Database Upkeep thread crashed")?
157+
}
127158
}
128159

129160
#[async_trait]
@@ -142,12 +173,23 @@ impl UpkeepService for AggregatorUpkeepService {
142173
info!(self.logger, "Upkeep finished");
143174
Ok(())
144175
}
176+
177+
async fn vacuum(&self) -> StdResult<()> {
178+
info!(self.logger, "Start database vacuum");
179+
180+
self.vacuum_main_database()
181+
.await
182+
.with_context(|| "Vacuuming main database failed")?;
183+
184+
info!(self.logger, "Vacuum finished");
185+
186+
Ok(())
187+
}
145188
}
146189

147190
#[cfg(test)]
148191
mod tests {
149-
use mithril_common::entities::SignedEntityTypeDiscriminants;
150-
use mithril_common::test_utils::TempDir;
192+
use mithril_common::{entities::SignedEntityTypeDiscriminants, temp_dir_create};
151193
use mockall::predicate::eq;
152194

153195
use crate::database::test_helper::{
@@ -184,7 +226,7 @@ mod tests {
184226
#[tokio::test]
185227
async fn test_cleanup_database() {
186228
let (main_db_path, ctx_db_path, event_store_db_path, log_path) = {
187-
let db_dir = TempDir::create("aggregator_upkeep", "test_cleanup_database");
229+
let db_dir = temp_dir_create!();
188230
(
189231
db_dir.join("main.db"),
190232
db_dir.join("cardano_tx.db"),
@@ -231,11 +273,7 @@ mod tests {
231273

232274
#[tokio::test]
233275
async fn test_doesnt_cleanup_db_if_any_entity_is_locked() {
234-
let log_path = TempDir::create(
235-
"aggregator_upkeep",
236-
"test_doesnt_cleanup_db_if_any_entity_is_locked",
237-
)
238-
.join("upkeep.log");
276+
let log_path = temp_dir_create!().join("upkeep.log");
239277

240278
let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
241279
signed_entity_type_lock
@@ -282,4 +320,59 @@ mod tests {
282320

283321
service.run(Epoch(14)).await.expect("Upkeep service failed");
284322
}
323+
324+
#[tokio::test]
325+
async fn test_doesnt_vacuum_db_if_any_entity_is_locked() {
326+
let log_path = temp_dir_create!().join("vacuum.log");
327+
328+
let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
329+
signed_entity_type_lock
330+
.lock(SignedEntityTypeDiscriminants::CardanoTransactions)
331+
.await;
332+
333+
// Separate block to force log flushing by dropping the service that owns the logger
334+
{
335+
let service = AggregatorUpkeepService {
336+
signed_entity_type_lock: signed_entity_type_lock.clone(),
337+
logger: TestLogger::file(&log_path),
338+
..default_upkeep_service()
339+
};
340+
service.vacuum().await.expect("Vacuum failed");
341+
}
342+
343+
let logs = std::fs::read_to_string(&log_path).unwrap();
344+
345+
assert_eq!(
346+
logs.matches(SqliteCleaningTask::Vacuum.log_message())
347+
.count(),
348+
0,
349+
);
350+
}
351+
352+
#[tokio::test]
353+
async fn test_vacuum_database() {
354+
let log_path = temp_dir_create!().join("vacuum.log");
355+
356+
let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
357+
signed_entity_type_lock
358+
.lock(SignedEntityTypeDiscriminants::CardanoTransactions)
359+
.await;
360+
361+
// Separate block to force log flushing by dropping the service that owns the logger
362+
{
363+
let service = AggregatorUpkeepService {
364+
logger: TestLogger::file(&log_path),
365+
..default_upkeep_service()
366+
};
367+
service.vacuum().await.expect("Vacuum failed");
368+
}
369+
370+
let logs = std::fs::read_to_string(&log_path).unwrap();
371+
372+
assert_eq!(
373+
logs.matches(SqliteCleaningTask::Vacuum.log_message())
374+
.count(),
375+
1,
376+
);
377+
}
285378
}

0 commit comments

Comments
 (0)