4
4
"context"
5
5
"errors"
6
6
"fmt"
7
- "slices"
8
7
"sync"
9
8
"sync/atomic"
10
9
"time"
@@ -15,7 +14,6 @@ import (
15
14
logging "github.com/ipfs/go-log/v2"
16
15
17
16
"github.com/celestiaorg/go-header"
18
- badger4 "github.com/ipfs/go-ds-badger4"
19
17
)
20
18
21
19
var log = logging .Logger ("header/store" )
@@ -358,193 +356,8 @@ func (s *Store[H]) HasAt(ctx context.Context, height uint64) bool {
358
356
return head .Height () >= height && height >= tail .Height ()
359
357
}
360
358
361
- func (s * Store [H ]) OnDelete (fn func (context.Context , uint64 ) error ) {
362
- s .onDeleteMu .Lock ()
363
- defer s .onDeleteMu .Unlock ()
364
-
365
- s .onDelete = append (s .onDelete , func (ctx context.Context , height uint64 ) (rerr error ) {
366
- defer func () {
367
- err := recover ()
368
- if err != nil {
369
- rerr = fmt .Errorf ("header/store: user provided onDelete panicked on %d with: %s" , height , err )
370
- }
371
- }()
372
- return fn (ctx , height )
373
- })
374
- }
375
-
376
- // DeleteTo implements [header.Store] interface.
377
- func (s * Store [H ]) DeleteTo (ctx context.Context , to uint64 ) error {
378
- // ensure all the pending headers are synchronized
379
- err := s .Sync (ctx )
380
- if err != nil {
381
- return err
382
- }
383
-
384
- head , err := s .Head (ctx )
385
- if err != nil {
386
- return fmt .Errorf ("header/store: reading head: %w" , err )
387
- }
388
- if head .Height ()+ 1 < to {
389
- _ , err := s .getByHeight (ctx , to )
390
- if errors .Is (err , header .ErrNotFound ) {
391
- return fmt .Errorf (
392
- "header/store: delete to %d beyond current head(%d)" ,
393
- to ,
394
- head .Height (),
395
- )
396
- }
397
- if err != nil {
398
- return fmt .Errorf ("delete to potential new head: %w" , err )
399
- }
400
-
401
- // if `to` is bigger than the current head and is stored - allow delete, making `to` a new head
402
- }
403
-
404
- tail , err := s .Tail (ctx )
405
- if err != nil {
406
- return fmt .Errorf ("header/store: reading tail: %w" , err )
407
- }
408
- if tail .Height () >= to {
409
- return fmt .Errorf ("header/store: delete to %d below current tail(%d)" , to , tail .Height ())
410
- }
411
-
412
- if err := s .deleteRange (ctx , tail .Height (), to ); err != nil {
413
- return fmt .Errorf ("header/store: delete to height %d: %w" , to , err )
414
- }
415
-
416
- if head .Height ()+ 1 == to {
417
- // this is the case where we have deleted all the headers
418
- // wipe the store
419
- if err := s .wipe (ctx ); err != nil {
420
- return fmt .Errorf ("header/store: wipe: %w" , err )
421
- }
422
- }
423
-
424
- return nil
425
- }
426
-
427
- var deleteTimeoutError = errors .New ("delete timeout" )
428
-
429
- // deleteRange deletes range of headers defined by from and to
430
- // it gracefully handles context and errors
431
- // attempting to save interrupted progress.
432
- func (s * Store [H ]) deleteRange (ctx context.Context , from , to uint64 ) (err error ) {
433
- s .onDeleteMu .Lock ()
434
- onDelete := slices .Clone (s .onDelete )
435
- s .onDeleteMu .Unlock ()
436
-
437
- batch , err := s .ds .Batch (ctx )
438
- if err != nil {
439
- return fmt .Errorf ("new batch: %w" , err )
440
- }
441
- ctx = badger4 .WithBatch (ctx , batch )
442
-
443
- startTime := time .Now ()
444
- deleteCtx := ctx
445
- if deadline , ok := ctx .Deadline (); ok {
446
- // allocate 95% of caller's set deadline for deletion
447
- // and give leftover to save progress
448
- // this prevents store's state corruption from partial deletion
449
- sub := deadline .Sub (startTime ) / 100 * 95
450
- var cancel context.CancelFunc
451
- deleteCtx , cancel = context .WithDeadlineCause (ctx , startTime .Add (sub ), deleteTimeoutError )
452
- defer cancel ()
453
- }
454
-
455
- log .Debugw ("starting delete range" , "from_height" , from , "to_height" , to )
456
-
457
- height := from
458
- defer func () {
459
- newTailHeight := to
460
- if err != nil {
461
- if errors .Is (err , deleteTimeoutError ) {
462
- log .Warnw ("partial delete" ,
463
- "from_height" , from ,
464
- "expected_to_height" , newTailHeight ,
465
- "actual_to_height" , height ,
466
- "took" , time .Since (startTime ),
467
- )
468
- } else {
469
- log .Errorw ("partial delete with error" ,
470
- "from_height" , from ,
471
- "expected_to_height" , newTailHeight ,
472
- "actual_to_height" , height ,
473
- "took" , time .Since (startTime ),
474
- "err" , err ,
475
- )
476
- }
477
-
478
- newTailHeight = height
479
- } else if to - from > 1 {
480
- log .Infow ("deleted headers" , "from_height" , from , "to_height" , to , "took" , time .Since (startTime ))
481
- }
482
-
483
- derr := s .setTail (ctx , batch , newTailHeight )
484
- if derr != nil {
485
- err = errors .Join (err , fmt .Errorf ("setting tail to %d: %w" , newTailHeight , derr ))
486
- }
487
-
488
- if derr := batch .Commit (ctx ); derr != nil {
489
- err = errors .Join (err , fmt .Errorf ("committing delete batch [%d:%d): %w" , from , newTailHeight , derr ))
490
- }
491
- }()
492
-
493
- for i := 0 ; height < to ; height ++ {
494
- if err := s .delete (deleteCtx , height , batch , onDelete ); err != nil {
495
- return fmt .Errorf ("delete header %d: %w" , height , err )
496
- }
497
-
498
- if i != 0 && i % 100000 == 0 {
499
- log .Debugf ("deleted %d headers" , i )
500
- }
501
-
502
- i ++
503
- }
504
-
505
- return nil
506
- }
507
-
508
- // delete deletes a single header from the store, its caches and indexies, notifying any registered onDelete handlers.
509
- func (s * Store [H ]) delete (ctx context.Context , height uint64 , batch datastore.Batch , onDelete []func (ctx context.Context , height uint64 ) error ) error {
510
- // some of the methods may not handle context cancellation properly
511
- if ctx .Err () != nil {
512
- return context .Cause (ctx )
513
- }
514
-
515
- hash , err := s .heightIndex .HashByHeight (ctx , height , false )
516
- if errors .Is (err , datastore .ErrNotFound ) {
517
- log .Warnf ("attempt to delete header that's not found" , "height" , height )
518
- return nil
519
- }
520
- if err != nil {
521
- return fmt .Errorf ("hash by height %d: %w" , height , err )
522
- }
523
-
524
- for _ , deleteFn := range onDelete {
525
- if err := deleteFn (ctx , height ); err != nil {
526
- return fmt .Errorf ("on delete handler for %d: %w" , height , err )
527
- }
528
- }
529
-
530
- if err := batch .Delete (ctx , hashKey (hash )); err != nil {
531
- return fmt .Errorf ("delete hash key (%X): %w" , hash , err )
532
- }
533
- if err := batch .Delete (ctx , heightKey (height )); err != nil {
534
- return fmt .Errorf ("delete height key (%d): %w" , height , err )
535
- }
536
-
537
- s .cache .Remove (hash .String ())
538
- s .heightIndex .cache .Remove (height )
539
- s .pending .DeleteRange (height , height + 1 )
540
- return nil
541
- }
542
-
543
- func (s * Store [H ]) setTail (ctx context.Context , batch datastore.Batch , to uint64 ) error {
359
+ func (s * Store [H ]) setTail (ctx context.Context , batch datastore.Write , to uint64 ) error {
544
360
newTail , err := s .getByHeight (ctx , to )
545
- if errors .Is (err , header .ErrNotFound ) {
546
- return nil
547
- }
548
361
if err != nil {
549
362
return fmt .Errorf ("getting tail: %w" , err )
550
363
}
@@ -872,7 +685,7 @@ func (s *Store[H]) deinit() {
872
685
873
686
func writeHeaderHashTo [H header.Header [H ]](
874
687
ctx context.Context ,
875
- batch datastore.Batch ,
688
+ batch datastore.Write ,
876
689
h H ,
877
690
key datastore.Key ,
878
691
) error {
0 commit comments