88 "github.com/ethereum/go-ethereum/log"
99)
1010
11+ const MaxDeleteRangeSize = uint64 (50_000 )
12+
1113type Strategy interface {
1214 Name () string
1315 RetentionBlocks () uint64
@@ -17,6 +19,10 @@ type Strategy interface {
1719 ReadCursor (db ethdb.KeyValueReader ) * uint64
1820 WriteCursor (db ethdb.KeyValueWriter , cur uint64 )
1921
22+ // Head Persistency
23+ ReadPrunerHead (db ethdb.KeyValueReader ) * uint64
24+ WritePrunerHead (db ethdb.KeyValueWriter , head uint64 )
25+
2026 // Find earliest height <= cutoff that has data for this domain.
2127 FindEarliest (db ethdb.Database , cutoff uint64 ) (earliest uint64 , ok bool )
2228
@@ -37,10 +43,13 @@ type pruner struct {
3743 strategy Strategy
3844}
3945
40- // This pruner implements an online solution of pruning data. It demands the data to be pruned
41- // to be on the KV database. That's why, for example, if you prune blocks you need to disable ancient db
4246func NewPruner (db ethdb.Database , s Strategy ) * pruner {
43- return & pruner {db : db , strategy : s , quit : make (chan struct {}), stopped : make (chan struct {})}
47+ return & pruner {
48+ db : db ,
49+ strategy : s ,
50+ quit : make (chan struct {}),
51+ stopped : make (chan struct {}),
52+ }
4453}
4554
4655func (p * pruner ) Start () {
@@ -60,6 +69,7 @@ func (p *pruner) Start() {
6069 }()
6170 log .Info (p .strategy .Name ()+ ": started" , "retentionBlocks" , p .strategy .RetentionBlocks (), "interval" , p .strategy .Interval ().String ())
6271}
72+
6373func (p * pruner ) Close () error {
6474 close (p .quit )
6575 <- p .stopped
@@ -74,12 +84,35 @@ func (p *pruner) prune() {
7484 }
7585 latest := head .Number .Uint64 ()
7686
87+ prevHeadPtr := p .strategy .ReadPrunerHead (p .db )
88+ prevHead := latest
89+ if prevHeadPtr != nil {
90+ prevHead = * prevHeadPtr
91+ }
92+
93+ if latest < prevHead {
94+ // Reorg between prevHead and latest (could have happened while offline).
95+ if err := p .handleReorg (latest , prevHead ); err != nil {
96+ log .Error (p .strategy .Name ()+ ": reorg cleanup failed" , "newHead" , latest , "oldHead" , prevHead , "err" , err )
97+ // Do not update stored head; we want to try again next run.
98+ return
99+ }
100+ }
101+
77102 var cutoff uint64
78103 if rb := p .strategy .RetentionBlocks (); latest > rb {
79104 cutoff = latest - rb
80105 }
81106
82107 cur := p .strategy .ReadCursor (p .db )
108+
109+ if cur != nil && * cur > latest {
110+ log .Warn (p .strategy .Name ()+ ": cursor beyond head; clamping" , "cursor" , * cur , "head" , latest )
111+ p .strategy .WriteCursor (p .db , latest )
112+ tmp := latest
113+ cur = & tmp
114+ }
115+
83116 if cur == nil {
84117 if e , ok := p .strategy .FindEarliest (p .db , cutoff ); ok {
85118 log .Info (p .strategy .Name ()+ ": no cursor stored" , "earliestFound" , e )
@@ -90,45 +123,108 @@ func (p *pruner) prune() {
90123 log .Info (p .strategy .Name ()+ ": no data ≤ cutoff; starting at cutoff" , "cutoff" , cutoff )
91124 }
92125 }
126+
93127 if * cur >= cutoff {
94- batch := p .db .NewBatch ()
95- p .strategy .WriteCursor (batch , * cur )
96- _ = batch .Write ()
97- log .Info (p .strategy .Name ()+ ": no data to prune" , "cursor" , * cur , "cutoff" , cutoff )
128+ p .strategy .WritePrunerHead (p .db , latest )
98129 return
99130 }
100131
101- // Chunk to keep batches reasonable.
102- const step = uint64 (50_000 )
132+ // Normal pruning: delete [cur .. cutoff-1] inclusive, then move cursor to cutoff.
103133 from := * cur
104- for from < cutoff {
105- to := from + step
106- if to > cutoff {
107- to = cutoff
134+ to := cutoff - 1
135+
136+ if err := p .deleteRange (from , to ); err != nil {
137+ log .Error (p .strategy .Name ()+ ": batch write error during prune" , "from" , from , "to" , to , "err" , err )
138+ return
139+ }
140+
141+ p .strategy .WriteCursor (p .db , cutoff )
142+ log .Info (p .strategy .Name ()+ ": successfully pruned" , "count" , cutoff - from , "from" , from , "to" , to )
143+ p .strategy .WritePrunerHead (p .db , latest )
144+ }
145+
146+ func (p * pruner ) handleReorg (newHead , oldHead uint64 ) error {
147+ log .Warn (p .strategy .Name ()+ ": reorg detected" , "newHead" , newHead , "oldHead" , oldHead )
148+
149+ cur := p .strategy .ReadCursor (p .db )
150+ if cur == nil {
151+ cur = new (uint64 )
152+ * cur = 0 // meaning no pruned data yet
153+ }
154+
155+ // Range of non-canonical heights
156+ deleteFrom := max (* cur , newHead + 1 )
157+ deleteTo := oldHead
158+
159+ if deleteFrom <= deleteTo {
160+ log .Warn (p .strategy .Name ()+ ": deleting non-canonical data" ,
161+ "from" , deleteFrom , "to" , deleteTo )
162+
163+ if err := p .deleteRange (deleteFrom , deleteTo ); err != nil {
164+ log .Error (p .strategy .Name ()+ ": reorg cleanup failed; keeping cursor unchanged" ,
165+ "from" , deleteFrom , "to" , deleteTo , "err" , err )
166+ return err
108167 }
109- nhs := p .strategy .ReadNumberHashes (p .db , from , to - 1 )
168+ }
169+
170+ if * cur > newHead {
171+ p .strategy .WriteCursor (p .db , newHead )
172+ log .Warn (p .strategy .Name ()+ ": cursor rolled back" , "oldCursor" , * cur , "newCursor" , newHead )
173+ }
174+
175+ return nil
176+ }
177+
178+ func max (a , b uint64 ) uint64 {
179+ if a > b {
180+ return a
181+ }
182+ return b
183+ }
184+
185+ func (p * pruner ) deleteRange (from , to uint64 ) error {
186+ if from > to {
187+ return nil
188+ }
189+
190+ const step = MaxDeleteRangeSize
191+ start := from
192+
193+ for start <= to {
194+ end := start + step - 1
195+ if end > to {
196+ end = to
197+ }
198+
199+ nhs := p .strategy .ReadNumberHashes (p .db , start , end )
110200
111201 batch := p .db .NewBatch ()
112- var lastNum = ^ uint64 (0 ) // sentinel
202+ var lastNum = ^ uint64 (0 )
203+
113204 for _ , nh := range nhs {
114205 p .strategy .DeletePerHash (batch , nh .Number , nh .Hash )
206+
115207 if nh .Number != lastNum {
116- if lastNum != ^ uint64 (0 ) { // avoid multiple deletes per same height in case of multiple hashs in same height
208+ if lastNum != ^ uint64 (0 ) {
117209 p .strategy .DeletePerHeight (batch , lastNum )
118210 }
119211 lastNum = nh .Number
120212 }
121213 }
122- // flush last height if any rows existed
214+
123215 if lastNum != ^ uint64 (0 ) {
124216 p .strategy .DeletePerHeight (batch , lastNum )
125217 }
126- p . strategy . WriteCursor ( batch , to )
218+
127219 if err := batch .Write (); err != nil {
128- log .Error (p .strategy .Name ()+ ": batch write error" , "from" , from , "to" , to - 1 , "err" , err )
129- return
220+ log .Error (p .strategy .Name ()+ ": batch write error during reorg cleanup" ,
221+ "from" , start , "to" , end , "err" , err )
222+ return err
130223 }
131- log .Info (p .strategy .Name ()+ ": successfully pruned" , "count" , (to - 1 )- from , "from" , from , "to" , to - 1 )
132- from = to
224+
225+ log .Info (p .strategy .Name ()+ ": removed reverted data" , "from" , start , "to" , end )
226+
227+ start = end + 1
133228 }
229+ return nil
134230}
0 commit comments