Skip to content

Commit f58fe79

Browse files
cristalolegWondertan
authored andcommitted
feat!(store): add DeleteTo and update tail accordingly (#275)
Fixes #257
1 parent 2b2d0f8 commit f58fe79

File tree

8 files changed

+254
-16
lines changed

8 files changed

+254
-16
lines changed

headertest/store.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type Generator[H header.Header[H]] interface {
1616
type Store[H header.Header[H]] struct {
1717
Headers map[uint64]H
1818
HeadHeight uint64
19+
TailHeight uint64
1920
}
2021

2122
// NewDummyStore creates a store for DummyHeader.
@@ -52,16 +53,12 @@ func (m *Store[H]) Head(context.Context, ...header.HeadOption[H]) (H, error) {
5253
}
5354

5455
func (m *Store[H]) Tail(context.Context) (H, error) {
55-
err := header.ErrNotFound
56-
57-
var tail H
58-
for _, h := range m.Headers {
59-
if tail.IsZero() || h.Height() < tail.Height() {
60-
tail = h
61-
}
56+
tail, ok := m.Headers[m.TailHeight]
57+
if !ok {
58+
var zero H
59+
return zero, header.ErrNotFound
6260
}
63-
64-
return tail, err
61+
return tail, nil
6562
}
6663

6764
func (m *Store[H]) Get(_ context.Context, hash header.Hash) (H, error) {
@@ -82,6 +79,15 @@ func (m *Store[H]) GetByHeight(_ context.Context, height uint64) (H, error) {
8279
return zero, header.ErrNotFound
8380
}
8481

82+
func (m *Store[H]) DeleteTo(_ context.Context, to uint64) error {
83+
for h := m.TailHeight; h < to; h++ {
84+
delete(m.Headers, h)
85+
}
86+
87+
m.TailHeight = to
88+
return nil
89+
}
90+
8591
func (m *Store[H]) GetRange(ctx context.Context, from, to uint64) ([]H, error) {
8692
return m.getRangeByHeight(ctx, from, to)
8793
}
@@ -122,10 +128,13 @@ func (m *Store[H]) HasAt(_ context.Context, height uint64) bool {
122128

123129
func (m *Store[H]) Append(_ context.Context, headers ...H) error {
124130
for _, header := range headers {
125-
m.Headers[header.Height()] = header
126-
// set head
127-
if header.Height() > m.HeadHeight {
128-
m.HeadHeight = header.Height()
131+
height := header.Height()
132+
m.Headers[height] = header
133+
if height > m.HeadHeight {
134+
m.HeadHeight = height
135+
}
136+
if height < m.TailHeight {
137+
m.TailHeight = height
129138
}
130139
}
131140
return nil

interface.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ type Store[H Header[H]] interface {
8787

8888
// GetRange returns the range [from:to).
8989
GetRange(context.Context, uint64, uint64) ([]H, error)
90+
91+
// DeleteTo deletes the range [Tail():to).
92+
DeleteTo(ctx context.Context, to uint64) error
9093
}
9194

9295
// Getter contains the behavior necessary for a component to retrieve

p2p/server_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,3 +190,8 @@ func (timeoutStore[H]) GetRange(ctx context.Context, _, _ uint64) ([]H, error) {
190190
<-ctx.Done()
191191
return nil, ctx.Err()
192192
}
193+
194+
func (timeoutStore[H]) DeleteTo(ctx context.Context, _ uint64) error {
195+
<-ctx.Done()
196+
return ctx.Err()
197+
}

store/batch.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package store
22

33
import (
4+
"maps"
5+
"slices"
46
"sync"
57

68
"github.com/celestiaorg/go-header"
@@ -101,6 +103,21 @@ func (b *batch[H]) Has(hash header.Hash) bool {
101103
return ok
102104
}
103105

106+
// DeleteRange of headers from the batch.
107+
func (b *batch[H]) DeleteRange(from, to uint64) {
108+
b.lk.Lock()
109+
defer b.lk.Unlock()
110+
111+
maps.DeleteFunc(b.heights, func(_ string, height uint64) bool {
112+
return from <= height && height < to
113+
})
114+
115+
b.headers = slices.DeleteFunc(b.headers, func(h H) bool {
116+
height := h.Height()
117+
return from <= height && height < to
118+
})
119+
}
120+
104121
// Reset cleans references to batched headers.
105122
func (b *batch[H]) Reset() {
106123
b.lk.Lock()

store/height_indexer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package store
22

33
import (
44
"context"
5+
"fmt"
56

67
lru "github.com/hashicorp/golang-lru/v2"
78
"github.com/ipfs/go-datastore"
@@ -47,3 +48,16 @@ func (hi *heightIndexer[H]) HashByHeight(ctx context.Context, h uint64) (header.
4748
hi.cache.Add(h, header.Hash(val))
4849
return val, nil
4950
}
51+
52+
// deleteRange of heights from the index.
53+
func (hi *heightIndexer[H]) deleteRange(
54+
ctx context.Context, batch datastore.Batch, from, to uint64,
55+
) error {
56+
for h := from; h < to; h++ {
57+
if err := batch.Delete(ctx, heightKey(h)); err != nil {
58+
return fmt.Errorf("delete height key(%d): %w", h, err)
59+
}
60+
hi.cache.Remove(h)
61+
}
62+
return nil
63+
}

store/keys.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,9 @@ func heightKey(h uint64) datastore.Key {
1818
}
1919

2020
func headerKey[H header.Header[H]](h H) datastore.Key {
21-
return datastore.NewKey(h.Hash().String())
21+
return hashKey(h.Hash())
22+
}
23+
24+
func hashKey(h header.Hash) datastore.Key {
25+
return datastore.NewKey(h.String())
2226
}

store/store.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,106 @@ func (s *Store[H]) HasAt(_ context.Context, height uint64) bool {
345345
return height != uint64(0) && s.Height() >= height
346346
}
347347

348+
// DeleteTo implements [header.Store] interface.
349+
func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
350+
var from uint64
351+
352+
if tailPtr := s.tailHeader.Load(); tailPtr != nil {
353+
from = (*tailPtr).Height()
354+
}
355+
if from >= to {
356+
return nil
357+
}
358+
if headPtr := s.contiguousHead.Load(); headPtr != nil {
359+
if height := (*headPtr).Height(); to > height {
360+
return fmt.Errorf("header/store: higher then head (%d vs %d)", to, height)
361+
}
362+
}
363+
364+
if from >= to {
365+
log.Debugw("header/store: attempt to delete empty range(%d, %d)", from, to)
366+
return nil
367+
}
368+
369+
if err := s.deleteRange(ctx, from, to); err != nil {
370+
return fmt.Errorf("header/store: delete to height %d: %w", to, err)
371+
}
372+
return nil
373+
}
374+
375+
func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) error {
376+
batch, err := s.ds.Batch(ctx)
377+
if err != nil {
378+
return fmt.Errorf("delete batch: %w", err)
379+
}
380+
381+
if err := s.prepareDeleteRangeBatch(ctx, batch, from, to); err != nil {
382+
return fmt.Errorf("prepare: %w", err)
383+
}
384+
385+
if err := s.heightIndex.deleteRange(ctx, batch, from, to); err != nil {
386+
return fmt.Errorf("height index: %w", err)
387+
}
388+
389+
newTail, err := s.updateTail(ctx, batch, to)
390+
if err != nil {
391+
return fmt.Errorf("update tail: %w", err)
392+
}
393+
394+
if err := batch.Commit(ctx); err != nil {
395+
return fmt.Errorf("delete commit: %w", err)
396+
}
397+
398+
s.tailHeader.Store(&newTail)
399+
return nil
400+
}
401+
402+
func (s *Store[H]) prepareDeleteRangeBatch(
403+
ctx context.Context, batch datastore.Batch, from, to uint64,
404+
) error {
405+
for h := from; h < to; h++ {
406+
hash, err := s.heightIndex.HashByHeight(ctx, h)
407+
if err != nil {
408+
if errors.Is(err, datastore.ErrNotFound) {
409+
log.Errorw("removing non-existent header", "height", h)
410+
continue
411+
}
412+
return fmt.Errorf("hash by height(%d): %w", h, err)
413+
}
414+
s.cache.Remove(hash.String())
415+
416+
if err := batch.Delete(ctx, hashKey(hash)); err != nil {
417+
return fmt.Errorf("delete hash key: %w", err)
418+
}
419+
}
420+
421+
s.pending.DeleteRange(from, to)
422+
return nil
423+
}
424+
425+
func (s *Store[H]) updateTail(
426+
ctx context.Context, batch datastore.Batch, to uint64,
427+
) (H, error) {
428+
var zero H
429+
430+
newTail, err := s.getByHeight(ctx, to)
431+
if err != nil {
432+
if !errors.Is(err, header.ErrNotFound) {
433+
return zero, fmt.Errorf("cannot fetch next tail: %w", err)
434+
}
435+
return zero, err
436+
}
437+
438+
b, err := newTail.Hash().MarshalJSON()
439+
if err != nil {
440+
return zero, err
441+
}
442+
if err := batch.Put(ctx, tailKey, b); err != nil {
443+
return zero, err
444+
}
445+
return newTail, nil
446+
}
447+
348448
func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
349449
lh := len(headers)
350450
if lh == 0 {

store/store_test.go

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ func TestStore(t *testing.T) {
2929

3030
assert.Equal(t, *store.tailHeader.Load(), suite.Head())
3131

32-
assert.Equal(t, *store.tailHeader.Load(), suite.Head())
33-
3432
head, err := store.Head(ctx)
3533
require.NoError(t, err)
3634
assert.EqualValues(t, suite.Head().Hash(), head.Hash())
@@ -470,6 +468,94 @@ func TestStore_GetRange(t *testing.T) {
470468
}
471469
}
472470

471+
func TestStore_DeleteTo(t *testing.T) {
472+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
473+
t.Cleanup(cancel)
474+
475+
suite := headertest.NewTestSuite(t)
476+
477+
ds := sync.MutexWrap(datastore.NewMapDatastore())
478+
store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10))
479+
480+
const count = 100
481+
in := suite.GenDummyHeaders(count)
482+
err := store.Append(ctx, in...)
483+
require.NoError(t, err)
484+
485+
hashes := make(map[uint64]header.Hash, count)
486+
for _, h := range in {
487+
hashes[h.Height()] = h.Hash()
488+
}
489+
490+
// wait until headers are written
491+
time.Sleep(100 * time.Millisecond)
492+
493+
tests := []struct {
494+
name string
495+
to uint64
496+
wantTail uint64
497+
wantError bool
498+
}{
499+
{
500+
name: "initial delete request",
501+
to: 14,
502+
wantTail: 14,
503+
wantError: false,
504+
},
505+
{
506+
name: "no-op delete request",
507+
to: 5,
508+
wantTail: 14,
509+
wantError: false,
510+
},
511+
{
512+
name: "valid delete request",
513+
to: 50,
514+
wantTail: 50,
515+
wantError: false,
516+
},
517+
{
518+
name: "higher than head",
519+
to: 1055,
520+
wantTail: 30,
521+
wantError: true,
522+
},
523+
}
524+
525+
for _, tt := range tests {
526+
t.Run(tt.name, func(t *testing.T) {
527+
from := (*store.tailHeader.Load()).Height()
528+
529+
// manually add something to the pending for assert at the bottom
530+
if idx := from - 2; idx < count {
531+
store.pending.Append(in[idx])
532+
defer store.pending.Reset()
533+
}
534+
535+
ctx, cancel := context.WithTimeout(ctx, time.Second)
536+
defer cancel()
537+
538+
err := store.DeleteTo(ctx, tt.to)
539+
if tt.wantError {
540+
assert.Error(t, err)
541+
return
542+
}
543+
require.NoError(t, err)
544+
545+
// check that cache and pending doesn't contain old headers
546+
for h := from; h < tt.to; h++ {
547+
hash := hashes[h]
548+
assert.False(t, store.cache.Contains(hash.String()))
549+
assert.False(t, store.pending.Has(hash))
550+
}
551+
552+
tail, err := store.Tail(ctx)
553+
require.NoError(t, err)
554+
require.EqualValues(t, tail.Height(), tt.wantTail)
555+
})
556+
}
557+
}
558+
473559
func TestStorePendingCacheMiss(t *testing.T) {
474560
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
475561
t.Cleanup(cancel)

0 commit comments

Comments
 (0)