Skip to content
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
46f8fad
fix(store): track store's contiguous head
cristaloleg Jan 8, 2025
cbcb738
more fixes
cristaloleg Jan 8, 2025
4bc4aa0
fix test
cristaloleg Jan 8, 2025
9e19840
fix sync test
cristaloleg Jan 8, 2025
cc8894c
fix sync test
cristaloleg Jan 8, 2025
73b65f2
test heightSub
cristaloleg Jan 9, 2025
b74e825
simplify test
cristaloleg Jan 9, 2025
2a317ad
wip
cristaloleg Jan 10, 2025
1903695
cleanup
cristaloleg Jan 10, 2025
268afc3
simplify heightSub
cristaloleg Jan 10, 2025
674e461
fix tests
cristaloleg Jan 13, 2025
a68fd8a
fixes
cristaloleg Jan 13, 2025
7c3cc9b
fix ctx
cristaloleg Jan 13, 2025
5d89381
review suggestions
cristaloleg Jan 14, 2025
b267831
rebase
cristaloleg Jan 14, 2025
beda8d1
tests
cristaloleg Jan 15, 2025
311561b
more tests
cristaloleg Jan 15, 2025
22a4b2f
review suggestions
cristaloleg Jan 22, 2025
26e6621
small refactoring
cristaloleg Jan 22, 2025
06e73c1
add heightSub.Init
cristaloleg Jan 23, 2025
b506084
do 1 more fetch before subscribe
cristaloleg Jan 23, 2025
7b5b282
review suggestions
cristaloleg Jan 24, 2025
ac9eb8c
do advance at start
cristaloleg Jan 24, 2025
7a53bc7
load key on start
cristaloleg Jan 27, 2025
3089d68
fix finding
cristaloleg Jan 27, 2025
75b0141
review suggestions
cristaloleg Jan 27, 2025
8194f52
rename to a better name
cristaloleg Jan 27, 2025
1375bd3
even better names
cristaloleg Jan 27, 2025
e9fbde6
sky is the limit
cristaloleg Jan 27, 2025
07babe7
hehe
cristaloleg Jan 27, 2025
459baad
drop param
cristaloleg Jan 27, 2025
9f1c92b
review suggestions
cristaloleg Jan 30, 2025
d554c55
fix
cristaloleg Jan 31, 2025
034e618
revert
cristaloleg Jan 31, 2025
aa97c39
more review suggestions
cristaloleg Feb 4, 2025
f2d860c
simplify
cristaloleg Feb 4, 2025
5518222
simplify again
cristaloleg Feb 4, 2025
aef55c4
simplify
cristaloleg Feb 4, 2025
ac051e4
linter
cristaloleg Feb 4, 2025
35856eb
think different
cristaloleg Feb 4, 2025
73b5c89
more suggestions
cristaloleg Feb 5, 2025
84a7090
upd cmnt
cristaloleg Feb 5, 2025
921de51
revert head method
cristaloleg Feb 13, 2025
dc6d5bf
revert but a bit
cristaloleg Feb 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions p2p/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ func TestExchangeServer_handleRequestTimeout(t *testing.T) {
peer := createMocknet(t, 1)
s, err := store.NewStore[*headertest.DummyHeader](datastore.NewMapDatastore())
require.NoError(t, err)
head := headertest.RandDummyHeader(t)
head.HeightI %= 1000 // make it a bit lower
s.Init(context.Background(), head)
server, err := NewExchangeServer[*headertest.DummyHeader](
peer[0],
s,
Expand Down
6 changes: 5 additions & 1 deletion store/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ func (b *batch[H]) getByHeight(height uint64) H {
return zero
}

return b.headers[height-base-1]
h := b.headers[height-base-1]
if h.Height() == height {
return h
}
return zero
}

// Append appends new headers to the batch.
Expand Down
159 changes: 82 additions & 77 deletions store/heightsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,126 +5,131 @@ import (
"errors"
"sync"
"sync/atomic"

"github.com/celestiaorg/go-header"
)

// errElapsedHeight is thrown when a requested height was already provided to heightSub.
var errElapsedHeight = errors.New("elapsed height")

// heightSub provides a minimalistic mechanism to wait till header for a height becomes available.
type heightSub[H header.Header[H]] struct {
type heightSub struct {
// height refers to the latest locally available header height
// that has been fully verified and inserted into the subjective chain
height atomic.Uint64
heightReqsLk sync.Mutex
heightReqs map[uint64]map[chan H]struct{}
heightSubsLk sync.Mutex
heightSubs map[uint64]*sub
}

type sub struct {
signal chan struct{}
count int
}

// newHeightSub instantiates new heightSub.
func newHeightSub[H header.Header[H]]() *heightSub[H] {
return &heightSub[H]{
heightReqs: make(map[uint64]map[chan H]struct{}),
func newHeightSub() *heightSub {
return &heightSub{
heightSubs: make(map[uint64]*sub),
}
}

// Init the heightSub with a given height.
// Notifies all awaiting [Wait] calls lower than height.
func (hs *heightSub) Init(height uint64) {
hs.height.Store(height)

hs.heightSubsLk.Lock()
defer hs.heightSubsLk.Unlock()

for h := range hs.heightSubs {
if h < height {
hs.notify(h, true)
}
Comment on lines +42 to +45
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which situation would there be active subscriptions on an un-initialised heightsub instance (which means an uninitialised header store)? Just curious

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have test TestBatch_GetByHeightBeforeInit which can invoke a method on non-inited store. To keep this behaviour (at least in this RP) we need to handle this case properly in heightSub.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary that heightsub should be able to be re-initialised with pre-existing subscriptions still on it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not see #243

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be resolved?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean yes, it's just still unclear to me in which case there are waiters on a unintialised heightsub in current use of the lib

}
}

// Height reports current height.
func (hs *heightSub[H]) Height() uint64 {
func (hs *heightSub) Height() uint64 {
return hs.height.Load()
}

// SetHeight sets the new head height for heightSub.
func (hs *heightSub[H]) SetHeight(height uint64) {
hs.height.Store(height)
// Notifies all awaiting [Wait] calls in range from [heightSub.Height] to height.
func (hs *heightSub) SetHeight(height uint64) {
for {
curr := hs.height.Load()
if curr >= height {
return
}
if !hs.height.CompareAndSwap(curr, height) {
continue
}

hs.heightSubsLk.Lock()
defer hs.heightSubsLk.Unlock() //nolint:gocritic we have a return below

for ; curr <= height; curr++ {
hs.notify(curr, true)
}
return
}
}

// Sub subscribes for a header of a given height.
// It can return errElapsedHeight, which means a requested header was already provided
// Wait for a given height to be published.
// It can return errElapsedHeight, which means a requested height was already seen
// and caller should get it elsewhere.
func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {
var zero H
func (hs *heightSub) Wait(ctx context.Context, height uint64) error {
if hs.Height() >= height {
return zero, errElapsedHeight
return errElapsedHeight
}

hs.heightReqsLk.Lock()
hs.heightSubsLk.Lock()
if hs.Height() >= height {
// This is a rare case we have to account for.
// The lock above can park a goroutine long enough for hs.height to change for a requested height,
// leaving the request never fulfilled and the goroutine deadlocked.
hs.heightReqsLk.Unlock()
return zero, errElapsedHeight
hs.heightSubsLk.Unlock()
return errElapsedHeight
}
resp := make(chan H, 1)
reqs, ok := hs.heightReqs[height]

sac, ok := hs.heightSubs[height]
if !ok {
reqs = make(map[chan H]struct{})
hs.heightReqs[height] = reqs
sac = &sub{
signal: make(chan struct{}, 1),
}
hs.heightSubs[height] = sac
}
reqs[resp] = struct{}{}
hs.heightReqsLk.Unlock()
sac.count++
hs.heightSubsLk.Unlock()

select {
case resp := <-resp:
return resp, nil
case <-sac.signal:
return nil
case <-ctx.Done():
// no need to keep the request, if the op has canceled
hs.heightReqsLk.Lock()
delete(reqs, resp)
if len(reqs) == 0 {
delete(hs.heightReqs, height)
}
hs.heightReqsLk.Unlock()
return zero, ctx.Err()
hs.heightSubsLk.Lock()
hs.notify(height, false)
hs.heightSubsLk.Unlock()
return ctx.Err()
}
}

// Pub processes all the outstanding subscriptions matching the given headers.
// Pub is only safe when called from one goroutine.
// For Pub to work correctly, heightSub has to be initialized with SetHeight
// so that given headers are contiguous to the height on heightSub.
func (hs *heightSub[H]) Pub(headers ...H) {
ln := len(headers)
if ln == 0 {
return
}
// Notify and release the waiters in [Wait].
// Note: do not advance heightSub's height.
func (hs *heightSub) Notify(height uint64) {
hs.heightSubsLk.Lock()
defer hs.heightSubsLk.Unlock()

height := hs.Height()
from, to := headers[0].Height(), headers[ln-1].Height()
if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1
log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from)
return
}
hs.SetHeight(to)

hs.heightReqsLk.Lock()
defer hs.heightReqsLk.Unlock()

// there is a common case where we Pub only header
// in this case, we shouldn't loop over each heightReqs
// and instead read from the map directly
if ln == 1 {
reqs, ok := hs.heightReqs[from]
if ok {
for req := range reqs {
req <- headers[0] // reqs must always be buffered, so this won't block
}
delete(hs.heightReqs, from)
}
hs.notify(height, true)
}

func (hs *heightSub) notify(height uint64, all bool) {
sac, ok := hs.heightSubs[height]
if !ok {
return
}

// instead of looping over each header in 'headers', we can loop over each request
// which will drastically decrease idle iterations, as there will be less requests than headers
for height, reqs := range hs.heightReqs {
// then we look if any of the requests match the given range of headers
if height >= from && height <= to {
// and if so, calculate its position and fulfill requests
h := headers[height-from]
for req := range reqs {
req <- h // reqs must always be buffered, so this won't block
}
delete(hs.heightReqs, height)
}
sac.count--
if all || sac.count == 0 {
close(sac.signal)
delete(hs.heightSubs, height)
}
}
Loading