Skip to content

Commit 6408d5c

Browse files
authored
Merge pull request #2375 from input-output-hk/dlachaume/2364/avoid-database-vacuum-blocking-aggregator-api
Feat: move database vacuum to aggregator startup to avoid API interruptions during epoch transitions
2 parents 53e6d45 + 8730458 commit 6408d5c

File tree

7 files changed

+396
-30
lines changed

7 files changed

+396
-30
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.7.16"
3+
version = "0.7.17"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-aggregator/src/commands/database_command.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,16 @@ impl DatabaseCommand {
2626
pub enum DatabaseSubCommand {
2727
/// Migrate databases located in the given stores directory
2828
Migrate(MigrateCommand),
29+
30+
/// Vacuum the aggregator main database
31+
Vacuum(VacuumCommand),
2932
}
3033

3134
impl DatabaseSubCommand {
3235
pub async fn execute(&self, root_logger: Logger) -> StdResult<()> {
3336
match self {
3437
Self::Migrate(cmd) => cmd.execute(root_logger).await,
38+
Self::Vacuum(cmd) => cmd.execute(root_logger).await,
3539
}
3640
}
3741
}
@@ -79,3 +83,38 @@ impl MigrateCommand {
7983
Ok(())
8084
}
8185
}
86+
87+
#[derive(Parser, Debug, Clone)]
88+
pub struct VacuumCommand {
89+
/// Stores directory
90+
#[clap(long, env = "STORES_DIRECTORY")]
91+
stores_directory: PathBuf,
92+
}
93+
94+
impl VacuumCommand {
95+
pub async fn execute(&self, root_logger: Logger) -> StdResult<()> {
96+
let config = Configuration {
97+
environment: ExecutionEnvironment::Production,
98+
data_stores_directory: self.stores_directory.clone(),
99+
// Temporary solution to avoid the need to provide a full configuration
100+
..Configuration::new_sample(std::env::temp_dir())
101+
};
102+
debug!(root_logger, "DATABASE VACUUM command"; "config" => format!("{config:?}"));
103+
println!(
104+
"Vacuuming database from stores directory: {}",
105+
self.stores_directory.to_string_lossy()
106+
);
107+
let mut dependencies_builder =
108+
DependenciesBuilder::new(root_logger.clone(), config.clone());
109+
110+
dependencies_builder
111+
.get_upkeep_service()
112+
.await
113+
.with_context(|| "Dependencies Builder can not get upkeep service")?
114+
.vacuum()
115+
.await
116+
.with_context(|| "Upkeep service can not vacuum")?;
117+
118+
Ok(())
119+
}
120+
}

mithril-aggregator/src/commands/serve_command.rs

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
1+
use std::{
2+
net::IpAddr,
3+
path::{Path, PathBuf},
4+
time::Duration,
5+
};
6+
17
use anyhow::{anyhow, Context};
8+
use chrono::TimeDelta;
29
use clap::Parser;
310
use config::{builder::DefaultState, ConfigBuilder, Map, Source, Value, ValueKind};
4-
use mithril_common::StdResult;
5-
use mithril_metric::MetricsServer;
611
use slog::{crit, debug, info, warn, Logger};
7-
use std::time::Duration;
8-
use std::{net::IpAddr, path::PathBuf};
912
use tokio::{sync::oneshot, task::JoinSet};
1013

11-
use crate::{dependency_injection::DependenciesBuilder, Configuration};
14+
use mithril_common::StdResult;
15+
use mithril_metric::MetricsServer;
16+
17+
use crate::{dependency_injection::DependenciesBuilder, tools::VacuumTracker, Configuration};
18+
19+
const VACUUM_MINIMUM_INTERVAL: TimeDelta = TimeDelta::weeks(1);
1220

