Skip to content

Commit 9f11b37

Browse files
committed
feat: purge ephemeral storage
1 parent 4d605ac commit 9f11b37

File tree

2 files changed

+240
-3
lines changed

2 files changed

+240
-3
lines changed

crates/storage/src/versioned/ephemeral_v1/sql.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,11 @@ pub fn entry_count_and_size(content_type: &ContentType) -> String {
7676
table_name(content_type)
7777
)
7878
}
79+
80+
pub fn purge_by_slot(content_type: &ContentType) -> String {
81+
format!(
82+
"DELETE FROM {}
83+
WHERE slot < :slot",
84+
table_name(content_type)
85+
)
86+
}

crates/storage/src/versioned/ephemeral_v1/store.rs

Lines changed: 232 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
1-
use std::marker::PhantomData;
1+
use std::{
2+
marker::PhantomData,
3+
sync::Arc,
4+
time::{Duration, SystemTime, UNIX_EPOCH},
5+
};
26

3-
use ethportal_api::{OverlayContentKey, RawContentValue};
7+
use alloy::eips::merge::EPOCH_SLOTS;
8+
use ethportal_api::{jsonrpsee::tokio, OverlayContentKey, RawContentValue};
49
use r2d2::Pool;
510
use r2d2_sqlite::SqliteConnectionManager;
611
use rusqlite::{named_params, types::Type, OptionalExtension};
12+
use tokio::task::JoinHandle;
713
use tracing::{debug, warn};
814
use trin_metrics::storage::StorageMetricsReporter;
915

@@ -15,6 +21,9 @@ use crate::{
1521
ContentId,
1622
};
1723

24+
/// Beacon chain genesis time as a Unix timestamp in seconds; `expected_current_epoch` subtracts
/// it from the wall clock. NOTE(review): value looks like Ethereum mainnet genesis — confirm.
pub const BEACON_GENESIS_TIME: u64 = 1606824023;
25+
/// Number of slots between historical summaries updates; divided by `EPOCH_SLOTS` to obtain the
/// purge period in epochs.
pub const SLOTS_PER_HISTORICAL_ROOT: u64 = 8192;
26+
1827
/// The store for storing ephemeral headers, bodies, and receipts.
1928
#[allow(unused)]
2029
#[derive(Debug)]
@@ -27,6 +36,8 @@ pub struct EphemeralV1Store<TContentKey: OverlayContentKey> {
2736
metrics: StorageMetricsReporter,
2837
/// Phantom Content Key
2938
_phantom_content_key: PhantomData<TContentKey>,
39+
/// Background task handle for periodic purging
40+
background_purge_task: Option<JoinHandle<()>>,
3041
}
3142

3243
impl<TContentKey: OverlayContentKey> VersionedContentStore for EphemeralV1Store<TContentKey> {
@@ -59,6 +70,7 @@ impl<TContentKey: OverlayContentKey> VersionedContentStore for EphemeralV1Store<
5970
metrics: StorageMetricsReporter::new(subnetwork),
6071
_phantom_content_key: PhantomData,
6172
config,
73+
background_purge_task: None,
6274
};
6375
store.init()?;
6476
Ok(store)
@@ -71,13 +83,64 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
7183
fn init(&mut self) -> Result<(), ContentStoreError> {
7284
self.init_usage_stats()?;
7385

74-
// TODO: Prune if necessary.
86+
// Purge content based on the last historical summaries update slot
87+
let current_epoch = Self::expected_current_epoch();
88+
let cutoff_slot = Self::last_summaries_slot(current_epoch);
89+
90+
let conn = self.config.sql_connection_pool.get()?;
91+
let query = sql::purge_by_slot(&self.config.content_type);
92+
let rows_deleted = conn.execute(&query, named_params! { ":slot": cutoff_slot})?;
93+
if rows_deleted > 0 {
94+
debug!(
95+
"Purged {} content with slot < {} during initialization",
96+
rows_deleted, cutoff_slot
97+
);
98+
}
7599

76100
Ok(())
77101
}
78102

79103
// PUBLIC FUNCTIONS
80104

