Skip to content

Commit 0479af3

Browse files
authored
feat(store)!: OnDelete method (#304)
Closes #301 Depends on #302
1 parent 77672f5 commit 0479af3

File tree

4 files changed

+75
-2
lines changed

4 files changed

+75
-2
lines changed

headertest/store.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"sync"
78
"testing"
89

910
"github.com/celestiaorg/go-header"
@@ -17,6 +18,9 @@ type Store[H header.Header[H]] struct {
1718
Headers map[uint64]H
1819
HeadHeight uint64
1920
TailHeight uint64
21+
22+
onDeleteMu sync.Mutex
23+
onDelete []func(context.Context, []H) error
2024
}
2125

2226
// NewDummyStore creates a store for DummyHeader.
@@ -75,15 +79,33 @@ func (m *Store[H]) GetByHeight(_ context.Context, height uint64) (H, error) {
7579
return zero, header.ErrNotFound
7680
}
7781

78-
func (m *Store[H]) DeleteTo(_ context.Context, to uint64) error {
82+
func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
83+
var deleted []H
7984
for h := m.TailHeight; h < to; h++ {
80-
delete(m.Headers, h)
85+
hdr, ok := m.Headers[h]
86+
if ok {
87+
delete(m.Headers, h)
88+
deleted = append(deleted, hdr)
89+
}
8190
}
8291

8392
m.TailHeight = to
93+
for _, deleteFn := range m.onDelete {
94+
err := deleteFn(ctx, deleted)
95+
if err != nil {
96+
return err
97+
}
98+
}
8499
return nil
85100
}
86101

102+
func (m *Store[H]) OnDelete(fn func(context.Context, []H) error) {
103+
m.onDeleteMu.Lock()
104+
defer m.onDeleteMu.Unlock()
105+
106+
m.onDelete = append(m.onDelete, fn)
107+
}
108+
87109
func (m *Store[H]) GetRange(ctx context.Context, from, to uint64) ([]H, error) {
88110
return m.getRangeByHeight(ctx, from, to)
89111
}

interface.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ type Store[H Header[H]] interface {
8787

8888
// DeleteTo deletes the range [Tail():to).
8989
DeleteTo(ctx context.Context, to uint64) error
90+
91+
// OnDelete registers given handler to be called whenever headers are removed from the Store.
92+
OnDelete(func(context.Context, []H) error)
9093
}
9194

9295
// Getter contains the behavior necessary for a component to retrieve

p2p/server_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,3 +196,5 @@ func (timeoutStore[H]) DeleteTo(ctx context.Context, _ uint64) error {
196196
<-ctx.Done()
197197
return ctx.Err()
198198
}
199+
200+
func (timeoutStore[H]) OnDelete(fn func(context.Context, []H) error) {}

store/store.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"slices"
8+
"sync"
79
"sync/atomic"
810
"time"
911

@@ -58,6 +60,9 @@ type Store[H header.Header[H]] struct {
5860
// syncCh is a channel used to synchronize writes
5961
syncCh chan chan struct{}
6062

63+
onDeleteMu sync.Mutex
64+
onDelete []func(context.Context, []H) error
65+
6166
Params Parameters
6267
}
6368

@@ -252,6 +257,16 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
252257
}
253258

254259
func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
260+
head, _ := s.Head(ctx)
261+
if !head.IsZero() && head.Height() == height {
262+
return head, nil
263+
}
264+
265+
tail, _ := s.Tail(ctx)
266+
if !tail.IsZero() && tail.Height() == height {
267+
return tail, nil
268+
}
269+
255270
if h := s.pending.GetByHeight(height); !h.IsZero() {
256271
return h, nil
257272
}
@@ -342,6 +357,21 @@ func (s *Store[H]) HasAt(ctx context.Context, height uint64) bool {
342357
return head.Height() >= height && height >= tail.Height()
343358
}
344359

360+
func (s *Store[H]) OnDelete(fn func(context.Context, []H) error) {
361+
s.onDeleteMu.Lock()
362+
defer s.onDeleteMu.Unlock()
363+
364+
s.onDelete = append(s.onDelete, func(ctx context.Context, h []H) (rerr error) {
365+
defer func() {
366+
err := recover()
367+
if err != nil {
368+
rerr = fmt.Errorf("header/store: user provided onDelete panicked with: %s", err)
369+
}
370+
}()
371+
return fn(ctx, h)
372+
})
373+
}
374+
345375
// DeleteTo implements [header.Store] interface.
346376
func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
347377
// ensure all the pending headers are synchronized
@@ -419,6 +449,22 @@ func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) error {
419449
return fmt.Errorf("new batch: %w", err)
420450
}
421451

452+
s.onDeleteMu.Lock()
453+
onDelete := slices.Clone(s.onDelete)
454+
s.onDeleteMu.Unlock()
455+
for _, deleteFn := range onDelete {
456+
if err := deleteFn(ctx, headers); err != nil {
457+
// abort deletion if onDelete handler fails
458+
// to ensure atomicity between stored headers and user specific data
459+
// TODO(@Wondertan): Batch is not actually atomic and could write some data at this point
460+
// but its fine for now: https://github.com/celestiaorg/go-header/issues/307
461+
// TODO2(@Wondertan): Once we move to txn, find a way to pass txn through context,
462+
// so that users can use it in their onDelete handlers
463+
// to ensure atomicity between deleted headers and user specific data
464+
return fmt.Errorf("on delete handler: %w", err)
465+
}
466+
}
467+
422468
for _, h := range headers {
423469
if err := batch.Delete(ctx, hashKey(h.Hash())); err != nil {
424470
return fmt.Errorf("delete hash key (%X): %w", h.Hash(), err)

0 commit comments

Comments
 (0)