-
Notifications
You must be signed in to change notification settings - Fork 26
fix(store): track store's contiguous head #239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
46f8fad
cbcb738
4bc4aa0
9e19840
cc8894c
73b65f2
b74e825
2a317ad
1903695
268afc3
674e461
a68fd8a
7c3cc9b
5d89381
b267831
beda8d1
311561b
22a4b2f
26e6621
06e73c1
b506084
7b5b282
ac9eb8c
7a53bc7
3089d68
75b0141
8194f52
1375bd3
e9fbde6
07babe7
459baad
9f1c92b
d554c55
034e618
aa97c39
f2d860c
5518222
aef55c4
ac051e4
35856eb
73b5c89
84a7090
921de51
dc6d5bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,126 +5,133 @@ import ( | |
"errors" | ||
"sync" | ||
"sync/atomic" | ||
|
||
"github.com/celestiaorg/go-header" | ||
) | ||
|
||
// errElapsedHeight is thrown when a requested height was already provided to heightSub. | ||
var errElapsedHeight = errors.New("elapsed height") | ||
|
||
// heightSub provides a minimalistic mechanism to wait till header for a height becomes available. | ||
type heightSub[H header.Header[H]] struct { | ||
type heightSub struct { | ||
// height refers to the latest locally available header height | ||
// that has been fully verified and inserted into the subjective chain | ||
height atomic.Uint64 | ||
heightReqsLk sync.Mutex | ||
heightReqs map[uint64]map[chan H]struct{} | ||
heightSubsLk sync.Mutex | ||
heightSubs map[uint64]*sub | ||
} | ||
|
||
type sub struct { | ||
signal chan struct{} | ||
count int | ||
} | ||
|
||
// newHeightSub instantiates new heightSub. | ||
func newHeightSub[H header.Header[H]]() *heightSub[H] { | ||
return &heightSub[H]{ | ||
heightReqs: make(map[uint64]map[chan H]struct{}), | ||
func newHeightSub() *heightSub { | ||
return &heightSub{ | ||
heightSubs: make(map[uint64]*sub), | ||
} | ||
} | ||
|
||
// Init the heightSub with a given height. | ||
// Notifies all awaiting [Wait] calls lower than height. | ||
func (hs *heightSub) Init(height uint64) { | ||
hs.height.Store(height) | ||
|
||
hs.heightSubsLk.Lock() | ||
defer hs.heightSubsLk.Unlock() | ||
|
||
for h := range hs.heightSubs { | ||
if h < height { | ||
hs.notify(h, true) | ||
} | ||
Comment on lines
+42
to
+45
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In which situation would there be active subscriptions on an un-initialised heightsub instance (which means an uninitialised header store)? Just curious There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have test There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it necessary that heightsub should be able to be re-initialised with pre-existing subscriptions still on it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its not see #243 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can be resolved? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean yes, it's just still unclear to me in which case there are waiters on a unintialised heightsub in current use of the lib |
||
} | ||
} | ||
|
||
// Height reports current height. | ||
func (hs *heightSub[H]) Height() uint64 { | ||
func (hs *heightSub) Height() uint64 { | ||
return hs.height.Load() | ||
} | ||
|
||
// SetHeight sets the new head height for heightSub. | ||
func (hs *heightSub[H]) SetHeight(height uint64) { | ||
hs.height.Store(height) | ||
// Notifies all awaiting [Wait] calls in range from [heightSub.Height] to height. | ||
func (hs *heightSub) SetHeight(height uint64) { | ||
for { | ||
curr := hs.height.Load() | ||
if curr >= height { | ||
return | ||
} | ||
if !hs.height.CompareAndSwap(curr, height) { | ||
continue | ||
} | ||
|
||
hs.heightSubsLk.Lock() | ||
defer hs.heightSubsLk.Unlock() //nolint:gocritic // we have a return below | ||
|
||
for ; curr <= height; curr++ { | ||
hs.notify(curr, true) | ||
} | ||
return | ||
} | ||
} | ||
|
||
// Sub subscribes for a header of a given height. | ||
// It can return errElapsedHeight, which means a requested header was already provided | ||
// Wait for a given height to be published. | ||
// It can return errElapsedHeight, which means a requested height was already seen | ||
// and caller should get it elsewhere. | ||
func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) { | ||
var zero H | ||
func (hs *heightSub) Wait(ctx context.Context, height uint64) error { | ||
if hs.Height() >= height { | ||
return zero, errElapsedHeight | ||
return errElapsedHeight | ||
} | ||
|
||
hs.heightReqsLk.Lock() | ||
hs.heightSubsLk.Lock() | ||
if hs.Height() >= height { | ||
// This is a rare case we have to account for. | ||
// The lock above can park a goroutine long enough for hs.height to change for a requested height, | ||
// leaving the request never fulfilled and the goroutine deadlocked. | ||
hs.heightReqsLk.Unlock() | ||
return zero, errElapsedHeight | ||
hs.heightSubsLk.Unlock() | ||
return errElapsedHeight | ||
} | ||
resp := make(chan H, 1) | ||
reqs, ok := hs.heightReqs[height] | ||
|
||
sac, ok := hs.heightSubs[height] | ||
if !ok { | ||
reqs = make(map[chan H]struct{}) | ||
hs.heightReqs[height] = reqs | ||
sac = &sub{ | ||
signal: make(chan struct{}, 1), | ||
} | ||
hs.heightSubs[height] = sac | ||
} | ||
reqs[resp] = struct{}{} | ||
hs.heightReqsLk.Unlock() | ||
sac.count++ | ||
hs.heightSubsLk.Unlock() | ||
|
||
select { | ||
case resp := <-resp: | ||
return resp, nil | ||
case <-sac.signal: | ||
return nil | ||
case <-ctx.Done(): | ||
// no need to keep the request, if the op has canceled | ||
hs.heightReqsLk.Lock() | ||
delete(reqs, resp) | ||
if len(reqs) == 0 { | ||
delete(hs.heightReqs, height) | ||
} | ||
hs.heightReqsLk.Unlock() | ||
return zero, ctx.Err() | ||
hs.heightSubsLk.Lock() | ||
hs.notify(height, false) | ||
hs.heightSubsLk.Unlock() | ||
return ctx.Err() | ||
} | ||
} | ||
|
||
// Pub processes all the outstanding subscriptions matching the given headers. | ||
// Pub is only safe when called from one goroutine. | ||
// For Pub to work correctly, heightSub has to be initialized with SetHeight | ||
// so that given headers are contiguous to the height on heightSub. | ||
func (hs *heightSub[H]) Pub(headers ...H) { | ||
ln := len(headers) | ||
if ln == 0 { | ||
return | ||
} | ||
// Notify and release the waiters in [Wait]. | ||
// Note: do not advance heightSub's height. | ||
func (hs *heightSub) Notify(heights ...uint64) { | ||
hs.heightSubsLk.Lock() | ||
defer hs.heightSubsLk.Unlock() | ||
|
||
height := hs.Height() | ||
from, to := headers[0].Height(), headers[ln-1].Height() | ||
if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1 | ||
log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from) | ||
return | ||
for _, h := range heights { | ||
hs.notify(h, true) | ||
} | ||
hs.SetHeight(to) | ||
|
||
hs.heightReqsLk.Lock() | ||
defer hs.heightReqsLk.Unlock() | ||
|
||
// there is a common case where we Pub only header | ||
// in this case, we shouldn't loop over each heightReqs | ||
// and instead read from the map directly | ||
if ln == 1 { | ||
reqs, ok := hs.heightReqs[from] | ||
if ok { | ||
for req := range reqs { | ||
req <- headers[0] // reqs must always be buffered, so this won't block | ||
} | ||
delete(hs.heightReqs, from) | ||
} | ||
} | ||
|
||
func (hs *heightSub) notify(height uint64, all bool) { | ||
sac, ok := hs.heightSubs[height] | ||
if !ok { | ||
return | ||
} | ||
|
||
// instead of looping over each header in 'headers', we can loop over each request | ||
// which will drastically decrease idle iterations, as there will be less requests than headers | ||
for height, reqs := range hs.heightReqs { | ||
// then we look if any of the requests match the given range of headers | ||
if height >= from && height <= to { | ||
// and if so, calculate its position and fulfill requests | ||
h := headers[height-from] | ||
for req := range reqs { | ||
req <- h // reqs must always be buffered, so this won't block | ||
} | ||
delete(hs.heightReqs, height) | ||
} | ||
sac.count-- | ||
if all || sac.count == 0 { | ||
close(sac.signal) | ||
delete(hs.heightSubs, height) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.