Skip to content

Commit 53dd0fe

Browse files
committed
refactor(store): almost readless deletes
Closes(#318)
1 parent 21b31ff commit 53dd0fe

File tree

6 files changed

+127
-81
lines changed

6 files changed

+127
-81
lines changed

headertest/store.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type Store[H header.Header[H]] struct {
2020
TailHeight uint64
2121

2222
onDeleteMu sync.Mutex
23-
onDelete []func(context.Context, []H) error
23+
onDelete []func(context.Context, uint64) error
2424
}
2525

2626
// NewDummyStore creates a store for DummyHeader.
@@ -80,26 +80,26 @@ func (m *Store[H]) GetByHeight(_ context.Context, height uint64) (H, error) {
8080
}
8181

8282
func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
83-
var deleted []H
8483
for h := m.TailHeight; h < to; h++ {
85-
hdr, ok := m.Headers[h]
86-
if ok {
87-
delete(m.Headers, h)
88-
deleted = append(deleted, hdr)
84+
_, ok := m.Headers[h]
85+
if !ok {
86+
continue
8987
}
90-
}
9188

92-
m.TailHeight = to
93-
for _, deleteFn := range m.onDelete {
94-
err := deleteFn(ctx, deleted)
95-
if err != nil {
96-
return err
89+
for _, deleteFn := range m.onDelete {
90+
err := deleteFn(ctx, h)
91+
if err != nil {
92+
return err
93+
}
9794
}
95+
delete(m.Headers, h) // must be after deleteFn
9896
}
97+
98+
m.TailHeight = to
9999
return nil
100100
}
101101

102-
func (m *Store[H]) OnDelete(fn func(context.Context, []H) error) {
102+
func (m *Store[H]) OnDelete(fn func(context.Context, uint64) error) {
103103
m.onDeleteMu.Lock()
104104
defer m.onDeleteMu.Unlock()
105105

interface.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,10 @@ type Store[H Header[H]] interface {
8888
// DeleteTo deletes the range [Tail():to).
8989
DeleteTo(ctx context.Context, to uint64) error
9090

91-
// OnDelete registers given handler to be called whenever headers are removed from the Store.
92-
OnDelete(func(context.Context, []H) error)
91+
// OnDelete registers given handler to be called whenever a header with the height is being removed.
92+
// OnDelete guarantees that the header is accessible for the handler with GetByHeight and is removed
93+
// only after the handler terminates with nil error.
94+
OnDelete(handler func(ctx context.Context, height uint64) error)
9395
}
9496

9597
// Getter contains the behavior necessary for a component to retrieve

p2p/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,4 +197,4 @@ func (timeoutStore[H]) DeleteTo(ctx context.Context, _ uint64) error {
197197
return ctx.Err()
198198
}
199199

200-
func (timeoutStore[H]) OnDelete(fn func(context.Context, []H) error) {}
200+
func (timeoutStore[H]) OnDelete(fn func(context.Context, uint64) error) {}

store/height_indexer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func newHeightIndexer[H header.Header[H]](
3434
}
3535

3636
// HashByHeight loads a header hash corresponding to the given height.
37-
func (hi *heightIndexer[H]) HashByHeight(ctx context.Context, h uint64) (header.Hash, error) {
37+
func (hi *heightIndexer[H]) HashByHeight(ctx context.Context, h uint64, cache bool) (header.Hash, error) {
3838
if v, ok := hi.cache.Get(h); ok {
3939
return v, nil
4040
}
@@ -44,6 +44,8 @@ func (hi *heightIndexer[H]) HashByHeight(ctx context.Context, h uint64) (header.
4444
return nil, err
4545
}
4646

47-
hi.cache.Add(h, header.Hash(val))
47+
if cache {
48+
hi.cache.Add(h, header.Hash(val))
49+
}
4850
return val, nil
4951
}

store/store.go

Lines changed: 66 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"os"
78
"slices"
9+
"strconv"
810
"sync"
911
"sync/atomic"
1012
"time"
@@ -61,7 +63,7 @@ type Store[H header.Header[H]] struct {
6163
syncCh chan chan struct{}
6264

6365
onDeleteMu sync.Mutex
64-
onDelete []func(context.Context, []H) error
66+
onDelete []func(context.Context, uint64) error
6567

6668
Params Parameters
6769
}
@@ -271,7 +273,7 @@ func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
271273
return h, nil
272274
}
273275

274-
hash, err := s.heightIndex.HashByHeight(ctx, height)
276+
hash, err := s.heightIndex.HashByHeight(ctx, height, true)
275277
if err != nil {
276278
var zero H
277279
if errors.Is(err, datastore.ErrNotFound) {
@@ -357,18 +359,18 @@ func (s *Store[H]) HasAt(ctx context.Context, height uint64) bool {
357359
return head.Height() >= height && height >= tail.Height()
358360
}
359361

360-
func (s *Store[H]) OnDelete(fn func(context.Context, []H) error) {
362+
func (s *Store[H]) OnDelete(fn func(context.Context, uint64) error) {
361363
s.onDeleteMu.Lock()
362364
defer s.onDeleteMu.Unlock()
363365

364-
s.onDelete = append(s.onDelete, func(ctx context.Context, h []H) (rerr error) {
366+
s.onDelete = append(s.onDelete, func(ctx context.Context, height uint64) (rerr error) {
365367
defer func() {
366368
err := recover()
367369
if err != nil {
368-
rerr = fmt.Errorf("header/store: user provided onDelete panicked with: %s", err)
370+
rerr = fmt.Errorf("header/store: user provided onDelete panicked on %d with: %s", height, err)
369371
}
370372
}()
371-
return fn(ctx, h)
373+
return fn(ctx, height)
372374
})
373375
}
374376

@@ -386,13 +388,16 @@ func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
386388
}
387389
if head.Height()+1 < to {
388390
_, err := s.getByHeight(ctx, to)
389-
if err != nil {
391+
if errors.Is(err, header.ErrNotFound) {
390392
return fmt.Errorf(
391393
"header/store: delete to %d beyond current head(%d)",
392394
to,
393395
head.Height(),
394396
)
395397
}
398+
if err != nil {
399+
return fmt.Errorf("delete to potential new head: %w", err)
400+
}
396401

397402
// if `to` is bigger than the current head and is stored - allow delete, making `to` a new head
398403
}
@@ -420,82 +425,80 @@ func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
420425
return nil
421426
}
422427

423-
var maxHeadersLoadedPerDelete uint64 = 512
428+
var maxHeadersLoadedPerDelete uint64 = 1024
424429

425-
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) error {
426-
log.Infow("deleting headers", "from_height", from, "to_height", to)
427-
for from < to {
428-
amount := min(to-from, maxHeadersLoadedPerDelete)
429-
toDelete := from + amount
430-
headers := make([]H, 0, amount)
430+
func init() {
431+
v, ok := os.LookupEnv("HEADER_MAX_LOAD_PER_DELETE")
432+
if !ok {
433+
return
434+
}
431435

432-
for height := from; height < toDelete; height++ {
433-
// take headers individually instead of range
434-
// as getRangeByHeight can't deal with potentially missing headers
435-
h, err := s.getByHeight(ctx, height)
436-
if errors.Is(err, header.ErrNotFound) {
437-
log.Warnf("header/store: attempt to delete header that's not found", height)
438-
continue
439-
}
440-
if err != nil {
441-
return fmt.Errorf("getting header while deleting: %w", err)
442-
}
436+
max, err := strconv.Atoi(v)
437+
if err != nil {
438+
panic(err)
439+
}
440+
441+
maxHeadersLoadedPerDelete = uint64(max)
442+
}
443+
444+
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (rerr error) {
445+
s.onDeleteMu.Lock()
446+
onDelete := slices.Clone(s.onDelete)
447+
s.onDeleteMu.Unlock()
448+
449+
batch, err := s.ds.Batch(ctx)
450+
if err != nil {
451+
return fmt.Errorf("new batch: %w", err)
452+
}
443453

444-
headers = append(headers, h)
454+
height := from
455+
defer func() {
456+
// make new context to always save progress
457+
ctx := context.Background()
458+
newTailHeight := to
459+
if rerr != nil {
460+
newTailHeight = height
445461
}
446462

447-
batch, err := s.ds.Batch(ctx)
463+
err = s.setTail(ctx, batch, newTailHeight)
448464
if err != nil {
449-
return fmt.Errorf("new batch: %w", err)
465+
rerr = errors.Join(rerr, fmt.Errorf("setting tail to %d: %w", newTailHeight, err))
450466
}
451467

452-
s.onDeleteMu.Lock()
453-
onDelete := slices.Clone(s.onDelete)
454-
s.onDeleteMu.Unlock()
455-
for _, deleteFn := range onDelete {
456-
if err := deleteFn(ctx, headers); err != nil {
457-
// abort deletion if onDelete handler fails
458-
// to ensure atomicity between stored headers and user specific data
459-
// TODO(@Wondertan): Batch is not actually atomic and could write some data at this point
460-
// but its fine for now: https://github.com/celestiaorg/go-header/issues/307
461-
// TODO2(@Wondertan): Once we move to txn, find a way to pass txn through context,
462-
// so that users can use it in their onDelete handlers
463-
// to ensure atomicity between deleted headers and user specific data
464-
return fmt.Errorf("on delete handler: %w", err)
465-
}
468+
if err := batch.Commit(ctx); err != nil {
469+
rerr = errors.Join(rerr, fmt.Errorf("committing delete batch [%d:%d): %w", from, newTailHeight, err))
466470
}
471+
}()
467472

468-
for _, h := range headers {
469-
if err := batch.Delete(ctx, hashKey(h.Hash())); err != nil {
470-
return fmt.Errorf("delete hash key (%X): %w", h.Hash(), err)
471-
}
472-
if err := batch.Delete(ctx, heightKey(h.Height())); err != nil {
473-
return fmt.Errorf("delete height key (%d): %w", h.Height(), err)
474-
}
473+
for ; height < to; height++ {
474+
hash, err := s.heightIndex.HashByHeight(ctx, height, false)
475+
if errors.Is(err, datastore.ErrNotFound) {
476+
log.Warnf("attempt to delete header that's not found", "height", height)
477+
continue
475478
}
476-
477-
err = s.setTail(ctx, batch, toDelete)
478479
if err != nil {
479-
return fmt.Errorf("setting tail to %d: %w", toDelete, err)
480+
return fmt.Errorf("hash by height %d: %w", height, err)
480481
}
481482

482-
if err := batch.Commit(ctx); err != nil {
483-
return fmt.Errorf("committing delete batch [%d:%d): %w", from, toDelete, err)
483+
for _, deleteFn := range onDelete {
484+
if err := deleteFn(ctx, height); err != nil {
485+
return fmt.Errorf("on delete handler for %d: %w", height, err)
486+
}
484487
}
485488

486-
// cleanup caches after disk is flushed
487-
for _, h := range headers {
488-
s.cache.Remove(h.Hash().String())
489-
s.heightIndex.cache.Remove(h.Height())
489+
if err := batch.Delete(ctx, hashKey(hash)); err != nil {
490+
return fmt.Errorf("delete hash key (%X): %w", hash, err)
491+
}
492+
if err := batch.Delete(ctx, heightKey(height)); err != nil {
493+
return fmt.Errorf("delete height key (%d): %w", height, err)
490494
}
491-
s.pending.DeleteRange(from, toDelete)
492-
493-
log.Infow("deleted header range", "from_height", from, "to_height", toDelete)
494495

495-
// move iterator
496-
from = toDelete
496+
s.cache.Remove(hash.String())
497+
s.heightIndex.cache.Remove(height)
498+
s.pending.DeleteRange(height, height+1)
497499
}
498500

501+
log.Infow("deleted headers", "from_height", from, "to_height", to)
499502
return nil
500503
}
501504

store/store_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,45 @@ func TestStore_DeleteTo_Synchronized(t *testing.T) {
732732
require.EqualValues(t, 100, tail.Height())
733733
}
734734

735+
func TestStore_OnDelete(t *testing.T) {
736+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
737+
t.Cleanup(cancel)
738+
739+
suite := headertest.NewTestSuite(t)
740+
741+
ds := sync.MutexWrap(datastore.NewMapDatastore())
742+
store, err := NewStore[*headertest.DummyHeader](ds)
743+
require.NoError(t, err)
744+
745+
err = store.Start(ctx)
746+
require.NoError(t, err)
747+
748+
err = store.Append(ctx, suite.GenDummyHeaders(50)...)
749+
require.NoError(t, err)
750+
// artificial gap
751+
_ = suite.GenDummyHeaders(50)
752+
753+
err = store.Append(ctx, suite.GenDummyHeaders(50)...)
754+
require.NoError(t, err)
755+
756+
deleted := 0
757+
store.OnDelete(func(ctx context.Context, height uint64) error {
758+
hdr, err := store.GetByHeight(ctx, height)
759+
assert.NoError(t, err, "must be accessible")
760+
require.NotNil(t, hdr)
761+
deleted++
762+
return nil
763+
})
764+
765+
err = store.DeleteTo(ctx, 101)
766+
require.NoError(t, err)
767+
assert.Equal(t, 50, deleted)
768+
769+
hdr, err := store.GetByHeight(ctx, 50)
770+
assert.Error(t, err)
771+
assert.Nil(t, hdr)
772+
}
773+
735774
func TestStorePendingCacheMiss(t *testing.T) {
736775
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
737776
t.Cleanup(cancel)

0 commit comments

Comments
 (0)