Skip to content

Commit a465a27

Browse files
authored
fix(store): greedy heightSub cancellation (#168)
So, I was playing around with similar code and taking heightSub as an inspiration. While writing that code, I realized that heightSub has a nasty bug. Basically, a subscription cancellation removes the other active subs for the same height, leaving them blocked forever. We could observe some abnormal blockings in the past, which may explain it. I provided a test scenario that proves the issue existed and the fix works.
1 parent 6d0f9a4 commit a465a27

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed

store/heightsub.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ type heightSub[H header.Header[H]] struct {
1818
// that has been fully verified and inserted into the subjective chain
1919
height atomic.Uint64
2020
heightReqsLk sync.Mutex
21-
heightReqs map[uint64][]chan H
21+
heightReqs map[uint64]map[chan H]struct{}
2222
}
2323

2424
// newHeightSub instantiates new heightSub.
2525
func newHeightSub[H header.Header[H]]() *heightSub[H] {
2626
return &heightSub[H]{
27-
heightReqs: make(map[uint64][]chan H),
27+
heightReqs: make(map[uint64]map[chan H]struct{}),
2828
}
2929
}
3030

@@ -56,16 +56,24 @@ func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {
5656
return zero, errElapsedHeight
5757
}
5858
resp := make(chan H, 1)
59-
hs.heightReqs[height] = append(hs.heightReqs[height], resp)
59+
reqs, ok := hs.heightReqs[height]
60+
if !ok {
61+
reqs = make(map[chan H]struct{})
62+
hs.heightReqs[height] = reqs
63+
}
64+
reqs[resp] = struct{}{}
6065
hs.heightReqsLk.Unlock()
6166

6267
select {
6368
case resp := <-resp:
6469
return resp, nil
6570
case <-ctx.Done():
66-
// no need to keep the request, if the op is canceled
71+
// no need to keep the request, if the op has canceled
6772
hs.heightReqsLk.Lock()
68-
delete(hs.heightReqs, height)
73+
delete(reqs, resp)
74+
if len(reqs) == 0 {
75+
delete(hs.heightReqs, height)
76+
}
6977
hs.heightReqsLk.Unlock()
7078
return zero, ctx.Err()
7179
}
@@ -98,7 +106,7 @@ func (hs *heightSub[H]) Pub(headers ...H) {
98106
if ln == 1 {
99107
reqs, ok := hs.heightReqs[from]
100108
if ok {
101-
for _, req := range reqs {
109+
for req := range reqs {
102110
req <- headers[0] // reqs must always be buffered, so this won't block
103111
}
104112
delete(hs.heightReqs, from)
@@ -113,7 +121,7 @@ func (hs *heightSub[H]) Pub(headers ...H) {
113121
if height >= from && height <= to {
114122
// and if so, calculate its position and fulfill requests
115123
h := headers[height-from]
116-
for _, req := range reqs {
124+
for req := range reqs {
117125
req <- h // reqs must always be buffered, so this won't block
118126
}
119127
delete(hs.heightReqs, height)

store/heightsub_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,40 @@ func TestHeightSub(t *testing.T) {
4646
assert.NotNil(t, h)
4747
}
4848
}
49+
50+
func TestHeightSubCancellation(t *testing.T) {
51+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
52+
defer cancel()
53+
54+
h := headertest.RandDummyHeader(t)
55+
hs := newHeightSub[*headertest.DummyHeader]()
56+
57+
sub := make(chan *headertest.DummyHeader)
58+
go func() {
59+
// subscribe first time
60+
h, _ := hs.Sub(ctx, h.HeightI)
61+
sub <- h
62+
}()
63+
64+
// give a bit time for subscription to settle
65+
time.Sleep(time.Millisecond * 10)
66+
67+
// subscribe again but with failed canceled context
68+
canceledCtx, cancel := context.WithCancel(ctx)
69+
cancel()
70+
_, err := hs.Sub(canceledCtx, h.HeightI)
71+
assert.Error(t, err)
72+
73+
// publish header
74+
hs.Pub(h)
75+
76+
// ensure we still get our header
77+
select {
78+
case subH := <-sub:
79+
assert.Equal(t, h.HeightI, subH.HeightI)
80+
case <-ctx.Done():
81+
t.Error(ctx.Err())
82+
}
83+
// ensure we don't have any active subscriptions
84+
assert.Len(t, hs.heightReqs, 0)
85+
}

0 commit comments

Comments
 (0)