@@ -10,8 +10,9 @@ import (
10
10
"sync/atomic"
11
11
"time"
12
12
13
- "github.com/celestiaorg/go-header"
14
13
"github.com/ipfs/go-datastore"
14
+
15
+ "github.com/celestiaorg/go-header"
15
16
)
16
17
17
18
// OnDelete implements [header.Store] interface.
@@ -23,7 +24,11 @@ func (s *Store[H]) OnDelete(fn func(context.Context, uint64) error) {
23
24
defer func () {
24
25
err := recover ()
25
26
if err != nil {
26
- rerr = fmt .Errorf ("header/store: user provided onDelete panicked on %d with: %s" , height , err )
27
+ rerr = fmt .Errorf (
28
+ "header/store: user provided onDelete panicked on %d with: %s" ,
29
+ height ,
30
+ err ,
31
+ )
27
32
}
28
33
}()
29
34
return fn (ctx , height )
@@ -129,7 +134,12 @@ func (s *Store[H]) deleteSequential(ctx context.Context, from, to uint64) (err e
129
134
}
130
135
131
136
// delete deletes a single header from the store, its caches and indexies, notifying any registered onDelete handlers.
132
- func (s * Store [H ]) delete (ctx context.Context , height uint64 , batch datastore.Batch , onDelete []func (ctx context.Context , height uint64 ) error ) error {
137
+ func (s * Store [H ]) delete (
138
+ ctx context.Context ,
139
+ height uint64 ,
140
+ batch datastore.Batch ,
141
+ onDelete []func (ctx context.Context , height uint64 ) error ,
142
+ ) error {
133
143
// some of the methods may not handle context cancellation properly
134
144
if ctx .Err () != nil {
135
145
return context .Cause (ctx )
@@ -182,15 +192,15 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (err err
182
192
// this prevents store's state corruption from partial deletion
183
193
sub := deadline .Sub (startTime ) / 100 * 95
184
194
var cancel context.CancelFunc
185
- deleteCtx , cancel = context .WithDeadlineCause (ctx , startTime .Add (sub ), deleteTimeoutError )
195
+ deleteCtx , cancel = context .WithDeadlineCause (ctx , startTime .Add (sub ), errDeleteTimeout )
186
196
defer cancel ()
187
197
}
188
198
189
199
var highestDeleted atomic.Uint64
190
200
defer func () {
191
201
newTailHeight := highestDeleted .Load () + 1
192
202
if err != nil {
193
- if errors .Is (err , deleteTimeoutError ) {
203
+ if errors .Is (err , errDeleteTimeout ) {
194
204
log .Warnw ("partial delete" ,
195
205
"from_height" , from ,
196
206
"expected_to_height" , to ,
@@ -244,29 +254,22 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (err err
244
254
}
245
255
}()
246
256
247
- for {
248
- select {
249
- case height , ok := <- jobCh :
250
- if ! ok {
251
- return
257
+ for height := range jobCh {
258
+ if err := s . delete ( deleteCtx , height , batch , onDelete ); err != nil {
259
+ select {
260
+ case errCh <- fmt . Errorf ( "delete header %d: %w" , height , err ):
261
+ default :
252
262
}
253
-
254
- if err := s .delete (deleteCtx , height , batch , onDelete ); err != nil {
255
- select {
256
- case errCh <- fmt .Errorf ("delete header %d: %w" , height , err ):
257
- default :
258
- }
259
- return
260
- }
261
-
262
- lastHeight = height
263
+ return
263
264
}
265
+
266
+ lastHeight = height
264
267
}
265
268
}
266
269
267
270
var wg sync.WaitGroup
268
- for i := 0 ; i < workerNum ; i ++ {
269
- wg . Add ( 1 )
271
+ wg . Add ( workerNum )
272
+ for range workerNum {
270
273
go func () {
271
274
defer wg .Done ()
272
275
worker ()
@@ -278,8 +281,8 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (err err
278
281
select {
279
282
case jobCh <- height :
280
283
i ++
281
- if i % 100000 == 0 {
282
- log .Debugf ("deleting %d header" , i )
284
+ if uint64 ( 1 ) % deleteRangeParallelThreshold == 0 {
285
+ log .Debugf ("deleting %dth header height %d " , deleteRangeParallelThreshold , height )
283
286
}
284
287
case err = <- errCh :
285
288
close (jobCh )
@@ -291,4 +294,4 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (err err
291
294
return err
292
295
}
293
296
294
- var deleteTimeoutError = errors .New ("delete timeout" )
297
+ var errDeleteTimeout = errors .New ("delete timeout" )
0 commit comments