From 88845e8940214545a8d7acf76864d2a23d54ae81 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9COgnyan?=
Date: Sun, 23 Mar 2025 15:14:20 +0200
Subject: [PATCH] feat: purge ephemeral storage

---
Review amendments (the store.rs post-image hash on its `index` line predates them):
  * init(): purge before `init_usage_stats()` so the stats describe the purged table,
    and fix the "with during" typo in the log message.
  * start_background_purge_task(): abort an already-running task instead of leaking it.
  * Generic arguments that were stripped in transit are restored.

 .../storage/src/versioned/ephemeral_v1/sql.rs |   8 +
 .../src/versioned/ephemeral_v1/store.rs       | 234 +++++++++++++++++-
 2 files changed, 238 insertions(+), 4 deletions(-)

diff --git a/crates/storage/src/versioned/ephemeral_v1/sql.rs b/crates/storage/src/versioned/ephemeral_v1/sql.rs
index 9eb30486a..bf9af34da 100644
--- a/crates/storage/src/versioned/ephemeral_v1/sql.rs
+++ b/crates/storage/src/versioned/ephemeral_v1/sql.rs
@@ -76,3 +76,11 @@ pub fn entry_count_and_size(content_type: &ContentType) -> String {
         table_name(content_type)
     )
 }
+
+pub fn purge_by_slot(content_type: &ContentType) -> String {
+    format!(
+        "DELETE FROM {}
+        WHERE slot < :slot",
+        table_name(content_type)
+    )
+}
diff --git a/crates/storage/src/versioned/ephemeral_v1/store.rs b/crates/storage/src/versioned/ephemeral_v1/store.rs
index 30a45b2b2..27c5c1be7 100644
--- a/crates/storage/src/versioned/ephemeral_v1/store.rs
+++ b/crates/storage/src/versioned/ephemeral_v1/store.rs
@@ -1,9 +1,15 @@
-use std::marker::PhantomData;
+use std::{
+    marker::PhantomData,
+    sync::Arc,
+    time::{Duration, SystemTime, UNIX_EPOCH},
+};
 
-use ethportal_api::{OverlayContentKey, RawContentValue};
+use alloy::eips::merge::EPOCH_SLOTS;
+use ethportal_api::{jsonrpsee::tokio, OverlayContentKey, RawContentValue};
 use r2d2::Pool;
 use r2d2_sqlite::SqliteConnectionManager;
 use rusqlite::{named_params, types::Type, OptionalExtension};
+use tokio::task::JoinHandle;
 use tracing::{debug, warn};
 use trin_metrics::storage::StorageMetricsReporter;
 
@@ -15,6 +21,9 @@ use crate::{
     ContentId,
 };
 
+pub const BEACON_GENESIS_TIME: u64 = 1606824023;
+pub const SLOTS_PER_HISTORICAL_ROOT: u64 = 8192;
+
 /// The store for storing ephemeral headers, bodies, and receipts.
 #[allow(unused)]
 #[derive(Debug)]
@@ -27,6 +36,8 @@ pub struct EphemeralV1Store<TContentKey: OverlayContentKey> {
     metrics: StorageMetricsReporter,
     /// Phantom Content Key
     _phantom_content_key: PhantomData<TContentKey>,
+    /// Background task handle for periodic purging
+    background_purge_task: Option<JoinHandle<()>>,
 }
 
 impl<TContentKey: OverlayContentKey> VersionedContentStore for EphemeralV1Store<TContentKey> {
@@ -59,6 +70,7 @@ impl<TContentKey: OverlayContentKey> VersionedContentStore for EphemeralV1Store<
             metrics: StorageMetricsReporter::new(subnetwork),
             _phantom_content_key: PhantomData,
             config,
+            background_purge_task: None,
         };
         store.init()?;
         Ok(store)
@@ -71,13 +83,61 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
     fn init(&mut self) -> Result<(), ContentStoreError> {
+        // Purge stale content first so that the usage stats looked up below reflect the purge.
+        let rows_deleted =
+            Self::purge_content_before_last_summary_internal(&Arc::new(self.config.clone()))?;
+
+        if rows_deleted > 0 {
+            debug!(
+                "Purged {} ephemeral content entries during initialization",
+                rows_deleted
+            );
+        }
+
         self.init_usage_stats()?;
 
-        // TODO: Prune if necessary.
-
         Ok(())
     }
 
     // PUBLIC FUNCTIONS
 
