4
4
"context"
5
5
"errors"
6
6
"fmt"
7
- "slices"
8
7
"sync"
9
8
"sync/atomic"
10
9
"time"
@@ -357,136 +356,7 @@ func (s *Store[H]) HasAt(ctx context.Context, height uint64) bool {
357
356
return head .Height () >= height && height >= tail .Height ()
358
357
}
359
358
360
- func (s * Store [H ]) OnDelete (fn func (context.Context , uint64 ) error ) {
361
- s .onDeleteMu .Lock ()
362
- defer s .onDeleteMu .Unlock ()
363
-
364
- s .onDelete = append (s .onDelete , func (ctx context.Context , height uint64 ) (rerr error ) {
365
- defer func () {
366
- err := recover ()
367
- if err != nil {
368
- rerr = fmt .Errorf ("header/store: user provided onDelete panicked on %d with: %s" , height , err )
369
- }
370
- }()
371
- return fn (ctx , height )
372
- })
373
- }
374
-
375
- // DeleteTo implements [header.Store] interface.
376
- func (s * Store [H ]) DeleteTo (ctx context.Context , to uint64 ) error {
377
- // ensure all the pending headers are synchronized
378
- err := s .Sync (ctx )
379
- if err != nil {
380
- return err
381
- }
382
-
383
- head , err := s .Head (ctx )
384
- if err != nil {
385
- return fmt .Errorf ("header/store: reading head: %w" , err )
386
- }
387
- if head .Height ()+ 1 < to {
388
- _ , err := s .getByHeight (ctx , to )
389
- if errors .Is (err , header .ErrNotFound ) {
390
- return fmt .Errorf (
391
- "header/store: delete to %d beyond current head(%d)" ,
392
- to ,
393
- head .Height (),
394
- )
395
- }
396
- if err != nil {
397
- return fmt .Errorf ("delete to potential new head: %w" , err )
398
- }
399
-
400
- // if `to` is bigger than the current head and is stored - allow delete, making `to` a new head
401
- }
402
-
403
- tail , err := s .Tail (ctx )
404
- if err != nil {
405
- return fmt .Errorf ("header/store: reading tail: %w" , err )
406
- }
407
- if tail .Height () >= to {
408
- return fmt .Errorf ("header/store: delete to %d below current tail(%d)" , to , tail .Height ())
409
- }
410
-
411
- if err := s .deleteRange (ctx , tail .Height (), to ); err != nil {
412
- return fmt .Errorf ("header/store: delete to height %d: %w" , to , err )
413
- }
414
-
415
- if head .Height ()+ 1 == to {
416
- // this is the case where we have deleted all the headers
417
- // wipe the store
418
- if err := s .wipe (ctx ); err != nil {
419
- return fmt .Errorf ("header/store: wipe: %w" , err )
420
- }
421
- }
422
-
423
- return nil
424
- }
425
-
426
- func (s * Store [H ]) deleteRange (ctx context.Context , from , to uint64 ) (rerr error ) {
427
- s .onDeleteMu .Lock ()
428
- onDelete := slices .Clone (s .onDelete )
429
- s .onDeleteMu .Unlock ()
430
-
431
- batch , err := s .ds .Batch (ctx )
432
- if err != nil {
433
- return fmt .Errorf ("new batch: %w" , err )
434
- }
435
-
436
- height := from
437
- defer func () {
438
- // make new context to always save progress
439
- ctx := context .Background ()
440
-
441
- log .Infow ("deleted headers" , "from_height" , from , "to_height" , to )
442
- newTailHeight := to
443
- if rerr != nil {
444
- log .Warnw ("partial delete with error" , "expected_to_height" , newTailHeight , "actual_to_height" , height , "err" , err )
445
- newTailHeight = height
446
- }
447
-
448
- err = s .setTail (ctx , batch , newTailHeight )
449
- if err != nil {
450
- rerr = errors .Join (rerr , fmt .Errorf ("setting tail to %d: %w" , newTailHeight , err ))
451
- }
452
-
453
- if err := batch .Commit (ctx ); err != nil {
454
- rerr = errors .Join (rerr , fmt .Errorf ("committing delete batch [%d:%d): %w" , from , newTailHeight , err ))
455
- }
456
- }()
457
-
458
- for ; height < to ; height ++ {
459
- hash , err := s .heightIndex .HashByHeight (ctx , height , false )
460
- if errors .Is (err , datastore .ErrNotFound ) {
461
- log .Warnf ("attempt to delete header that's not found" , "height" , height )
462
- continue
463
- }
464
- if err != nil {
465
- return fmt .Errorf ("hash by height %d: %w" , height , err )
466
- }
467
-
468
- for _ , deleteFn := range onDelete {
469
- if err := deleteFn (ctx , height ); err != nil {
470
- return fmt .Errorf ("on delete handler for %d: %w" , height , err )
471
- }
472
- }
473
-
474
- if err := batch .Delete (ctx , hashKey (hash )); err != nil {
475
- return fmt .Errorf ("delete hash key (%X): %w" , hash , err )
476
- }
477
- if err := batch .Delete (ctx , heightKey (height )); err != nil {
478
- return fmt .Errorf ("delete height key (%d): %w" , height , err )
479
- }
480
-
481
- s .cache .Remove (hash .String ())
482
- s .heightIndex .cache .Remove (height )
483
- s .pending .DeleteRange (height , height + 1 )
484
- }
485
-
486
- return nil
487
- }
488
-
489
- func (s * Store [H ]) setTail (ctx context.Context , batch datastore.Batch , to uint64 ) error {
359
+ func (s * Store [H ]) setTail (ctx context.Context , write datastore.Write , to uint64 ) error {
490
360
newTail , err := s .getByHeight (ctx , to )
491
361
if errors .Is (err , header .ErrNotFound ) {
492
362
return nil
@@ -497,19 +367,16 @@ func (s *Store[H]) setTail(ctx context.Context, batch datastore.Batch, to uint64
497
367
498
368
// set directly to `to`, avoiding iteration in recedeTail
499
369
s .tailHeader .Store (& newTail )
500
- if err := writeHeaderHashTo (ctx , batch , newTail , tailKey ); err != nil {
370
+ if err := writeHeaderHashTo (ctx , write , newTail , tailKey ); err != nil {
501
371
return fmt .Errorf ("writing tailKey in batch: %w" , err )
502
372
}
503
373
log .Infow ("new tail" , "height" , newTail .Height (), "hash" , newTail .Hash ())
504
374
s .metrics .newTail (newTail .Height ())
505
375
506
376
// update head as well, if delete went over it
507
- head , err := s .Head (ctx )
508
- if err != nil {
509
- return err
510
- }
511
- if to > head .Height () {
512
- if err := writeHeaderHashTo (ctx , batch , newTail , headKey ); err != nil {
377
+ head , _ := s .Head (ctx )
378
+ if head .IsZero () || to > head .Height () {
379
+ if err := writeHeaderHashTo (ctx , write , newTail , headKey ); err != nil {
513
380
return fmt .Errorf ("writing headKey in batch: %w" , err )
514
381
}
515
382
s .contiguousHead .Store (& newTail )
@@ -817,7 +684,7 @@ func (s *Store[H]) deinit() {
817
684
818
685
func writeHeaderHashTo [H header.Header [H ]](
819
686
ctx context.Context ,
820
- batch datastore.Batch ,
687
+ write datastore.Write ,
821
688
h H ,
822
689
key datastore.Key ,
823
690
) error {
@@ -826,7 +693,7 @@ func writeHeaderHashTo[H header.Header[H]](
826
693
return err
827
694
}
828
695
829
- if err := batch .Put (ctx , key , hashBytes ); err != nil {
696
+ if err := write .Put (ctx , key , hashBytes ); err != nil {
830
697
return err
831
698
}
832
699
0 commit comments