Skip to content

Commit 31ef719

Browse files
committed
fix(store): support intermittent writes for batch
1 parent 34072ea commit 31ef719

File tree

2 files changed

+36
-32
lines changed

2 files changed

+36
-32
lines changed

store/batch.go

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ import (
1818
type batch[H header.Header[H]] struct {
1919
lk sync.RWMutex
2020
heights map[string]uint64
21-
headers []H
21+
headers map[uint64]H
2222
}
2323

2424
// newBatch creates the batch with the given pre-allocated size.
2525
func newBatch[H header.Header[H]](size int) *batch[H] {
2626
return &batch[H]{
2727
heights: make(map[string]uint64, size),
28-
headers: make([]H, 0, size),
28+
headers: make(map[uint64]H, size),
2929
}
3030
}
3131

@@ -40,7 +40,7 @@ func (b *batch[H]) Len() int {
4040
func (b *batch[H]) GetAll() []H {
4141
b.lk.RLock()
4242
defer b.lk.RUnlock()
43-
return b.headers
43+
return slices.Collect(maps.Values(b.headers))
4444
}
4545

4646
// Get returns a header by its hash.
@@ -53,44 +53,22 @@ func (b *batch[H]) Get(hash header.Hash) H {
5353
return zero
5454
}
5555

56-
return b.getByHeight(height)
56+
return b.headers[height]
5757
}
5858

5959
// GetByHeight returns a header by its height.
6060
func (b *batch[H]) GetByHeight(height uint64) H {
6161
b.lk.RLock()
6262
defer b.lk.RUnlock()
63-
return b.getByHeight(height)
64-
}
65-
66-
func (b *batch[H]) getByHeight(height uint64) H {
67-
var (
68-
ln = uint64(len(b.headers))
69-
zero H
70-
)
71-
if ln == 0 {
72-
return zero
73-
}
74-
75-
head := b.headers[ln-1].Height()
76-
base := head - ln
77-
if height > head || height <= base {
78-
return zero
79-
}
80-
81-
h := b.headers[height-base-1]
82-
if h.Height() == height {
83-
return h
84-
}
85-
return zero
63+
return b.headers[height]
8664
}
8765

8866
// Append appends new headers to the batch.
8967
func (b *batch[H]) Append(headers ...H) {
9068
b.lk.Lock()
9169
defer b.lk.Unlock()
9270
for _, h := range headers {
93-
b.headers = append(b.headers, h)
71+
b.headers[h.Height()] = h
9472
b.heights[h.Hash().String()] = h.Height()
9573
}
9674
}
@@ -111,9 +89,7 @@ func (b *batch[H]) DeleteRange(from, to uint64) {
11189
maps.DeleteFunc(b.heights, func(_ string, height uint64) bool {
11290
return from <= height && height < to
11391
})
114-
115-
b.headers = slices.DeleteFunc(b.headers, func(h H) bool {
116-
height := h.Height()
92+
maps.DeleteFunc(b.headers, func(height uint64, _ H) bool {
11793
return from <= height && height < to
11894
})
11995
}
@@ -122,8 +98,10 @@ func (b *batch[H]) DeleteRange(from, to uint64) {
12298
func (b *batch[H]) Reset() {
12399
b.lk.Lock()
124100
defer b.lk.Unlock()
125-
b.headers = b.headers[:0]
126101
for k := range b.heights {
127102
delete(b.heights, k)
128103
}
104+
for k := range b.headers {
105+
delete(b.headers, k)
106+
}
129107
}

store/store_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,32 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) {
394394
}
395395
}
396396

397+
func TestStoreGetByHeight_intermittentWrites(t *testing.T) {
398+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
399+
t.Cleanup(cancel)
400+
401+
suite := headertest.NewTestSuite(t)
402+
403+
ds := sync.MutexWrap(datastore.NewMapDatastore())
404+
store := NewTestStore(t, ctx, ds, suite.Head())
405+
406+
firstChunk := suite.GenDummyHeaders(10)
407+
_ = suite.GenDummyHeaders(10)
408+
secondChunk := suite.GenDummyHeaders(10)
409+
err := store.Append(ctx, firstChunk...)
410+
require.NoError(t, err)
411+
err = store.Append(ctx, secondChunk...)
412+
require.NoError(t, err)
413+
// wait for batch to be written
414+
time.Sleep(10 * time.Millisecond)
415+
416+
for _, expect := range firstChunk {
417+
have, err := store.GetByHeight(ctx, expect.HeightI)
418+
require.NoError(t, err)
419+
assert.Equal(t, expect.Height(), have.Height())
420+
}
421+
}
422+
397423
func TestStoreGetByHeight_earlyAvailable(t *testing.T) {
398424
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
399425
t.Cleanup(cancel)

0 commit comments

Comments
 (0)