+    /// Starts the background task for periodic purging; call it explicitly after initialization
+    /// if needed. A purge task that is already running is aborted and replaced.
+    pub fn start_background_purge_task(&mut self) -> Result<(), ContentStoreError> {
+        // Overwriting the old handle would leave its task running detached, so abort it first.
+        self.stop_background_purge_task();
+
+        let config = Arc::new(self.config.clone());
+        self.background_purge_task = Some(tokio::spawn(async move {
+            // Run purge immediately when task starts
+            if let Err(e) = Self::purge_content_before_last_summary_internal(&config) {
+                warn!("Error purging content in background task: {}", e);
+            }
+
+            let mut interval = tokio::time::interval(Duration::from_secs(12 * EPOCH_SLOTS)); // One epoch duration
+            loop {
+                interval.tick().await;
+
+                // Check if we're at a historical summaries update boundary
+                let current_epoch = Self::expected_current_epoch();
+                let next_epoch = current_epoch + 1;
+                let period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS;
+
+                if next_epoch % period == 0 {
+                    if let Err(e) = Self::purge_content_before_last_summary_internal(&config) {
+                        warn!("Error purging content in background task: {}", e);
+                    }
+                }
+            }
+        }));
+        Ok(())
+    }
+
+    /// Stops the background purge task if it's running.
+    pub fn stop_background_purge_task(&mut self) {
+        if let Some(handle) = self.background_purge_task.take() {
+            handle.abort();
+        }
+    }
+
     /// Returns whether data associated with the content id is already stored.
     pub fn has_content(&self, content_id: &ContentId) -> Result<bool, ContentStoreError> {
         let timer = self.metrics.start_process_timer("has_content");
@@ -225,6 +285,15 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
         self.metrics.get_summary()
     }
 
+    /// Manually triggers a purge of content before the last historical summary.
+    /// This can be used to manually control when content is purged, independent of the background
+    /// task.
+    ///
+    /// Returns the number of rows deleted.
+    pub fn trigger_content_purge(&self) -> Result<usize, ContentStoreError> {
+        Self::purge_content_before_last_summary_internal(&Arc::new(self.config.clone()))
+    }
+
     // INTERNAL FUNCTIONS
 
     /// Lookup and set `usage_stats`.
@@ -263,6 +332,61 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
     ) -> u64 {
         (raw_content_id.len() + raw_content_key.len() + raw_content_value.len()) as u64
     }
+
+    fn expected_current_epoch() -> u64 {
+        // A clock set before the Unix epoch or before beacon genesis saturates to epoch 0
+        // instead of panicking (`Duration - Duration` panics on underflow).
+        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();
+        let since_genesis = now.saturating_sub(Duration::from_secs(BEACON_GENESIS_TIME));
+        since_genesis.as_secs() / 12 / EPOCH_SLOTS
+    }
+
+    /// Internal method to purge content, used by both the main thread and background task
+    fn purge_content_before_last_summary_internal(
+        config: &Arc<EphemeralV1StoreConfig>,
+    ) -> Result<usize, ContentStoreError> {
+        let current_epoch = Self::expected_current_epoch();
+        let cutoff_slot = Self::last_summaries_slot(current_epoch);
+
+        let conn = config.sql_connection_pool.get()?;
+        let query = sql::purge_by_slot(&config.content_type);
+
+        let rows_deleted = conn.execute(&query, named_params! { ":slot": cutoff_slot })?;
+        Ok(rows_deleted)
+    }
+
+    /// Computes the slot at which the last historical summary event occurred.
+    /// Historical summary events are appended when the next epoch is a multiple
+    /// of `period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS`.
+    ///
+    /// If the current_epoch is less than the first event boundary (and assuming a genesis event
+    /// at epoch 0), then this function returns 0.
+    fn last_summaries_slot(current_epoch: u64) -> u64 {
+        // Calculate the period (in epochs) at which events are appended.
+        let period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS;
+        // Compute candidate event epoch:
+        // This candidate is based on (current_epoch + 1) because events are appended
+        // when transitioning to the next epoch.
+        let candidate = ((current_epoch + 1) / period) * period;
+        // If candidate is greater than current_epoch, then that event is in the future,
+        // so the last event occurred one period earlier.
+        let last_summaries_epoch = if candidate > current_epoch {
+            candidate.saturating_sub(period)
+        } else {
+            candidate
+        };
+
+        last_summaries_epoch * EPOCH_SLOTS
+    }
+}
+
+impl<TContentKey: OverlayContentKey> Drop for EphemeralV1Store<TContentKey> {
+    fn drop(&mut self) {
+        // Cancel the background task when the store is dropped
+        if let Some(handle) = self.background_purge_task.take() {
+            handle.abort();
+        }
+    }
 }
 
 /// Creates table and indexes if they don't already exist.
