Skip to content

Commit 4cbe62a

Browse files
committed
feat(aggregator): add vacuum tracker to run the vacuum operation at aggregator startup with minimum interval
1 parent 87b20ba commit 4cbe62a

File tree

3 files changed

+252
-5
lines changed

3 files changed

+252
-5
lines changed

mithril-aggregator/src/commands/serve_command.rs

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
1+
use std::{
2+
net::IpAddr,
3+
path::{Path, PathBuf},
4+
time::Duration,
5+
};
6+
17
use anyhow::{anyhow, Context};
8+
use chrono::TimeDelta;
29
use clap::Parser;
310
use config::{builder::DefaultState, ConfigBuilder, Map, Source, Value, ValueKind};
4-
use mithril_common::StdResult;
5-
use mithril_metric::MetricsServer;
611
use slog::{crit, debug, info, warn, Logger};
7-
use std::time::Duration;
8-
use std::{net::IpAddr, path::PathBuf};
912
use tokio::{sync::oneshot, task::JoinSet};
1013

11-
use crate::{dependency_injection::DependenciesBuilder, Configuration};
14+
use mithril_common::StdResult;
15+
use mithril_metric::MetricsServer;
16+
17+
use crate::{dependency_injection::DependenciesBuilder, tools::VacuumTracker, Configuration};
1218

1319
/// Server runtime mode
1420
#[derive(Parser, Debug, Clone)]
@@ -170,6 +176,15 @@ impl ServeCommand {
170176
.with_context(|| "Dependencies Builder can not create event store")?;
171177
let event_store_thread = tokio::spawn(async move { event_store.run().await.unwrap() });
172178

179+
// start the database vacuum operation, if needed
180+
self.perform_database_vacuum_if_needed(
181+
&config.data_stores_directory,
182+
&mut dependencies_builder,
183+
TimeDelta::weeks(1),
184+
&root_logger,
185+
)
186+
.await?;
187+
173188
// start the aggregator runtime
174189
let mut runtime = dependencies_builder
175190
.create_aggregator_runner()
@@ -297,4 +312,51 @@ impl ServeCommand {
297312

298313
Ok(())
299314
}
315+
316+
/// This function checks if a database vacuum is needed and performs it if necessary.
317+
///
318+
/// Errors from [VacuumTracker] operations are logged but not propagated as errors.
319+
async fn perform_database_vacuum_if_needed(
320+
&self,
321+
store_dir: &Path,
322+
dependencies_builder: &mut DependenciesBuilder,
323+
vacuum_min_interval: TimeDelta,
324+
logger: &Logger,
325+
) -> StdResult<()> {
326+
let vacuum_tracker = VacuumTracker::new(store_dir, vacuum_min_interval, logger.clone());
327+
match vacuum_tracker.check_vacuum_needed() {
328+
Ok((true, _)) => {
329+
info!(logger, "Performing vacuum");
330+
331+
let upkeep = dependencies_builder
332+
.get_upkeep_service()
333+
.await
334+
.with_context(|| "Dependencies Builder can not create upkeep")?;
335+
336+
upkeep
337+
.vacuum()
338+
.await
339+
.with_context(|| "Upkeep service failed to vacuum database")?;
340+
341+
match vacuum_tracker.update_last_vacuum_time() {
342+
Ok(last_vacuum) => {
343+
info!(logger, "Vacuum performed"; "last_vacuum" => last_vacuum.to_rfc3339());
344+
}
345+
Err(e) => {
346+
warn!(logger, "Failed to update last vacuum time"; "error" => ?e);
347+
}
348+
}
349+
}
350+
Ok((false, last_vacuum)) => {
351+
let time_display =
352+
last_vacuum.map_or_else(|| "never".to_string(), |time| time.to_rfc3339());
353+
info!(logger, "No vacuum needed"; "last_vacuum" => time_display);
354+
}
355+
Err(e) => {
356+
warn!(logger, "Failed to check if vacuum is needed"; "error" => ?e);
357+
}
358+
}
359+
360+
Ok(())
361+
}
300362
}

mithril-aggregator/src/tools/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub mod mocks;
99
mod signer_importer;
1010
mod single_signature_authenticator;
1111
pub mod url_sanitizer;
12+
mod vacuum_tracker;
1213

