Skip to content

Commit 518e885

Browse files
Wondertanrenaynay
andcommitted
refactor(store): DeleteTo via loading full headers
fix errors logs Update store/store.go Co-authored-by: rene <[email protected]>
1 parent 3b3271f commit 518e885

File tree

4 files changed

+96
-87
lines changed

4 files changed

+96
-87
lines changed

store/batch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (b *batch[H]) Has(hash header.Hash) bool {
8181
return ok
8282
}
8383

84-
// DeleteRange of headers from the batch.
84+
// DeleteRange of headers from the batch [from:to).
8585
func (b *batch[H]) DeleteRange(from, to uint64) {
8686
b.lk.Lock()
8787
defer b.lk.Unlock()

store/height_indexer.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package store
22

33
import (
44
"context"
5-
"fmt"
65

76
lru "github.com/hashicorp/golang-lru/v2"
87
"github.com/ipfs/go-datastore"
@@ -48,16 +47,3 @@ func (hi *heightIndexer[H]) HashByHeight(ctx context.Context, h uint64) (header.
4847
hi.cache.Add(h, header.Hash(val))
4948
return val, nil
5049
}
51-
52-
// deleteRange of heights from the index.
53-
func (hi *heightIndexer[H]) deleteRange(
54-
ctx context.Context, batch datastore.Batch, from, to uint64,
55-
) error {
56-
for h := from; h < to; h++ {
57-
if err := batch.Delete(ctx, heightKey(h)); err != nil {
58-
return fmt.Errorf("delete height key(%d): %w", h, err)
59-
}
60-
hi.cache.Remove(h)
61-
}
62-
return nil
63-
}

store/store.go

Lines changed: 89 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
260260
if err != nil {
261261
var zero H
262262
if errors.Is(err, datastore.ErrNotFound) {
263-
return zero, header.ErrNotFound
263+
return zero, fmt.Errorf("height %d: %w", height, header.ErrNotFound)
264264
}
265265

266266
return zero, err
@@ -378,110 +378,128 @@ func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
378378
if err := s.deleteRange(ctx, tail.Height(), to); err != nil {
379379
return fmt.Errorf("header/store: delete to height %d: %w", to, err)
380380
}
381-
return nil
382-
}
383-
384-
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) error {
385-
batch, err := s.ds.Batch(ctx)
386-
if err != nil {
387-
return fmt.Errorf("delete batch: %w", err)
388-
}
389381

390-
if err := s.prepareDeleteRangeBatch(ctx, batch, from, to); err != nil {
391-
return fmt.Errorf("prepare: %w", err)
392-
}
393-
394-
if err := s.heightIndex.deleteRange(ctx, batch, from, to); err != nil {
395-
return fmt.Errorf("height index: %w", err)
396-
}
397-
398-
err = s.updateTail(ctx, batch, to)
399-
if err != nil {
400-
return fmt.Errorf("update tail: %w", err)
401-
}
402-
403-
if err := batch.Commit(ctx); err != nil {
404-
return fmt.Errorf("delete commit: %w", err)
382+
if head.Height()+1 == to {
383+
// this is the case where we have deleted all the headers
384+
// wipe the store
385+
if err := s.wipe(ctx); err != nil {
386+
return fmt.Errorf("header/store: wipe: %w", err)
387+
}
405388
}
406389

407390
return nil
408391
}
409392

410-
func (s *Store[H]) prepareDeleteRangeBatch(
411-
ctx context.Context, batch datastore.Batch, from, to uint64,
412-
) error {
413-
for h := from; h < to; h++ {
414-
hash, err := s.heightIndex.HashByHeight(ctx, h)
415-
if err != nil {
416-
if errors.Is(err, datastore.ErrNotFound) {
417-
log.Warnw("removing non-existent header", "height", h)
393+
var maxHeadersLoadedPerDelete uint64 = 512
394+
395+
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) error {
396+
log.Infow("deleting headers", "from_height", from, "to_height", to)
397+
for from < to {
398+
amount := min(to-from, maxHeadersLoadedPerDelete)
399+
toDelete := from + amount
400+
headers := make([]H, 0, amount)
401+
402+
for height := from; height < toDelete; height++ {
403+
// take headers individually instead of range
404+
// as getRangeByHeight can't deal with potentially missing headers
405+
h, err := s.getByHeight(ctx, height)
406+
if errors.Is(err, header.ErrNotFound) {
407+
log.Warnf("header/store: attempt to delete header that's not found", height)
418408
continue
419409
}
420-
return fmt.Errorf("hash by height(%d): %w", h, err)
410+
if err != nil {
411+
return fmt.Errorf("getting header while deleting: %w", err)
412+
}
413+
414+
headers = append(headers, h)
421415
}
422-
s.cache.Remove(hash.String())
423416

424-
if err := batch.Delete(ctx, hashKey(hash)); err != nil {
425-
return fmt.Errorf("delete hash key: %w", err)
417+
batch, err := s.ds.Batch(ctx)
418+
if err != nil {
419+
return fmt.Errorf("new batch: %w", err)
426420
}
427-
}
428421

429-
s.pending.DeleteRange(from, to)
430-
return nil
431-
}
422+
for _, h := range headers {
423+
if err := batch.Delete(ctx, hashKey(h.Hash())); err != nil {
424+
return fmt.Errorf("delete hash key (%X): %w", h.Hash(), err)
425+
}
426+
if err := batch.Delete(ctx, heightKey(h.Height())); err != nil {
427+
return fmt.Errorf("delete height key (%d): %w", h.Height(), err)
428+
}
429+
}
432430

433-
func (s *Store[H]) updateTail(ctx context.Context, batch datastore.Batch, to uint64) error {
434-
head, err := s.Head(ctx)
435-
if err != nil {
436-
return err
437-
}
431+
err = s.setTail(ctx, batch, toDelete)
432+
if err != nil {
433+
return fmt.Errorf("setting tail to %d: %w", toDelete, err)
434+
}
438435

439-
newTail, err := s.getByHeight(ctx, to)
440-
if err != nil {
441-
if !errors.Is(err, header.ErrNotFound) {
442-
return fmt.Errorf("cannot fetch next tail: %w", err)
436+
if err := batch.Commit(ctx); err != nil {
437+
return fmt.Errorf("committing delete batch [%d:%d): %w", from, toDelete, err)
443438
}
444439

445-
if to <= head.Height() {
446-
return fmt.Errorf("attempt to delete to %d: %w", to, header.ErrNotFound)
440+
// cleanup caches after disk is flushed
441+
for _, h := range headers {
442+
s.cache.Remove(h.Hash().String())
443+
s.heightIndex.cache.Remove(h.Height())
447444
}
445+
s.pending.DeleteRange(from, toDelete)
448446

449-
// TODO(@Wondertan): this is racy, but not critical
450-
// Will be eventually fixed by
451-
// https://github.com/celestiaorg/go-header/issues/263
452-
// this is the case where we have deleted all the headers
453-
// deinit the store
454-
s.deinit()
447+
log.Infow("deleted header range", "from_height", from, "to_height", toDelete)
455448

456-
err = batch.Delete(ctx, headKey)
457-
if err != nil {
458-
return fmt.Errorf("deleting head in a batch: %w", err)
459-
}
460-
err = batch.Delete(ctx, tailKey)
461-
if err != nil {
462-
return fmt.Errorf("deleting tail in a batch: %w", err)
463-
}
449+
// move iterator
450+
from = toDelete
451+
}
464452

453+
return nil
454+
}
455+
456+
func (s *Store[H]) setTail(ctx context.Context, batch datastore.Batch, to uint64) error {
457+
newTail, err := s.getByHeight(ctx, to)
458+
if errors.Is(err, header.ErrNotFound) {
465459
return nil
466460
}
461+
if err != nil {
462+
return fmt.Errorf("getting tail: %w", err)
463+
}
467464

465+
// set directly to `to`, avoiding iteration in recedeTail
466+
s.tailHeader.Store(&newTail)
468467
if err := writeHeaderHashTo(ctx, batch, newTail, tailKey); err != nil {
469-
return fmt.Errorf("put tail in batch: %w", err)
468+
return fmt.Errorf("writing tailKey in batch: %w", err)
470469
}
471-
s.tailHeader.Store(&newTail)
472-
// do not recede tail, it must be equal to `to`
473470

474-
// update head as well if head, if delete went over it
471+
// update head as well, if delete went over it
472+
head, err := s.Head(ctx)
473+
if err != nil {
474+
return err
475+
}
475476
if to > head.Height() {
476477
if err := writeHeaderHashTo(ctx, batch, newTail, headKey); err != nil {
477-
return fmt.Errorf("put tail in batch: %w", err)
478+
return fmt.Errorf("writing headKey in batch: %w", err)
478479
}
479480
s.contiguousHead.Store(&newTail)
480481
s.advanceHead(ctx)
481482
}
482483
return nil
483484
}
484485

486+
func (s *Store[H]) wipe(ctx context.Context) (rerr error) {
487+
// TODO(@Wondertan): calling deinit here is racy, but not critical
488+
// Will be eventually fixed by
489+
// https://github.com/celestiaorg/go-header/issues/263
490+
s.deinit()
491+
492+
if err := s.ds.Delete(ctx, headKey); err != nil {
493+
rerr = errors.Join(rerr, fmt.Errorf("deleting headKey DB pointer: %w", err))
494+
}
495+
496+
if err := s.ds.Delete(ctx, tailKey); err != nil {
497+
rerr = errors.Join(rerr, fmt.Errorf("deleting tailKey DB pointer: %w", err))
498+
}
499+
500+
return rerr
501+
}
502+
485503
func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
486504
lh := len(headers)
487505
if lh == 0 {
@@ -651,7 +669,7 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
651669
if err != nil {
652670
s.metrics.readSingle(ctx, time.Since(startTime), true)
653671
if errors.Is(err, datastore.ErrNotFound) {
654-
return nil, header.ErrNotFound
672+
return nil, fmt.Errorf("hash %X: %w", hash, header.ErrNotFound)
655673
}
656674
return nil, err
657675
}

store/store_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,8 @@ func TestStore_GetRange(t *testing.T) {
526526
}
527527

528528
func TestStore_DeleteTo(t *testing.T) {
529+
maxHeadersLoadedPerDelete = 10
530+
529531
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
530532
t.Cleanup(cancel)
531533

@@ -601,6 +603,7 @@ func TestStore_DeleteTo(t *testing.T) {
601603
for h := from; h < tt.to; h++ {
602604
hash := hashes[h]
603605
assert.False(t, store.cache.Contains(hash.String()))
606+
assert.False(t, store.heightIndex.cache.Contains(h))
604607
assert.False(t, store.pending.Has(hash))
605608
}
606609

@@ -654,6 +657,8 @@ func TestStore_DeleteTo_EmptyStore(t *testing.T) {
654657
}
655658

656659
func TestStore_DeleteTo_MoveHeadAndTail(t *testing.T) {
660+
maxHeadersLoadedPerDelete = 1
661+
657662
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
658663
t.Cleanup(cancel)
659664

@@ -682,7 +687,7 @@ func TestStore_DeleteTo_MoveHeadAndTail(t *testing.T) {
682687
// assert store is not empty
683688
tail, err := store.Tail(ctx)
684689
require.NoError(t, err)
685-
assert.Equal(t, gap[len(gap)-1].Height()+1, tail.Height())
690+
assert.Equal(t, int(gap[len(gap)-1].Height()+1), int(tail.Height()))
686691
head, err := store.Head(ctx)
687692
require.NoError(t, err)
688693
assert.Equal(t, suite.Head().Height(), head.Height())

0 commit comments

Comments
 (0)