@@ -280,6 +404,7 @@ mod tests {
     use anyhow::Result;
     use ethportal_api::{types::network::Subnetwork, IdentityContentKey};
     use tempfile::TempDir;
+    use tokio::time::{sleep, Duration};
 
     use super::*;
     use crate::{test_utils::generate_random_bytes, utils::setup_sql};
@@ -451,4 +576,105 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_background_purge_task() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+
+        // Create store without starting background task
+        let mut store = EphemeralV1Store::<IdentityContentKey>::create(
+            ContentType::HistoryEphemeral,
+            config.clone(),
+        )?;
+
+        // Verify background task is not running initially
+        assert!(store.background_purge_task.is_none());
+
+        // Insert test data with slots before and after the cutoff
+        let current_epoch = EphemeralV1Store::<IdentityContentKey>::expected_current_epoch();
+        let cutoff_slot =
+            EphemeralV1Store::<IdentityContentKey>::last_summaries_slot(current_epoch);
+
+        let (key1, value1) = generate_key_value();
+        let (key2, value2) = generate_key_value();
+        let (key3, value3) = generate_key_value();
+
+        // Insert data with slots before cutoff
+        store.insert(&key1, value1, 0, cutoff_slot.saturating_sub(100))?;
+        store.insert(&key2, value2, 0, cutoff_slot.saturating_sub(50))?;
+
+        // Insert data with slot after cutoff
+        store.insert(&key3, value3, 0, cutoff_slot + 100)?;
+
+        // Verify data is present before starting background task
+        assert!(store.has_content(&ContentId::from(key1.content_id()))?);
+        assert!(store.has_content(&ContentId::from(key2.content_id()))?);
+        assert!(store.has_content(&ContentId::from(key3.content_id()))?);
+
+        // Start the background task
+        store.start_background_purge_task()?;
+        // Wait for the background task to run and purge data
+        sleep(Duration::from_secs(1)).await;
+        assert!(store.background_purge_task.is_some());
+
+        // Verify that content before cutoff was purged
+        assert!(
+            !store.has_content(&ContentId::from(key1.content_id()))?,
+            "key1 should be purged"
+        );
+        assert!(
+            !store.has_content(&ContentId::from(key2.content_id()))?,
+            "key2 should be purged"
+        );
+        assert!(
+            store.has_content(&ContentId::from(key3.content_id()))?,
+            "key3 should not be purged"
+        );
+
+        // Stop the background task
+        store.stop_background_purge_task();
+        assert!(store.background_purge_task.is_none());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_purge_content_during_init() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+
+        // Create and populate store with test data
+        let mut store = EphemeralV1Store::<IdentityContentKey>::create(
+            ContentType::HistoryEphemeral,
+            config.clone(),
+        )?;
+
+        // Insert test data with slots before and after the cutoff
+        let current_epoch = EphemeralV1Store::<IdentityContentKey>::expected_current_epoch();
+        let cutoff_slot =
+            EphemeralV1Store::<IdentityContentKey>::last_summaries_slot(current_epoch);
+
+        let (key1, value1) = generate_key_value();
+        let (key2, value2) = generate_key_value();
+        let (key3, value3) = generate_key_value();
+
+        // Insert data with slots before cutoff
+        store.insert(&key1, value1, 0, cutoff_slot.saturating_sub(100))?;
+        store.insert(&key2, value2, 0, cutoff_slot.saturating_sub(50))?;
+
+        // Insert data with slot after cutoff
+        store.insert(&key3, value3, 0, cutoff_slot + 100)?;
+
+        // Create a new store instance to trigger init and purge
+        let new_store =
+            EphemeralV1Store::<IdentityContentKey>::create(ContentType::HistoryEphemeral, config)?;
+
+        // Verify that content before cutoff was purged
+        assert!(!new_store.has_content(&ContentId::from(key1.content_id()))?);
+        assert!(!new_store.has_content(&ContentId::from(key2.content_id()))?);
+        assert!(new_store.has_content(&ContentId::from(key3.content_id()))?);
+
+        Ok(())
+    }
 }