1314
pub use certificates_hash_migrator::CertificatesHashMigrator;
1415
pub use digest_helpers::extract_digest_from_path;
@@ -18,6 +19,7 @@ pub use signer_importer::{
1819
CExplorerSignerRetriever, SignersImporter, SignersImporterPersister, SignersImporterRetriever,
1920
};
2021
pub use single_signature_authenticator::*;
22+
pub use vacuum_tracker::VacuumTracker;
2123

2224
/// Downcast the error to the specified error type and check if the error satisfies the condition.
2325
pub(crate) fn downcast_check<E>(
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
use std::{
2+
fs,
3+
path::{Path, PathBuf},
4+
};
5+
6+
use anyhow::Context;
7+
use chrono::{DateTime, TimeDelta, Utc};
8+
use slog::{debug, info, Logger};
9+
10+
use mithril_common::StdResult;
11+
12+
const LAST_VACUUM_FILENAME: &str = "last_vacuum_time";
13+
14+
type LastVacuumTime = DateTime<Utc>;
15+
16+
/// Helper to track when vacuum was last performed
17+
#[derive(Debug, Clone)]
18+
pub struct VacuumTracker {
19+
tracker_file: PathBuf,
20+
min_interval: TimeDelta,
21+
logger: Logger,
22+
}
23+
24+
impl VacuumTracker {
25+
/// Create a new [VacuumTracker] for the given store directory
26+
pub fn new(store_dir: &Path, interval: TimeDelta, logger: Logger) -> Self {
27+
let last_vacuum_file = store_dir.join(LAST_VACUUM_FILENAME);
28+
29+
Self {
30+
tracker_file: last_vacuum_file,
31+
min_interval: interval,
32+
logger,
33+
}
34+
}
35+
36+
/// Check if enough time has passed since last vacuum (returning the last vacuum timestamp)
37+
pub fn check_vacuum_needed(&self) -> StdResult<(bool, Option<LastVacuumTime>)> {
38+
if !self.tracker_file.exists() {
39+
debug!(
40+
self.logger,
41+
"No previous vacuum timestamp found, vacuum can be performed"
42+
);
43+
return Ok((true, None));
44+
}
45+
46+
let last_vacuum = fs::read_to_string(&self.tracker_file).with_context(|| {
47+
format!(
48+
"Failed to read vacuum timestamp file: {:?}",
49+
self.tracker_file
50+
)
51+
})?;
52+
let last_vacuum = DateTime::parse_from_rfc3339(&last_vacuum)?.with_timezone(&Utc);
53+
54+
let duration_since_last = Utc::now() - (last_vacuum);
55+
56+
let should_vacuum = duration_since_last >= self.min_interval;
57+
58+
if should_vacuum {
59+
info!(
60+
self.logger,
61+
"Sufficient time has passed since last vacuum";
62+
"last_vacuum" => last_vacuum.to_string(),
63+
"elapsed_days" => duration_since_last.num_days(),
64+
"min_interval_days" => self.min_interval.num_days()
65+
);
66+
} else {
67+
info!(
68+
self.logger,
69+
"Not enough time elapsed since last vacuum";
70+
"last_vacuum" => last_vacuum.to_string(),
71+
"elapsed_days" => duration_since_last.num_days(),
72+
"min_interval_days" => self.min_interval.num_days()
73+
);
74+
};
75+
76+
Ok((should_vacuum, Some(last_vacuum)))
77+
}
78+
79+
/// Update the last vacuum time to now
80+
pub fn update_last_vacuum_time(&self) -> StdResult<LastVacuumTime> {
81+
let timestamp = Utc::now();
82+
83+
fs::write(&self.tracker_file, timestamp.to_rfc3339()).with_context(|| {
84+
format!(
85+
"Failed to write to last vacuum time file: {:?}",
86+
self.tracker_file
87+
)
88+
})?;
89+
90+
Ok(timestamp)
91+
}
92+
}
93+
94+
#[cfg(test)]
95+
mod tests {
96+
use std::thread::sleep;
97+
98+
use mithril_common::temp_dir_create;
99+
100+
use crate::test_tools::TestLogger;
101+
102+
use super::*;
103+
104+
const DUMMY_INTERVAL: TimeDelta = TimeDelta::milliseconds(99);
105+
106+
#[test]
107+
fn update_last_vacuum_time_creates_file_with_current_timestamp() {
108+
let tracker = VacuumTracker::new(&temp_dir_create!(), DUMMY_INTERVAL, TestLogger::stdout());
109+
110+
assert!(!tracker.tracker_file.exists());
111+
112+
let saved_timestamp = tracker.update_last_vacuum_time().unwrap();
113+
let approximative_expected_saved_timestamp = Utc::now();
114+
115+
let vacuum_file_content = fs::read_to_string(tracker.tracker_file).unwrap();
116+
let timestamp_retrieved = DateTime::parse_from_rfc3339(&vacuum_file_content).unwrap();
117+
let diff = timestamp_retrieved
118+
.signed_duration_since(approximative_expected_saved_timestamp)
119+
.num_milliseconds();
120+
assert!(diff < 1);
121+
assert_eq!(timestamp_retrieved, saved_timestamp);
122+
}
123+
124+
#[test]
125+
fn update_last_vacuum_time_overwrites_previous_timestamp() {
126+
let tracker = VacuumTracker::new(&temp_dir_create!(), DUMMY_INTERVAL, TestLogger::stdout());
127+
128+
let initial_saved_timestamp = tracker.update_last_vacuum_time().unwrap();
129+
let last_saved_timestamp = tracker.update_last_vacuum_time().unwrap();
130+
131+
let vacuum_file_content = fs::read_to_string(tracker.tracker_file).unwrap();
132+
let timestamp_retrieved = DateTime::parse_from_rfc3339(&vacuum_file_content).unwrap();
133+
assert!(last_saved_timestamp > initial_saved_timestamp);
134+
assert_eq!(timestamp_retrieved, last_saved_timestamp);
135+
}
136+
137+
#[test]
138+
fn update_last_vacuum_time_fails_on_write_error() {
139+
let dir_not_exist = Path::new("path-does-not-exist");
140+
let tracker = VacuumTracker::new(dir_not_exist, DUMMY_INTERVAL, TestLogger::stdout());
141+
142+
tracker
143+
.update_last_vacuum_time()
144+
.expect_err("Update last vacuum time should fail when error while writing to file");
145+
}
146+
147+
#[test]
148+
fn check_vacuum_needed_returns_true_when_no_previous_record() {
149+
let tracker = VacuumTracker::new(&temp_dir_create!(), DUMMY_INTERVAL, TestLogger::stdout());
150+
151+
let (is_vacuum_needed, last_timestamp) = tracker.check_vacuum_needed().unwrap();
152+
153+
assert!(is_vacuum_needed);
154+
assert!(last_timestamp.is_none());
155+
}
156+
157+
#[test]
158+
fn check_vacuum_needed_returns_true_after_interval_elapsed() {
159+
let min_interval = TimeDelta::milliseconds(10);
160+
let tracker = VacuumTracker::new(&temp_dir_create!(), min_interval, TestLogger::stdout());
161+
162+
let saved_timestamp = tracker.update_last_vacuum_time().unwrap();
163+
sleep(min_interval.to_std().unwrap());
164+
165+
let (is_vacuum_needed, last_timestamp) = tracker.check_vacuum_needed().unwrap();
166+
167+
assert!(is_vacuum_needed);
168+
assert_eq!(last_timestamp, Some(saved_timestamp));
169+
}
170+
171+
#[test]
172+
fn check_vacuum_needed_returns_false_within_interval() {
173+
let min_interval = TimeDelta::minutes(2);
174+
let tracker = VacuumTracker::new(&temp_dir_create!(), min_interval, TestLogger::stdout());
175+
176+
let saved_timestamp = tracker.update_last_vacuum_time().unwrap();
177+
178+
let (is_vacuum_needed, last_timestamp) = tracker.check_vacuum_needed().unwrap();
179+
180+
assert!(!is_vacuum_needed);
181+
assert_eq!(last_timestamp, Some(saved_timestamp));
182+
}
183+
}

0 commit comments

Comments
 (0)