-use std::marker::PhantomData;
+use std::{
+    marker::PhantomData,
+    sync::Arc,
+    time::{Duration, SystemTime, UNIX_EPOCH},
+};

-use ethportal_api::{OverlayContentKey, RawContentValue};
+use alloy::eips::merge::EPOCH_SLOTS;
+use ethportal_api::{jsonrpsee::tokio, OverlayContentKey, RawContentValue};
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{named_params, types::Type, OptionalExtension};
+use tokio::task::JoinHandle;
use tracing::{debug, warn};
use trin_metrics::storage::StorageMetricsReporter;

@@ -15,6 +21,9 @@ use crate::{
    ContentId,
};

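+// Mainnet beacon chain genesis time (2020-12-01 12:00:23 UTC) and the consensus-spec
+// `SLOTS_PER_HISTORICAL_ROOT`; together they determine how often the beacon chain's
+// `historical_summaries` list gains a new entry (every 8192 / 32 = 256 epochs).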
+pub const BEACON_GENESIS_TIME: u64 = 1606824023;
+pub const SLOTS_PER_HISTORICAL_ROOT: u64 = 8192;
+
/// The store for storing ephemeral headers, bodies, and receipts.
#[allow(unused)]
#[derive(Debug)]
@@ -27,6 +36,8 @@ pub struct EphemeralV1Store<TContentKey: OverlayContentKey> {
    metrics: StorageMetricsReporter,
    /// Phantom Content Key
    _phantom_content_key: PhantomData<TContentKey>,
+    /// Background task handle for periodic purging
+    background_purge_task: Option<JoinHandle<()>>,
}

impl<TContentKey: OverlayContentKey> VersionedContentStore for EphemeralV1Store<TContentKey> {
@@ -59,6 +70,7 @@ impl<TContentKey: OverlayContentKey> VersionedContentStore for EphemeralV1Store<
            metrics: StorageMetricsReporter::new(subnetwork),
            _phantom_content_key: PhantomData,
            config,
+            background_purge_task: None,
        };
        store.init()?;
        Ok(store)
@@ -71,11 +83,76 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
    fn init(&mut self) -> Result<(), ContentStoreError> {
        self.init_usage_stats()?;

-        // TODO: Prune if necessary.
+        // Purge content based on the last historical summaries update slot
+        let current_epoch = Self::expected_current_epoch();
+        let cutoff_slot = Self::last_summaries_slot(current_epoch);
+
+        let conn = self.config.sql_connection_pool.get()?;
+        let query = sql::purge_by_slot(&self.config.content_type);
+        let rows_deleted = conn.execute(&query, named_params! { ":slot": cutoff_slot })?;
+        if rows_deleted > 0 {
+            debug!(
+                "Purged {} content with slot < {} during initialization",
+                rows_deleted, cutoff_slot
+            );
+        }
+
+        Ok(())
+    }
+
+    /// Starts the background task for periodic purging.
+    /// This can be called explicitly after initialization if needed.
+    pub fn start_background_purge_task(&mut self) -> Result<(), ContentStoreError> {
+        let config = Arc::new(self.config.clone());
+
+        let handle = tokio::spawn(async move {
+            // Run purge immediately when task starts
+            if let Err(e) = Self::purge_content_before_last_summary_internal(&config) {
+                warn!("Error purging content in background task: {}", e);
+            }
+
+            let mut interval = tokio::time::interval(Duration::from_secs(12 * EPOCH_SLOTS)); // One epoch duration
+            loop {
+                interval.tick().await;
+
+                // Check if we're at a historical summaries update boundary
+                let current_epoch = Self::expected_current_epoch();
+                let next_epoch = current_epoch + 1;
+                let period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS;

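+                // On mainnet, `period` is 8192 / 32 = 256 epochs (~27.3 hours), so this
+                // purge only runs during the epoch right before a historical summaries update.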
+                if next_epoch % period == 0 {
+                    if let Err(e) = Self::purge_content_before_last_summary_internal(&config) {
+                        warn!("Error purging content in background task: {}", e);
+                    }
+                }
+            }
+        });
+
+        self.background_purge_task = Some(handle);
        Ok(())
    }

+    /// Stops the background purge task if it's running.
+    pub fn stop_background_purge_task(&mut self) {
+        if let Some(handle) = self.background_purge_task.take() {
+            handle.abort();
+        }
+    }
+
+    /// Internal method to purge content, used by both the main thread and background task
+    fn purge_content_before_last_summary_internal(
+        config: &Arc<EphemeralV1StoreConfig>,
+    ) -> Result<usize, ContentStoreError> {
+        let current_epoch = Self::expected_current_epoch();
+        let cutoff_slot = Self::last_summaries_slot(current_epoch);
+
+        let conn = config.sql_connection_pool.get()?;
+        let query = sql::purge_by_slot(&config.content_type);
+
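+        // Delete every row whose slot is below the cutoff (the same purge as in `init`).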
+        let rows_deleted = conn.execute(&query, named_params! { ":slot": cutoff_slot })?;
+        Ok(rows_deleted)
+    }
+
    // PUBLIC FUNCTIONS

    /// Returns whether data associated with the content id is already stored.
@@ -225,6 +302,15 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
        self.metrics.get_summary()
    }

+    /// Manually triggers a purge of content before the last historical summary.
+    /// This can be used to manually control when content is purged, independent of the background
+    /// task.
+    ///
+    /// Returns the number of rows deleted.
+    pub fn trigger_content_purge(&self) -> Result<usize, ContentStoreError> {
+        Self::purge_content_before_last_summary_internal(&Arc::new(self.config.clone()))
+    }
+
    // INTERNAL FUNCTIONS

    /// Lookup and set `usage_stats`.
@@ -263,6 +349,47 @@ impl<TContentKey: OverlayContentKey> EphemeralV1Store<TContentKey> {
    ) -> u64 {
        (raw_content_id.len() + raw_content_key.len() + raw_content_value.len()) as u64
    }
+
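+    /// Estimates the current beacon chain epoch from wall-clock time, assuming 12-second
+    /// slots and `EPOCH_SLOTS` slots per epoch.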
+    fn expected_current_epoch() -> u64 {
+        let now = SystemTime::now();
+        let now = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
+        let since_genesis = now - Duration::from_secs(BEACON_GENESIS_TIME);
+
+        since_genesis.as_secs() / 12 / EPOCH_SLOTS
+    }
+
+    /// Computes the slot at which the last historical summary event occurred.
+    /// Historical summary events are appended when the next epoch is a multiple
+    /// of `period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS`.
+    ///
+    /// If the current_epoch is less than the first event boundary (and assuming a genesis event
+    /// at epoch 0), then this function returns 0.
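+    ///
+    /// For example, with `EPOCH_SLOTS = 32` the period is `8192 / 32 = 256` epochs; at
+    /// `current_epoch = 300` the candidate is `((300 + 1) / 256) * 256 = 256`, which is not
+    /// in the future, so the last summary epoch is 256 and the returned slot is `256 * 32 = 8192`.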
+    fn last_summaries_slot(current_epoch: u64) -> u64 {
+        // Calculate the period (in epochs) at which events are appended.
+        let period = SLOTS_PER_HISTORICAL_ROOT / EPOCH_SLOTS;
+        // Compute candidate event epoch:
+        // This candidate is based on (current_epoch + 1) because events are appended
+        // when transitioning to the next epoch.
+        let candidate = ((current_epoch + 1) / period) * period;
+        // If candidate is greater than current_epoch, then that event is in the future,
+        // so the last event occurred one period earlier.
+        let last_summaries_epoch = if candidate > current_epoch {
+            candidate.saturating_sub(period)
+        } else {
+            candidate
+        };
+
+        last_summaries_epoch * EPOCH_SLOTS
+    }
+}
+
+impl<TContentKey: OverlayContentKey> Drop for EphemeralV1Store<TContentKey> {
+    fn drop(&mut self) {
+        // Cancel the background task when the store is dropped
+        if let Some(handle) = self.background_purge_task.take() {
+            handle.abort();
+        }
+    }
}

/// Creates table and indexes if they don't already exist.
@@ -280,6 +407,7 @@ mod tests {
    use anyhow::Result;
    use ethportal_api::{types::network::Subnetwork, IdentityContentKey};
    use tempfile::TempDir;
+    use tokio::time::{sleep, Duration};

    use super::*;
    use crate::{test_utils::generate_random_bytes, utils::setup_sql};
@@ -451,4 +579,105 @@ mod tests {

        Ok(())
    }
+
+    #[tokio::test]
+    async fn test_background_purge_task() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+
+        // Create store without starting background task
+        let mut store = EphemeralV1Store::<IdentityContentKey>::create(
+            ContentType::HistoryEphemeral,
+            config.clone(),
+        )?;
+
+        // Verify background task is not running initially
+        assert!(store.background_purge_task.is_none());
+
+        // Insert test data with slots before and after the cutoff
+        let current_epoch = EphemeralV1Store::<IdentityContentKey>::expected_current_epoch();
+        let cutoff_slot =
+            EphemeralV1Store::<IdentityContentKey>::last_summaries_slot(current_epoch);
+
+        let (key1, value1) = generate_key_value();
+        let (key2, value2) = generate_key_value();
+        let (key3, value3) = generate_key_value();
+
+        // Insert data with slots before cutoff
+        store.insert(&key1, value1, 0, cutoff_slot.saturating_sub(100))?;
+        store.insert(&key2, value2, 0, cutoff_slot.saturating_sub(50))?;
+
+        // Insert data with slot after cutoff
+        store.insert(&key3, value3, 0, cutoff_slot + 100)?;
+
+        // Verify data is present before starting background task
+        assert!(store.has_content(&ContentId::from(key1.content_id()))?);
+        assert!(store.has_content(&ContentId::from(key2.content_id()))?);
+        assert!(store.has_content(&ContentId::from(key3.content_id()))?);
+
+        // Start the background task
+        store.start_background_purge_task()?;
+        // Wait for the background task to run and purge data
+        sleep(Duration::from_secs(1)).await;
+        assert!(store.background_purge_task.is_some());
+
+        // Verify that content before cutoff was purged
+        assert!(
+            !store.has_content(&ContentId::from(key1.content_id()))?,
+            "key1 should be purged"
+        );
+        assert!(
+            !store.has_content(&ContentId::from(key2.content_id()))?,
+            "key2 should be purged"
+        );
+        assert!(
+            store.has_content(&ContentId::from(key3.content_id()))?,
+            "key3 should not be purged"
+        );
+
+        // Stop the background task
+        store.stop_background_purge_task();
+        assert!(store.background_purge_task.is_none());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_purge_content_during_init() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+
+        // Create and populate store with test data
+        let mut store = EphemeralV1Store::<IdentityContentKey>::create(
+            ContentType::HistoryEphemeral,
+            config.clone(),
+        )?;
+
+        // Insert test data with slots before and after the cutoff
+        let current_epoch = EphemeralV1Store::<IdentityContentKey>::expected_current_epoch();
+        let cutoff_slot =
+            EphemeralV1Store::<IdentityContentKey>::last_summaries_slot(current_epoch);
+
+        let (key1, value1) = generate_key_value();
+        let (key2, value2) = generate_key_value();
+        let (key3, value3) = generate_key_value();
+
+        // Insert data with slots before cutoff
+        store.insert(&key1, value1, 0, cutoff_slot.saturating_sub(100))?;
+        store.insert(&key2, value2, 0, cutoff_slot.saturating_sub(50))?;
+
+        // Insert data with slot after cutoff
+        store.insert(&key3, value3, 0, cutoff_slot + 100)?;
+
+        // Create a new store instance to trigger init and purge
+        let new_store =
+            EphemeralV1Store::<IdentityContentKey>::create(ContentType::HistoryEphemeral, config)?;
+
+        // Verify that content before cutoff was purged
+        assert!(!new_store.has_content(&ContentId::from(key1.content_id()))?);
+        assert!(!new_store.has_content(&ContentId::from(key2.content_id()))?);
+        assert!(new_store.has_content(&ContentId::from(key3.content_id()))?);
+
+        Ok(())
+    }
}