105+
/// Starts the background task for periodic purging.
106+
/// This can be called explicitly after initialization if needed.
107+
pub fn start_background_purge_task(&mut self) -> Result<(), ContentStoreError> {
108+
let config = Arc::new(self.config.clone());
109+
110+
let handle = tokio::spawn(async move {
111+
// Run purge immediately when task starts
112+
if let Err(e) = Self::purge_content_before_last_summary_internal(&config) {
113+
warn!("Error purging content in background task: {}", e);
114+
}
115+
116+
let mut interval = tokio::time::interval(Duration::from_secs(12 * EPOCH_SLOTS)); // One epoch duration
117+
loop {
118+
interval.tick().await;
119+
120+
// Check if we're at a historical summaries update boundary
121+
let current_epoch = Self::expected_current_epoch();
122+
let next_epoch = current_epoch + 1;
123+
let period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS;
124+
125+
if next_epoch % period == 0 {
126+
if let Err(e) = Self::purge_content_before_last_summary_internal(&config) {
127+
warn!("Error purging content in background task: {}", e);
128+
}
129+
}
130+
}
131+
});
132+
133+
self.background_purge_task = Some(handle);
134+
Ok(())
135+
}
136+
137+
/// Stops the background purge task if it's running.
138+
pub fn stop_background_purge_task(&mut self) {
139+
if let Some(handle) = self.background_purge_task.take() {
140+
handle.abort();
141+
}
142+
}
143+
81144
/// Returns whether data associated with the content id is already stored.
82145
pub fn has_content(&self, content_id: &ContentId) -> Result<bool, ContentStoreError> {
83146
let timer = self.metrics.start_process_timer("has_content");
@@ -225,6 +288,15 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
225288
self.metrics.get_summary()
226289
}
227290

291+
/// Manually triggers a purge of content before the last historical summary.
292+
/// This can be used to manually control when content is purged, independent of the background
293+
/// task.
294+
///
295+
/// Returns the number of rows deleted.
296+
pub fn trigger_content_purge(&self) -> Result<usize, ContentStoreError> {
297+
Self::purge_content_before_last_summary_internal(&Arc::new(self.config.clone()))
298+
}
299+
228300
// INTERNAL FUNCTIONS
229301

230302
/// Lookup and set `usage_stats`.
@@ -263,6 +335,61 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
263335
) -> u64 {
264336
(raw_content_id.len() + raw_content_key.len() + raw_content_value.len()) as u64
265337
}
338+
339+
fn expected_current_epoch() -> u64 {
340+
let now = SystemTime::now();
341+
let now = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
342+
let since_genesis = now - Duration::from_secs(BEACON_GENESIS_TIME);
343+
344+
since_genesis.as_secs() / 12 / EPOCH_SLOTS
345+
}
346+
347+
/// Internal method to purge content, used by both the main thread and background task
348+
fn purge_content_before_last_summary_internal(
349+
config: &Arc<EphemeralV1StoreConfig>,
350+
) -> Result<usize, ContentStoreError> {
351+
let current_epoch = Self::expected_current_epoch();
352+
let cutoff_slot = Self::last_summaries_slot(current_epoch);
353+
354+
let conn = config.sql_connection_pool.get()?;
355+
let query = sql::purge_by_slot(&config.content_type);
356+
357+
let rows_deleted = conn.execute(&query, named_params! { ":slot": cutoff_slot })?;
358+
Ok(rows_deleted)
359+
}
360+
361+
/// Computes the slot at which the last historical summary event occurred.
362+
/// Historical summary events are appended when the next epoch is a multiple
363+
/// of `period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS`.
364+
///
365+
/// If the current_epoch is less than the first event boundary (and assuming a genesis event
366+
/// at epoch 0), then this function returns 0.
367+
fn last_summaries_slot(current_epoch: u64) -> u64 {
368+
// Calculate the period (in epochs) at which events are appended.
369+
let period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS;
370+
// Compute candidate event epoch:
371+
// This candidate is based on (current_epoch + 1) because events are appended
372+
// when transitioning to the next epoch.
373+
let candidate = ((current_epoch + 1) / period) * period;
374+
// If candidate is greater than current_epoch, then that event is in the future,
375+
// so the last event occurred one period earlier.
376+
let last_summaries_epoch = if candidate > current_epoch {
377+
candidate.saturating_sub(period)
378+
} else {
379+
candidate
380+
};
381+
382+
last_summaries_epoch * EPOCH_SLOTS
383+
}
384+
}
385+
386+
impl<TContentKey: OverlayContentKey> Drop for EphemeralV1Store<TContentKey> {
    /// Aborts the background purge task, if one is stored, when the store is dropped.
    fn drop(&mut self) {
        if let Some(task) = self.background_purge_task.as_ref() {
            task.abort();
        }
        // Release the handle of the aborted task.
        self.background_purge_task = None;
    }
}
267394