1321
/// Server runtime mode
1422
#[derive(Parser, Debug, Clone)]
@@ -170,6 +178,15 @@ impl ServeCommand {
170178
.with_context(|| "Dependencies Builder can not create event store")?;
171179
let event_store_thread = tokio::spawn(async move { event_store.run().await.unwrap() });
172180

181+
// start the database vacuum operation, if needed
182+
self.perform_database_vacuum_if_needed(
183+
&config.data_stores_directory,
184+
&mut dependencies_builder,
185+
VACUUM_MINIMUM_INTERVAL,
186+
&root_logger,
187+
)
188+
.await?;
189+
173190
// start the aggregator runtime
174191
let mut runtime = dependencies_builder
175192
.create_aggregator_runner()
@@ -297,4 +314,51 @@ impl ServeCommand {
297314

298315
Ok(())
299316
}
317+
318+
/// This function checks if a database vacuum is needed and performs it if necessary.
319+
///
320+
/// Errors from [VacuumTracker] operations are logged but not propagated as errors.
321+
async fn perform_database_vacuum_if_needed(
322+
&self,
323+
store_dir: &Path,
324+
dependencies_builder: &mut DependenciesBuilder,
325+
vacuum_min_interval: TimeDelta,
326+
logger: &Logger,
327+
) -> StdResult<()> {
328+
let vacuum_tracker = VacuumTracker::new(store_dir, vacuum_min_interval, logger.clone());
329+
match vacuum_tracker.check_vacuum_needed() {
330+
Ok((true, _)) => {
331+
info!(logger, "Performing vacuum");
332+
333+
let upkeep = dependencies_builder
334+
.get_upkeep_service()
335+
.await
336+
.with_context(|| "Dependencies Builder can not create upkeep")?;
337+
338+
upkeep
339+
.vacuum()
340+
.await
341+
.with_context(|| "Upkeep service failed to vacuum database")?;
342+
343+
match vacuum_tracker.update_last_vacuum_time() {
344+
Ok(last_vacuum) => {
345+
info!(logger, "Vacuum performed"; "last_vacuum" => last_vacuum.to_rfc3339());
346+
}
347+
Err(e) => {
348+
warn!(logger, "Failed to update last vacuum time"; "error" => ?e);
349+
}
350+
}
351+
}
352+
Ok((false, last_vacuum)) => {
353+
let time_display =
354+
last_vacuum.map_or_else(|| "never".to_string(), |time| time.to_rfc3339());
355+
info!(logger, "No vacuum needed"; "last_vacuum" => time_display);
356+
}
357+
Err(e) => {
358+
warn!(logger, "Failed to check if vacuum is needed"; "error" => ?e);
359+
}
360+
}
361+
362+
Ok(())
363+
}
300364
}

mithril-aggregator/src/services/upkeep.rs

Lines changed: 108 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ use slog::{info, Logger};
2424
pub trait UpkeepService: Send + Sync {
2525
/// Run the upkeep service.
2626
async fn run(&self, epoch: Epoch) -> StdResult<()>;
27+
28+
/// Vacuum database.
29+
async fn vacuum(&self) -> StdResult<()>;
2730
}
2831

2932
/// Define the task responsible for pruning a datasource below a certain epoch threshold.
@@ -101,10 +104,7 @@ impl AggregatorUpkeepService {
101104
info!(db_upkeep_logger, "Cleaning main database");
102105
SqliteCleaner::new(&main_db_connection)
103106
.with_logger(db_upkeep_logger.clone())
104-
.with_tasks(&[
105-
SqliteCleaningTask::Vacuum,
106-
SqliteCleaningTask::WalCheckpointTruncate,
107-
])
107+
.with_tasks(&[SqliteCleaningTask::WalCheckpointTruncate])
108108
.run()?;
109109

110110
info!(db_upkeep_logger, "Cleaning cardano transactions database");
@@ -127,6 +127,34 @@ impl AggregatorUpkeepService {
127127
.await
128128
.with_context(|| "Database Upkeep thread crashed")?
129129
}
130+
131+
async fn vacuum_main_database(&self) -> StdResult<()> {
132+
if self.signed_entity_type_lock.has_locked_entities().await {
133+
info!(
134+
self.logger,
135+
"Some entities are locked - Skipping main database vacuum"
136+
);
137+
return Ok(());
138+
}
139+
140+
let main_db_connection = self.main_db_connection.clone();
141+
let db_upkeep_logger = self.logger.clone();
142+
143+
// Run the database upkeep tasks in another thread to avoid blocking the tokio runtime
144+
let db_upkeep_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
145+
info!(db_upkeep_logger, "Vacuum main database");
146+
SqliteCleaner::new(&main_db_connection)
147+
.with_logger(db_upkeep_logger.clone())
148+
.with_tasks(&[SqliteCleaningTask::Vacuum])
149+
.run()?;
150+
151+
Ok(())
152+
});
153+
154+
db_upkeep_thread
155+
.await
156+
.with_context(|| "Database Upkeep thread crashed")?
157+
}
130158
}
131159

132160
#[async_trait]
@@ -145,12 +173,23 @@ impl UpkeepService for AggregatorUpkeepService {
145173
info!(self.logger, "Upkeep finished");
146174
Ok(())
147175
}
176+
177+
async fn vacuum(&self) -> StdResult<()> {
178+
info!(self.logger, "Start database vacuum");
179+
180+
self.vacuum_main_database()
181+
.await
182+
.with_context(|| "Vacuuming main database failed")?;
183+
184+
info!(self.logger, "Vacuum finished");
185+
186+
Ok(())
187+
}
148188
}
149189

