Skip to content

Commit 6418a17

Browse files
authored
feat(store): adjacency fix (#267)
Cherry-pick of f41d9f5
1 parent be8cb57 commit 6418a17

File tree

7 files changed

+592
-167
lines changed

7 files changed

+592
-167
lines changed

p2p/server_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ func TestExchangeServer_handleRequestTimeout(t *testing.T) {
1717
peer := createMocknet(t, 1)
1818
s, err := store.NewStore[*headertest.DummyHeader](datastore.NewMapDatastore())
1919
require.NoError(t, err)
20+
head := headertest.RandDummyHeader(t)
21+
head.HeightI %= 1000 // make it a bit lower
22+
err = s.Init(context.Background(), head)
23+
require.NoError(t, err)
2024
server, err := NewExchangeServer[*headertest.DummyHeader](
2125
peer[0],
2226
s,

store/batch.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"github.com/celestiaorg/go-header"
77
)
88

9-
// batch keeps an adjacent range of headers and loosely mimics the Store
9+
// batch keeps a range of headers and loosely mimics the Store
1010
// interface. NOTE: Can fully implement Store for a use case.
1111
//
1212
// It keeps a mapping 'height -> header' and 'hash -> height'
@@ -76,7 +76,11 @@ func (b *batch[H]) getByHeight(height uint64) H {
7676
return zero
7777
}
7878

79-
return b.headers[height-base-1]
79+
h := b.headers[height-base-1]
80+
if h.Height() == height {
81+
return h
82+
}
83+
return zero
8084
}
8185

8286
// Append appends new headers to the batch.

store/heightsub.go

Lines changed: 83 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -5,131 +5,133 @@ import (
55
"errors"
66
"sync"
77
"sync/atomic"
8-
9-
"github.com/celestiaorg/go-header"
108
)
119

1210
// errElapsedHeight is thrown when a requested height was already provided to heightSub.
1311
var errElapsedHeight = errors.New("elapsed height")
1412

1513
// heightSub provides a minimalistic mechanism to wait till header for a height becomes available.
16-
type heightSub[H header.Header[H]] struct {
14+
type heightSub struct {
1715
// height refers to the latest locally available header height
1816
// that has been fully verified and inserted into the subjective chain
1917
height atomic.Uint64
20-
heightReqsLk sync.Mutex
21-
heightReqs map[uint64]map[chan H]struct{}
18+
heightSubsLk sync.Mutex
19+
heightSubs map[uint64]*sub
20+
}
21+
22+
type sub struct {
23+
signal chan struct{}
24+
count int
2225
}
2326

2427
// newHeightSub instantiates new heightSub.
25-
func newHeightSub[H header.Header[H]]() *heightSub[H] {
26-
return &heightSub[H]{
27-
heightReqs: make(map[uint64]map[chan H]struct{}),
28+
func newHeightSub() *heightSub {
29+
return &heightSub{
30+
heightSubs: make(map[uint64]*sub),
31+
}
32+
}
33+
34+
// Init the heightSub with a given height.
35+
// Notifies all awaiting [Wait] calls lower than height.
36+
func (hs *heightSub) Init(height uint64) {
37+
hs.height.Store(height)
38+
39+
hs.heightSubsLk.Lock()
40+
defer hs.heightSubsLk.Unlock()
41+
42+
for h := range hs.heightSubs {
43+
if h < height {
44+
hs.notify(h, true)
45+
}
2846
}
2947
}
3048

3149
// Height reports current height.
32-
func (hs *heightSub[H]) Height() uint64 {
50+
func (hs *heightSub) Height() uint64 {
3351
return hs.height.Load()
3452
}
3553

3654
// SetHeight sets the new head height for heightSub.
37-
func (hs *heightSub[H]) SetHeight(height uint64) {
38-
hs.height.Store(height)
55+
// Notifies all awaiting [Wait] calls in range from [heightSub.Height] to height.
56+
func (hs *heightSub) SetHeight(height uint64) {
57+
for {
58+
curr := hs.height.Load()
59+
if curr >= height {
60+
return
61+
}
62+
if !hs.height.CompareAndSwap(curr, height) {
63+
continue
64+
}
65+
66+
hs.heightSubsLk.Lock()
67+
defer hs.heightSubsLk.Unlock()
68+
69+
for ; curr <= height; curr++ {
70+
hs.notify(curr, true)
71+
}
72+
return
73+
}
3974
}
4075

41-
// Sub subscribes for a header of a given height.
42-
// It can return errElapsedHeight, which means a requested header was already provided
76+
// Wait for a given height to be published.
77+
// It can return errElapsedHeight, which means a requested height was already seen
4378
// and caller should get it elsewhere.
44-
func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {
45-
var zero H
79+
func (hs *heightSub) Wait(ctx context.Context, height uint64) error {
4680
if hs.Height() >= height {
47-
return zero, errElapsedHeight
81+
return errElapsedHeight
4882
}
4983

50-
hs.heightReqsLk.Lock()
84+
hs.heightSubsLk.Lock()
5185
if hs.Height() >= height {
5286
// This is a rare case we have to account for.
5387
// The lock above can park a goroutine long enough for hs.height to change for a requested height,
5488
// leaving the request never fulfilled and the goroutine deadlocked.
55-
hs.heightReqsLk.Unlock()
56-
return zero, errElapsedHeight
89+
hs.heightSubsLk.Unlock()
90+
return errElapsedHeight
5791
}
58-
resp := make(chan H, 1)
59-
reqs, ok := hs.heightReqs[height]
92+
93+
sac, ok := hs.heightSubs[height]
6094
if !ok {
61-
reqs = make(map[chan H]struct{})
62-
hs.heightReqs[height] = reqs
95+
sac = &sub{
96+
signal: make(chan struct{}, 1),
97+
}
98+
hs.heightSubs[height] = sac
6399
}
64-
reqs[resp] = struct{}{}
65-
hs.heightReqsLk.Unlock()
100+
sac.count++
101+
hs.heightSubsLk.Unlock()
66102

67103
select {
68-
case resp := <-resp:
69-
return resp, nil
104+
case <-sac.signal:
105+
return nil
70106
case <-ctx.Done():
71107
// no need to keep the request, if the op has canceled
72-
hs.heightReqsLk.Lock()
73-
delete(reqs, resp)
74-
if len(reqs) == 0 {
75-
delete(hs.heightReqs, height)
76-
}
77-
hs.heightReqsLk.Unlock()
78-
return zero, ctx.Err()
108+
hs.heightSubsLk.Lock()
109+
hs.notify(height, false)
110+
hs.heightSubsLk.Unlock()
111+
return ctx.Err()
79112
}
80113
}
81114

82-
// Pub processes all the outstanding subscriptions matching the given headers.
83-
// Pub is only safe when called from one goroutine.
84-
// For Pub to work correctly, heightSub has to be initialized with SetHeight
85-
// so that given headers are contiguous to the height on heightSub.
86-
func (hs *heightSub[H]) Pub(headers ...H) {
87-
ln := len(headers)
88-
if ln == 0 {
89-
return
90-
}
115+
// Notify and release the waiters in [Wait].
116+
// Note: do not advance heightSub's height.
117+
func (hs *heightSub) Notify(heights ...uint64) {
118+
hs.heightSubsLk.Lock()
119+
defer hs.heightSubsLk.Unlock()
91120

92-
height := hs.Height()
93-
from, to := headers[0].Height(), headers[ln-1].Height()
94-
if height+1 != from &&
95-
height != 0 { // height != 0 is needed to enable init from any height and not only 1
96-
log.Fatalf(
97-
"PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d",
98-
height+1,
99-
from,
100-
)
101-
return
121+
for _, h := range heights {
122+
hs.notify(h, true)
102123
}
103-
hs.SetHeight(to)
104-
105-
hs.heightReqsLk.Lock()
106-
defer hs.heightReqsLk.Unlock()
107-
108-
// there is a common case where we Pub only header
109-
// in this case, we shouldn't loop over each heightReqs
110-
// and instead read from the map directly
111-
if ln == 1 {
112-
reqs, ok := hs.heightReqs[from]
113-
if ok {
114-
for req := range reqs {
115-
req <- headers[0] // reqs must always be buffered, so this won't block
116-
}
117-
delete(hs.heightReqs, from)
118-
}
124+
}
125+
126+
func (hs *heightSub) notify(height uint64, all bool) {
127+
sac, ok := hs.heightSubs[height]
128+
if !ok {
119129
return
120130
}
121131

122-
// instead of looping over each header in 'headers', we can loop over each request
123-
// which will drastically decrease idle iterations, as there will be less requests than headers
124-
for height, reqs := range hs.heightReqs {
125-
// then we look if any of the requests match the given range of headers
126-
if height >= from && height <= to {
127-
// and if so, calculate its position and fulfill requests
128-
h := headers[height-from]
129-
for req := range reqs {
130-
req <- h // reqs must always be buffered, so this won't block
131-
}
132-
delete(hs.heightReqs, height)
133-
}
132+
sac.count--
133+
if all || sac.count == 0 {
134+
close(sac.signal)
135+
delete(hs.heightSubs, height)
134136
}
135137
}

0 commit comments

Comments
 (0)