Skip to content

Commit 6585d33

Browse files
committed
refactor(store): almost readless parallel deletes
Closes(#318)
1 parent 21b31ff commit 6585d33

File tree

7 files changed

+376
-177
lines changed

7 files changed

+376
-177
lines changed

headertest/store.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type Store[H header.Header[H]] struct {
2020
TailHeight uint64
2121

2222
onDeleteMu sync.Mutex
23-
onDelete []func(context.Context, []H) error
23+
onDelete []func(context.Context, uint64) error
2424
}
2525

2626
// NewDummyStore creates a store for DummyHeader.
@@ -80,26 +80,26 @@ func (m *Store[H]) GetByHeight(_ context.Context, height uint64) (H, error) {
8080
}
8181

8282
func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
83-
var deleted []H
8483
for h := m.TailHeight; h < to; h++ {
85-
hdr, ok := m.Headers[h]
86-
if ok {
87-
delete(m.Headers, h)
88-
deleted = append(deleted, hdr)
84+
_, ok := m.Headers[h]
85+
if !ok {
86+
continue
8987
}
90-
}
9188

92-
m.TailHeight = to
93-
for _, deleteFn := range m.onDelete {
94-
err := deleteFn(ctx, deleted)
95-
if err != nil {
96-
return err
89+
for _, deleteFn := range m.onDelete {
90+
err := deleteFn(ctx, h)
91+
if err != nil {
92+
return err
93+
}
9794
}
95+
delete(m.Headers, h) // must be after deleteFn
9896
}
97+
98+
m.TailHeight = to
9999
return nil
100100
}
101101

102-
func (m *Store[H]) OnDelete(fn func(context.Context, []H) error) {
102+
func (m *Store[H]) OnDelete(fn func(context.Context, uint64) error) {
103103
m.onDeleteMu.Lock()
104104
defer m.onDeleteMu.Unlock()
105105

interface.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,10 @@ type Store[H Header[H]] interface {
8888
// DeleteTo deletes the range [Tail():to).
8989
DeleteTo(ctx context.Context, to uint64) error
9090

91-
// OnDelete registers given handler to be called whenever headers are removed from the Store.
92-
OnDelete(func(context.Context, []H) error)
91+
// OnDelete registers the given handler to be called with the height of each header that is being removed.
92+
// OnDelete guarantees that the header remains accessible to the handler via GetByHeight and is removed
93+
// only after the handler returns a nil error.
94+
OnDelete(handler func(ctx context.Context, height uint64) error)
9395
}
9496

9597
// Getter contains the behavior necessary for a component to retrieve

p2p/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,4 +197,4 @@ func (timeoutStore[H]) DeleteTo(ctx context.Context, _ uint64) error {
197197
return ctx.Err()
198198
}
199199

200-
func (timeoutStore[H]) OnDelete(fn func(context.Context, []H) error) {}
200+
func (timeoutStore[H]) OnDelete(fn func(context.Context, uint64) error) {}

store/height_indexer.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ func newHeightIndexer[H header.Header[H]](
3434
}
3535

3636
// HashByHeight loads a header hash corresponding to the given height.
37-
func (hi *heightIndexer[H]) HashByHeight(ctx context.Context, h uint64) (header.Hash, error) {
37+
func (hi *heightIndexer[H]) HashByHeight(
38+
ctx context.Context,
39+
h uint64,
40+
cache bool,
41+
) (header.Hash, error) {
3842
if v, ok := hi.cache.Get(h); ok {
3943
return v, nil
4044
}
@@ -44,6 +48,8 @@ func (hi *heightIndexer[H]) HashByHeight(ctx context.Context, h uint64) (header.
4448
return nil, err
4549
}
4650

47-
hi.cache.Add(h, header.Hash(val))
51+
if cache {
52+
hi.cache.Add(h, header.Hash(val))
53+
}
4854
return val, nil
4955
}

store/store.go

Lines changed: 11 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"slices"
87
"sync"
98
"sync/atomic"
109
"time"
@@ -61,7 +60,7 @@ type Store[H header.Header[H]] struct {
6160
syncCh chan chan struct{}
6261

6362
onDeleteMu sync.Mutex
64-
onDelete []func(context.Context, []H) error
63+
onDelete []func(context.Context, uint64) error
6564

6665
Params Parameters
6766
}
@@ -271,7 +270,7 @@ func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
271270
return h, nil
272271
}
273272

274-
hash, err := s.heightIndex.HashByHeight(ctx, height)
273+
hash, err := s.heightIndex.HashByHeight(ctx, height, true)
275274
if err != nil {
276275
var zero H
277276
if errors.Is(err, datastore.ErrNotFound) {
@@ -357,149 +356,7 @@ func (s *Store[H]) HasAt(ctx context.Context, height uint64) bool {
357356
return head.Height() >= height && height >= tail.Height()
358357
}
359358

360-
func (s *Store[H]) OnDelete(fn func(context.Context, []H) error) {
361-
s.onDeleteMu.Lock()
362-
defer s.onDeleteMu.Unlock()
363-
364-
s.onDelete = append(s.onDelete, func(ctx context.Context, h []H) (rerr error) {
365-
defer func() {
366-
err := recover()
367-
if err != nil {
368-
rerr = fmt.Errorf("header/store: user provided onDelete panicked with: %s", err)
369-
}
370-
}()
371-
return fn(ctx, h)
372-
})
373-
}
374-
375-
// DeleteTo implements [header.Store] interface.
376-
func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
377-
// ensure all the pending headers are synchronized
378-
err := s.Sync(ctx)
379-
if err != nil {
380-
return err
381-
}
382-
383-
head, err := s.Head(ctx)
384-
if err != nil {
385-
return fmt.Errorf("header/store: reading head: %w", err)
386-
}
387-
if head.Height()+1 < to {
388-
_, err := s.getByHeight(ctx, to)
389-
if err != nil {
390-
return fmt.Errorf(
391-
"header/store: delete to %d beyond current head(%d)",
392-
to,
393-
head.Height(),
394-
)
395-
}
396-
397-
// if `to` is bigger than the current head and is stored - allow delete, making `to` a new head
398-
}
399-
400-
tail, err := s.Tail(ctx)
401-
if err != nil {
402-
return fmt.Errorf("header/store: reading tail: %w", err)
403-
}
404-
if tail.Height() >= to {
405-
return fmt.Errorf("header/store: delete to %d below current tail(%d)", to, tail.Height())
406-
}
407-
408-
if err := s.deleteRange(ctx, tail.Height(), to); err != nil {
409-
return fmt.Errorf("header/store: delete to height %d: %w", to, err)
410-
}
411-
412-
if head.Height()+1 == to {
413-
// this is the case where we have deleted all the headers
414-
// wipe the store
415-
if err := s.wipe(ctx); err != nil {
416-
return fmt.Errorf("header/store: wipe: %w", err)
417-
}
418-
}
419-
420-
return nil
421-
}
422-
423-
var maxHeadersLoadedPerDelete uint64 = 512
424-
425-
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) error {
426-
log.Infow("deleting headers", "from_height", from, "to_height", to)
427-
for from < to {
428-
amount := min(to-from, maxHeadersLoadedPerDelete)
429-
toDelete := from + amount
430-
headers := make([]H, 0, amount)
431-
432-
for height := from; height < toDelete; height++ {
433-
// take headers individually instead of range
434-
// as getRangeByHeight can't deal with potentially missing headers
435-
h, err := s.getByHeight(ctx, height)
436-
if errors.Is(err, header.ErrNotFound) {
437-
log.Warnf("header/store: attempt to delete header that's not found", height)
438-
continue
439-
}
440-
if err != nil {
441-
return fmt.Errorf("getting header while deleting: %w", err)
442-
}
443-
444-
headers = append(headers, h)
445-
}
446-
447-
batch, err := s.ds.Batch(ctx)
448-
if err != nil {
449-
return fmt.Errorf("new batch: %w", err)
450-
}
451-
452-
s.onDeleteMu.Lock()
453-
onDelete := slices.Clone(s.onDelete)
454-
s.onDeleteMu.Unlock()
455-
for _, deleteFn := range onDelete {
456-
if err := deleteFn(ctx, headers); err != nil {
457-
// abort deletion if onDelete handler fails
458-
// to ensure atomicity between stored headers and user specific data
459-
// TODO(@Wondertan): Batch is not actually atomic and could write some data at this point
460-
// but its fine for now: https://github.com/celestiaorg/go-header/issues/307
461-
// TODO2(@Wondertan): Once we move to txn, find a way to pass txn through context,
462-
// so that users can use it in their onDelete handlers
463-
// to ensure atomicity between deleted headers and user specific data
464-
return fmt.Errorf("on delete handler: %w", err)
465-
}
466-
}
467-
468-
for _, h := range headers {
469-
if err := batch.Delete(ctx, hashKey(h.Hash())); err != nil {
470-
return fmt.Errorf("delete hash key (%X): %w", h.Hash(), err)
471-
}
472-
if err := batch.Delete(ctx, heightKey(h.Height())); err != nil {
473-
return fmt.Errorf("delete height key (%d): %w", h.Height(), err)
474-
}
475-
}
476-
477-
err = s.setTail(ctx, batch, toDelete)
478-
if err != nil {
479-
return fmt.Errorf("setting tail to %d: %w", toDelete, err)
480-
}
481-
482-
if err := batch.Commit(ctx); err != nil {
483-
return fmt.Errorf("committing delete batch [%d:%d): %w", from, toDelete, err)
484-
}
485-
486-
// cleanup caches after disk is flushed
487-
for _, h := range headers {
488-
s.cache.Remove(h.Hash().String())
489-
s.heightIndex.cache.Remove(h.Height())
490-
}
491-
s.pending.DeleteRange(from, toDelete)
492-
493-
log.Infow("deleted header range", "from_height", from, "to_height", toDelete)
494-
495-
// move iterator
496-
from = toDelete
497-
}
498-
499-
return nil
500-
}
501-
502-
func (s *Store[H]) setTail(ctx context.Context, batch datastore.Batch, to uint64) error {
359+
func (s *Store[H]) setTail(ctx context.Context, write datastore.Write, to uint64) error {
503360
newTail, err := s.getByHeight(ctx, to)
504361
if errors.Is(err, header.ErrNotFound) {
505362
return nil
@@ -510,17 +367,16 @@ func (s *Store[H]) setTail(ctx context.Context, batch datastore.Batch, to uint64
510367

511368
// set directly to `to`, avoiding iteration in recedeTail
512369
s.tailHeader.Store(&newTail)
513-
if err := writeHeaderHashTo(ctx, batch, newTail, tailKey); err != nil {
370+
if err := writeHeaderHashTo(ctx, write, newTail, tailKey); err != nil {
514371
return fmt.Errorf("writing tailKey in batch: %w", err)
515372
}
373+
log.Infow("new tail", "height", newTail.Height(), "hash", newTail.Hash())
374+
s.metrics.newTail(newTail.Height())
516375

517376
// update head as well, if delete went over it
518-
head, err := s.Head(ctx)
519-
if err != nil {
520-
return err
521-
}
522-
if to > head.Height() {
523-
if err := writeHeaderHashTo(ctx, batch, newTail, headKey); err != nil {
377+
head, _ := s.Head(ctx)
378+
if head.IsZero() || to > head.Height() {
379+
if err := writeHeaderHashTo(ctx, write, newTail, headKey); err != nil {
524380
return fmt.Errorf("writing headKey in batch: %w", err)
525381
}
526382
s.contiguousHead.Store(&newTail)
@@ -828,7 +684,7 @@ func (s *Store[H]) deinit() {
828684

829685
func writeHeaderHashTo[H header.Header[H]](
830686
ctx context.Context,
831-
batch datastore.Batch,
687+
write datastore.Write,
832688
h H,
833689
key datastore.Key,
834690
) error {
@@ -837,7 +693,7 @@ func writeHeaderHashTo[H header.Header[H]](
837693
return err
838694
}
839695

840-
if err := batch.Put(ctx, key, hashBytes); err != nil {
696+
if err := write.Put(ctx, key, hashBytes); err != nil {
841697
return err
842698
}
843699

0 commit comments

Comments
 (0)