268395
/// Creates table and indexes if they don't already exist.
@@ -280,6 +407,7 @@ mod tests {
280407
use anyhow::Result;
281408
use ethportal_api::{types::network::Subnetwork, IdentityContentKey};
282409
use tempfile::TempDir;
410+
use tokio::time::{sleep, Duration};
283411

284412
use super::*;
285413
use crate::{test_utils::generate_random_bytes, utils::setup_sql};
@@ -451,4 +579,105 @@ mod tests {
451579

452580
Ok(())
453581
}
582+
583+
#[tokio::test]
584+
async fn test_background_purge_task() -> Result<()> {
585+
let temp_dir = TempDir::new()?;
586+
let config = create_config(&temp_dir);
587+
588+
// Create store without starting background task
589+
let mut store = EphemeralV1Store::<IdentityContentKey>::create(
590+
ContentType::HistoryEphemeral,
591+
config.clone(),
592+
)?;
593+
594+
// Verify background task is not running initially
595+
assert!(store.background_purge_task.is_none());
596+
597+
// Insert test data with slots before and after the cutoff
598+
let current_epoch = EphemeralV1Store::<IdentityContentKey>::expected_current_epoch();
599+
let cutoff_slot =
600+
EphemeralV1Store::<IdentityContentKey>::last_summaries_slot(current_epoch);
601+
602+
let (key1, value1) = generate_key_value();
603+
let (key2, value2) = generate_key_value();
604+
let (key3, value3) = generate_key_value();
605+
606+
// Insert data with slots before cutoff
607+
store.insert(&key1, value1, 0, cutoff_slot.saturating_sub(100))?;
608+
store.insert(&key2, value2, 0, cutoff_slot.saturating_sub(50))?;
609+
610+
// Insert data with slot after cutoff
611+
store.insert(&key3, value3, 0, cutoff_slot + 100)?;
612+
613+
// Verify data is present before starting background task
614+
assert!(store.has_content(&ContentId::from(key1.content_id()))?);
615+
assert!(store.has_content(&ContentId::from(key2.content_id()))?);
616+
assert!(store.has_content(&ContentId::from(key3.content_id()))?);
617+
618+
// Start the background task
619+
store.start_background_purge_task()?;
620+
// Wait for the background task to run and purge data
621+
sleep(Duration::from_secs(1)).await;
622+
assert!(store.background_purge_task.is_some());
623+
624+
// Verify that content before cutoff was purged
625+
assert!(
626+
!store.has_content(&ContentId::from(key1.content_id()))?,
627+
"key1 should be purged"
628+
);
629+
assert!(
630+
!store.has_content(&ContentId::from(key2.content_id()))?,
631+
"key2 should be purged"
632+
);
633+
assert!(
634+
store.has_content(&ContentId::from(key3.content_id()))?,
635+
"key3 should not be purged"
636+
);
637+
638+
// Stop the background task
639+
store.stop_background_purge_task();
640+
assert!(store.background_purge_task.is_none());
641+
642+
Ok(())
643+
}
644+
645+
/// Creating a second store over the same database should purge, during its initialization, the
/// entries older than the cutoff slot and keep the newer one.
#[test]
fn test_purge_content_during_init() -> Result<()> {
    type Store = EphemeralV1Store<IdentityContentKey>;

    let temp_dir = TempDir::new()?;
    let config = create_config(&temp_dir);

    // First store instance: used only to populate the database.
    let mut store = Store::create(ContentType::HistoryEphemeral, config.clone())?;

    // Everything below this slot is expected to be purged.
    let cutoff_slot = Store::last_summaries_slot(Store::expected_current_epoch());

    let (stale_key_a, stale_value_a) = generate_key_value();
    let (stale_key_b, stale_value_b) = generate_key_value();
    let (fresh_key, fresh_value) = generate_key_value();

    // Two entries before the cutoff, one after it.
    store.insert(
        &stale_key_a,
        stale_value_a,
        0,
        cutoff_slot.saturating_sub(100),
    )?;
    store.insert(
        &stale_key_b,
        stale_value_b,
        0,
        cutoff_slot.saturating_sub(50),
    )?;
    store.insert(&fresh_key, fresh_value, 0, cutoff_slot + 100)?;

    // Second store instance: its initialization runs the purge.
    let new_store = Store::create(ContentType::HistoryEphemeral, config)?;

    // Only the entries before the cutoff are gone.
    for key in [&stale_key_a, &stale_key_b] {
        assert!(!new_store.has_content(&ContentId::from(key.content_id()))?);
    }
    assert!(new_store.has_content(&ContentId::from(fresh_key.content_id()))?);

    Ok(())
}
454683
}

0 commit comments

Comments
 (0)