150190
#[cfg(test)]
151191
mod tests {
152-
use mithril_common::entities::SignedEntityTypeDiscriminants;
153-
use mithril_common::test_utils::TempDir;
192+
use mithril_common::{entities::SignedEntityTypeDiscriminants, temp_dir_create};
154193
use mockall::predicate::eq;
155194

156195
use crate::database::test_helper::{
@@ -187,7 +226,7 @@ mod tests {
187226
#[tokio::test]
188227
async fn test_cleanup_database() {
189228
let (main_db_path, ctx_db_path, event_store_db_path, log_path) = {
190-
let db_dir = TempDir::create("aggregator_upkeep", "test_cleanup_database");
229+
let db_dir = temp_dir_create!();
191230
(
192231
db_dir.join("main.db"),
193232
db_dir.join("cardano_tx.db"),
@@ -218,27 +257,23 @@ mod tests {
218257

219258
let logs = std::fs::read_to_string(&log_path).unwrap();
220259

221-
assert_eq!(
222-
logs.matches(SqliteCleaningTask::Vacuum.log_message())
223-
.count(),
224-
1,
225-
"Should have run only once since only the main database has a `Vacuum` cleanup"
226-
);
227260
assert_eq!(
228261
logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
229262
.count(),
230263
3,
231264
"Should have run three times since the three databases have a `WalCheckpointTruncate` cleanup"
232265
);
266+
assert_eq!(
267+
logs.matches(SqliteCleaningTask::Vacuum.log_message())
268+
.count(),
269+
0,
270+
"Upkeep operation should not include Vacuum tasks"
271+
);
233272
}
234273

235274
#[tokio::test]
236275
async fn test_doesnt_cleanup_db_if_any_entity_is_locked() {
237-
let log_path = TempDir::create(
238-
"aggregator_upkeep",
239-
"test_doesnt_cleanup_db_if_any_entity_is_locked",
240-
)
241-
.join("upkeep.log");
276+
let log_path = temp_dir_create!().join("upkeep.log");
242277

243278
let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
244279
signed_entity_type_lock
@@ -257,11 +292,6 @@ mod tests {
257292

258293
let logs = std::fs::read_to_string(&log_path).unwrap();
259294

260-
assert_eq!(
261-
logs.matches(SqliteCleaningTask::Vacuum.log_message())
262-
.count(),
263-
0,
264-
);
265295
assert_eq!(
266296
logs.matches(SqliteCleaningTask::WalCheckpointTruncate.log_message())
267297
.count(),
@@ -290,4 +320,59 @@ mod tests {
290320

291321
service.run(Epoch(14)).await.expect("Upkeep service failed");
292322
}
323+
324+
#[tokio::test]
325+
async fn test_doesnt_vacuum_db_if_any_entity_is_locked() {
326+
let log_path = temp_dir_create!().join("vacuum.log");
327+
328+
let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
329+
signed_entity_type_lock
330+
.lock(SignedEntityTypeDiscriminants::CardanoTransactions)
331+
.await;
332+
333+
// Separate block to force log flushing by dropping the service that owns the logger
334+
{
335+
let service = AggregatorUpkeepService {
336+
signed_entity_type_lock: signed_entity_type_lock.clone(),
337+
logger: TestLogger::file(&log_path),
338+
..default_upkeep_service()
339+
};
340+
service.vacuum().await.expect("Vacuum failed");
341+
}
342+
343+
let logs = std::fs::read_to_string(&log_path).unwrap();
344+
345+
assert_eq!(
346+
logs.matches(SqliteCleaningTask::Vacuum.log_message())
347+
.count(),
348+
0,
349+
);
350+
}
351+
352+
#[tokio::test]
353+
async fn test_vacuum_database() {
354+
let log_path = temp_dir_create!().join("vacuum.log");
355+
356+
let signed_entity_type_lock = Arc::new(SignedEntityTypeLock::default());
357+
signed_entity_type_lock
358+
.lock(SignedEntityTypeDiscriminants::CardanoTransactions)
359+
.await;
360+
361+
// Separate block to force log flushing by dropping the service that owns the logger
362+
{
363+
let service = AggregatorUpkeepService {
364+
logger: TestLogger::file(&log_path),
365+
..default_upkeep_service()
366+
};
367+
service.vacuum().await.expect("Vacuum failed");
368+
}
369+
370+
let logs = std::fs::read_to_string(&log_path).unwrap();
371+
372+
assert_eq!(
373+
logs.matches(SqliteCleaningTask::Vacuum.log_message())
374+
.count(),
375+
1,
376+
);
377+
}
293378
}

mithril-aggregator/src/tools/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub mod mocks;
99
mod signer_importer;
1010
mod single_signature_authenticator;
1111
pub mod url_sanitizer;
12+
mod vacuum_tracker;
1213

1314
pub use certificates_hash_migrator::CertificatesHashMigrator;
1415
pub use digest_helpers::extract_digest_from_path;
@@ -18,6 +19,7 @@ pub use signer_importer::{
1819
CExplorerSignerRetriever, SignersImporter, SignersImporterPersister, SignersImporterRetriever,
1920
};
2021
pub use single_signature_authenticator::*;
22+
pub use vacuum_tracker::VacuumTracker;
2123

2224
/// Downcast the error to the specified error type and check if the error satisfies the condition.
2325
pub(crate) fn downcast_check<E>(

0 commit comments

Comments
 (0)