4
4
"context"
5
5
"errors"
6
6
"fmt"
7
- "slices"
8
7
"sync"
9
8
"sync/atomic"
10
9
"time"
@@ -61,7 +60,7 @@ type Store[H header.Header[H]] struct {
61
60
syncCh chan chan struct {}
62
61
63
62
onDeleteMu sync.Mutex
64
- onDelete []func (context.Context , [] H ) error
63
+ onDelete []func (context.Context , uint64 ) error
65
64
66
65
Params Parameters
67
66
}
@@ -271,7 +270,7 @@ func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
271
270
return h , nil
272
271
}
273
272
274
- hash , err := s .heightIndex .HashByHeight (ctx , height )
273
+ hash , err := s .heightIndex .HashByHeight (ctx , height , true )
275
274
if err != nil {
276
275
var zero H
277
276
if errors .Is (err , datastore .ErrNotFound ) {
@@ -357,149 +356,7 @@ func (s *Store[H]) HasAt(ctx context.Context, height uint64) bool {
357
356
return head .Height () >= height && height >= tail .Height ()
358
357
}
359
358
360
- func (s * Store [H ]) OnDelete (fn func (context.Context , []H ) error ) {
361
- s .onDeleteMu .Lock ()
362
- defer s .onDeleteMu .Unlock ()
363
-
364
- s .onDelete = append (s .onDelete , func (ctx context.Context , h []H ) (rerr error ) {
365
- defer func () {
366
- err := recover ()
367
- if err != nil {
368
- rerr = fmt .Errorf ("header/store: user provided onDelete panicked with: %s" , err )
369
- }
370
- }()
371
- return fn (ctx , h )
372
- })
373
- }
374
-
375
- // DeleteTo implements [header.Store] interface.
376
- func (s * Store [H ]) DeleteTo (ctx context.Context , to uint64 ) error {
377
- // ensure all the pending headers are synchronized
378
- err := s .Sync (ctx )
379
- if err != nil {
380
- return err
381
- }
382
-
383
- head , err := s .Head (ctx )
384
- if err != nil {
385
- return fmt .Errorf ("header/store: reading head: %w" , err )
386
- }
387
- if head .Height ()+ 1 < to {
388
- _ , err := s .getByHeight (ctx , to )
389
- if err != nil {
390
- return fmt .Errorf (
391
- "header/store: delete to %d beyond current head(%d)" ,
392
- to ,
393
- head .Height (),
394
- )
395
- }
396
-
397
- // if `to` is bigger than the current head and is stored - allow delete, making `to` a new head
398
- }
399
-
400
- tail , err := s .Tail (ctx )
401
- if err != nil {
402
- return fmt .Errorf ("header/store: reading tail: %w" , err )
403
- }
404
- if tail .Height () >= to {
405
- return fmt .Errorf ("header/store: delete to %d below current tail(%d)" , to , tail .Height ())
406
- }
407
-
408
- if err := s .deleteRange (ctx , tail .Height (), to ); err != nil {
409
- return fmt .Errorf ("header/store: delete to height %d: %w" , to , err )
410
- }
411
-
412
- if head .Height ()+ 1 == to {
413
- // this is the case where we have deleted all the headers
414
- // wipe the store
415
- if err := s .wipe (ctx ); err != nil {
416
- return fmt .Errorf ("header/store: wipe: %w" , err )
417
- }
418
- }
419
-
420
- return nil
421
- }
422
-
423
- var maxHeadersLoadedPerDelete uint64 = 512
424
-
425
- func (s * Store [H ]) deleteRange (ctx context.Context , from , to uint64 ) error {
426
- log .Infow ("deleting headers" , "from_height" , from , "to_height" , to )
427
- for from < to {
428
- amount := min (to - from , maxHeadersLoadedPerDelete )
429
- toDelete := from + amount
430
- headers := make ([]H , 0 , amount )
431
-
432
- for height := from ; height < toDelete ; height ++ {
433
- // take headers individually instead of range
434
- // as getRangeByHeight can't deal with potentially missing headers
435
- h , err := s .getByHeight (ctx , height )
436
- if errors .Is (err , header .ErrNotFound ) {
437
- log .Warnf ("header/store: attempt to delete header that's not found" , height )
438
- continue
439
- }
440
- if err != nil {
441
- return fmt .Errorf ("getting header while deleting: %w" , err )
442
- }
443
-
444
- headers = append (headers , h )
445
- }
446
-
447
- batch , err := s .ds .Batch (ctx )
448
- if err != nil {
449
- return fmt .Errorf ("new batch: %w" , err )
450
- }
451
-
452
- s .onDeleteMu .Lock ()
453
- onDelete := slices .Clone (s .onDelete )
454
- s .onDeleteMu .Unlock ()
455
- for _ , deleteFn := range onDelete {
456
- if err := deleteFn (ctx , headers ); err != nil {
457
- // abort deletion if onDelete handler fails
458
- // to ensure atomicity between stored headers and user specific data
459
- // TODO(@Wondertan): Batch is not actually atomic and could write some data at this point
460
- // but its fine for now: https://github.com/celestiaorg/go-header/issues/307
461
- // TODO2(@Wondertan): Once we move to txn, find a way to pass txn through context,
462
- // so that users can use it in their onDelete handlers
463
- // to ensure atomicity between deleted headers and user specific data
464
- return fmt .Errorf ("on delete handler: %w" , err )
465
- }
466
- }
467
-
468
- for _ , h := range headers {
469
- if err := batch .Delete (ctx , hashKey (h .Hash ())); err != nil {
470
- return fmt .Errorf ("delete hash key (%X): %w" , h .Hash (), err )
471
- }
472
- if err := batch .Delete (ctx , heightKey (h .Height ())); err != nil {
473
- return fmt .Errorf ("delete height key (%d): %w" , h .Height (), err )
474
- }
475
- }
476
-
477
- err = s .setTail (ctx , batch , toDelete )
478
- if err != nil {
479
- return fmt .Errorf ("setting tail to %d: %w" , toDelete , err )
480
- }
481
-
482
- if err := batch .Commit (ctx ); err != nil {
483
- return fmt .Errorf ("committing delete batch [%d:%d): %w" , from , toDelete , err )
484
- }
485
-
486
- // cleanup caches after disk is flushed
487
- for _ , h := range headers {
488
- s .cache .Remove (h .Hash ().String ())
489
- s .heightIndex .cache .Remove (h .Height ())
490
- }
491
- s .pending .DeleteRange (from , toDelete )
492
-
493
- log .Infow ("deleted header range" , "from_height" , from , "to_height" , toDelete )
494
-
495
- // move iterator
496
- from = toDelete
497
- }
498
-
499
- return nil
500
- }
501
-
502
- func (s * Store [H ]) setTail (ctx context.Context , batch datastore.Batch , to uint64 ) error {
359
+ func (s * Store [H ]) setTail (ctx context.Context , write datastore.Write , to uint64 ) error {
503
360
newTail , err := s .getByHeight (ctx , to )
504
361
if errors .Is (err , header .ErrNotFound ) {
505
362
return nil
@@ -510,17 +367,16 @@ func (s *Store[H]) setTail(ctx context.Context, batch datastore.Batch, to uint64
510
367
511
368
// set directly to `to`, avoiding iteration in recedeTail
512
369
s .tailHeader .Store (& newTail )
513
- if err := writeHeaderHashTo (ctx , batch , newTail , tailKey ); err != nil {
370
+ if err := writeHeaderHashTo (ctx , write , newTail , tailKey ); err != nil {
514
371
return fmt .Errorf ("writing tailKey in batch: %w" , err )
515
372
}
373
+ log .Infow ("new tail" , "height" , newTail .Height (), "hash" , newTail .Hash ())
374
+ s .metrics .newTail (newTail .Height ())
516
375
517
376
// update head as well, if delete went over it
518
- head , err := s .Head (ctx )
519
- if err != nil {
520
- return err
521
- }
522
- if to > head .Height () {
523
- if err := writeHeaderHashTo (ctx , batch , newTail , headKey ); err != nil {
377
+ head , _ := s .Head (ctx )
378
+ if head .IsZero () || to > head .Height () {
379
+ if err := writeHeaderHashTo (ctx , write , newTail , headKey ); err != nil {
524
380
return fmt .Errorf ("writing headKey in batch: %w" , err )
525
381
}
526
382
s .contiguousHead .Store (& newTail )
@@ -828,7 +684,7 @@ func (s *Store[H]) deinit() {
828
684
829
685
func writeHeaderHashTo [H header.Header [H ]](
830
686
ctx context.Context ,
831
- batch datastore.Batch ,
687
+ write datastore.Write ,
832
688
h H ,
833
689
key datastore.Key ,
834
690
) error {
@@ -837,7 +693,7 @@ func writeHeaderHashTo[H header.Header[H]](
837
693
return err
838
694
}
839
695
840
- if err := batch .Put (ctx , key , hashBytes ); err != nil {
696
+ if err := write .Put (ctx , key , hashBytes ); err != nil {
841
697
return err
842
698
}
843
699
0 commit comments