Skip to content

Commit 518c7b5

Browse files
committed
feat(store): advance tail
1 parent 72b0cce commit 518c7b5

File tree

2 files changed

+74
-21
lines changed

2 files changed

+74
-21
lines changed

store/store.go

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,8 @@ func (s *Store[H]) flushLoop() {
435435
s.pending.Append(headers...)
436436
// always inform heightSub about new headers seen.
437437
s.heightSub.Notify(getHeights(headers...)...)
438-
// advance contiguousHead if we don't have gaps.
439-
s.advanceContiguousHead(ctx, s.heightSub.Height())
438+
// advance head and tail if we don't have gaps.
439+
s.advanceHeadAndTail(ctx)
440440
// don't flush and continue if pending batch is not grown enough,
441441
// and Store is not stopping(headers == nil)
442442
if s.pending.Len() < s.Params.WriteBatchSize && headers != nil {
@@ -550,33 +550,53 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
550550
return data, nil
551551
}
552552

553-
// advanceContiguousHead updates contiguousHead and heightSub if a higher
554-
// contiguous header exists on a disk.
555-
func (s *Store[H]) advanceContiguousHead(ctx context.Context, height uint64) {
556-
newHead := s.nextContiguousHead(ctx, height)
557-
if newHead.IsZero() || newHead.Height() <= height {
558-
return
553+
// advanceHeadAndTail moves contiguous Head and Tail if a new or older exists respectively.
554+
// It looks throw caches, pending headers and datastore to find the new Head and Tail.
555+
// TODO(@Wondertan): Beware of the performance penalty of this approach, which always makes a at least one
556+
// datastore lookup for both Tail and Head.
557+
func (s *Store[H]) advanceHeadAndTail(ctx context.Context) {
558+
newHead, changed := s.nextHead(ctx)
559+
if changed {
560+
s.contiguousHead.Store(&newHead)
561+
s.heightSub.SetHeight(newHead.Height())
562+
log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
563+
s.metrics.newHead(newHead.Height())
564+
}
565+
566+
newTail, changed := s.nextTail(ctx)
567+
if changed {
568+
s.tailHeader.Store(&newTail)
569+
log.Infow("new tail", "height", newTail.Height(), "hash", newTail.Hash())
570+
// TODO(@Wondertan): tail metric?
559571
}
572+
}
560573

561-
s.contiguousHead.Store(&newHead)
562-
s.heightSub.SetHeight(newHead.Height())
563-
log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
564-
s.metrics.newHead(newHead.Height())
574+
// nextHead finds the new contiguous Head by iterating the current Head up until the newer height Head is found.
575+
// Returns true if the newer one was found.
576+
func (s *Store[H]) nextHead(ctx context.Context) (head H, changed bool) {
577+
head = *s.contiguousHead.Load()
578+
for {
579+
h, err := s.getByHeight(ctx, head.Height()+1)
580+
if err != nil {
581+
return head, changed
582+
}
583+
head = h
584+
changed = true
585+
}
565586
}
566587

567-
// nextContiguousHead iterates up header by header until it finds a gap.
568-
// if height+1 header not found returns a default header.
569-
func (s *Store[H]) nextContiguousHead(ctx context.Context, height uint64) H {
570-
var newHead H
588+
// nextTail finds the new contiguous Tail by iterating the current Tail down until the older height Tail is found.
589+
// Returns true if the older one was found.
590+
func (s *Store[H]) nextTail(ctx context.Context) (tail H, changed bool) {
591+
tail = *s.tailHeader.Load()
571592
for {
572-
height++
573-
h, err := s.getByHeight(ctx, height)
593+
h, err := s.getByHeight(ctx, tail.Height()-1)
574594
if err != nil {
575-
break
595+
return tail, changed
576596
}
577-
newHead = h
597+
tail = h
598+
changed = true
578599
}
579-
return newHead
580600
}
581601

582602
func (s *Store[H]) loadHeadAndTail(ctx context.Context) error {

store/store_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,39 @@ func TestStore_Append(t *testing.T) {
192192
}, time.Second, time.Millisecond)
193193
}
194194

195+
func TestStore_Append_advanceTail(t *testing.T) {
196+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
197+
t.Cleanup(cancel)
198+
199+
suite := headertest.NewTestSuite(t)
200+
missing := suite.GenDummyHeaders(10)
201+
202+
ds := sync.MutexWrap(datastore.NewMapDatastore())
203+
store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(4))
204+
205+
// assert Tail is beyond missing headers
206+
tail, err := store.Tail(ctx)
207+
require.NoError(t, err)
208+
assert.Equal(t, tail.Height(), suite.Head().Height())
209+
210+
// append the first 5 headers creating a gap, and assert Tail is still beyond missing headers
211+
err = store.Append(ctx, missing[0:5]...)
212+
require.NoError(t, err)
213+
time.Sleep(10 * time.Millisecond)
214+
tail, err = store.Tail(ctx)
215+
require.NoError(t, err)
216+
assert.Equal(t, tail.Height(), suite.Head().Height())
217+
218+
// append the remaining 5 headers filling the gap, and assert Tail advanced over the missing headers
219+
// until the very first one
220+
err = store.Append(ctx, missing[5:10]...)
221+
require.NoError(t, err)
222+
time.Sleep(10 * time.Millisecond)
223+
tail, err = store.Tail(ctx)
224+
require.NoError(t, err)
225+
assert.Equal(t, tail.Height(), missing[0].Height())
226+
}
227+
195228
func TestStore_Append_stableHeadWhenGaps(t *testing.T) {
196229
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
197230
t.Cleanup(cancel)

0 commit comments

Comments
 (0)