diff --git a/p2p/server_test.go b/p2p/server_test.go
index 1e896b2e..db2ab575 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -17,6 +17,9 @@ func TestExchangeServer_handleRequestTimeout(t *testing.T) {
 	peer := createMocknet(t, 1)
 	s, err := store.NewStore[*headertest.DummyHeader](datastore.NewMapDatastore())
 	require.NoError(t, err)
+	head := headertest.RandDummyHeader(t)
+	head.HeightI %= 1000 // make it a bit lower
+	s.Init(context.Background(), head)
 	server, err := NewExchangeServer[*headertest.DummyHeader](
 		peer[0],
 		s,
diff --git a/store/batch.go b/store/batch.go
index 7785953f..55bb529a 100644
--- a/store/batch.go
+++ b/store/batch.go
@@ -6,7 +6,7 @@ import (
 	"github.com/celestiaorg/go-header"
 )
 
-// batch keeps an adjacent range of headers and loosely mimics the Store
+// batch keeps a range of headers and loosely mimics the Store
 // interface. NOTE: Can fully implement Store for a use case.
 //
 // It keeps a mapping 'height -> header' and 'hash -> height'
@@ -76,7 +76,11 @@ func (b *batch[H]) getByHeight(height uint64) H {
 		return zero
 	}
 
-	return b.headers[height-base-1]
+	h := b.headers[height-base-1]
+	if h.Height() == height {
+		return h
+	}
+	return zero
 }
 
 // Append appends new headers to the batch.
diff --git a/store/heightsub.go b/store/heightsub.go
index 2335001d..1bed1d3f 100644
--- a/store/heightsub.go
+++ b/store/heightsub.go
@@ -5,126 +5,133 @@ import (
 	"errors"
 	"sync"
 	"sync/atomic"
-
-	"github.com/celestiaorg/go-header"
 )
 
 // errElapsedHeight is thrown when a requested height was already provided to heightSub.
 var errElapsedHeight = errors.New("elapsed height")
 
 // heightSub provides a minimalistic mechanism to wait till header for a height becomes available.
-type heightSub[H header.Header[H]] struct {
+type heightSub struct {
 	// height refers to the latest locally available header height
 	// that has been fully verified and inserted into the subjective chain
 	height atomic.Uint64
 
-	heightReqsLk sync.Mutex
-	heightReqs   map[uint64]map[chan H]struct{}
+	heightSubsLk sync.Mutex
+	heightSubs   map[uint64]*sub
+}
+
+type sub struct {
+	signal chan struct{}
+	count  int
 }
 
 // newHeightSub instantiates new heightSub.
-func newHeightSub[H header.Header[H]]() *heightSub[H] {
-	return &heightSub[H]{
-		heightReqs: make(map[uint64]map[chan H]struct{}),
+func newHeightSub() *heightSub {
+	return &heightSub{
+		heightSubs: make(map[uint64]*sub),
+	}
+}
+
+// Init the heightSub with a given height.
+// Notifies all awaiting [Wait] calls lower than height.
+func (hs *heightSub) Init(height uint64) {
+	hs.height.Store(height)
+
+	hs.heightSubsLk.Lock()
+	defer hs.heightSubsLk.Unlock()
+
+	for h := range hs.heightSubs {
+		if h < height {
+			hs.notify(h, true)
+		}
 	}
 }
 
 // Height reports current height.
-func (hs *heightSub[H]) Height() uint64 {
+func (hs *heightSub) Height() uint64 {
 	return hs.height.Load()
 }
 
 // SetHeight sets the new head height for heightSub.
-func (hs *heightSub[H]) SetHeight(height uint64) {
-	hs.height.Store(height)
+// Notifies all awaiting [Wait] calls in range from [heightSub.Height] to height.
+func (hs *heightSub) SetHeight(height uint64) {
+	for {
+		curr := hs.height.Load()
+		if curr >= height {
+			return
+		}
+		if !hs.height.CompareAndSwap(curr, height) {
+			continue
+		}
+
+		hs.heightSubsLk.Lock()
+		defer hs.heightSubsLk.Unlock() //nolint:gocritic // we have a return below
+
+		for ; curr <= height; curr++ {
+			hs.notify(curr, true)
+		}
+		return
+	}
 }
 
-// Sub subscribes for a header of a given height.
-// It can return errElapsedHeight, which means a requested header was already provided
+// Wait for a given height to be published.
+// It can return errElapsedHeight, which means a requested height was already seen
 // and caller should get it elsewhere.
-func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {
-	var zero H
+func (hs *heightSub) Wait(ctx context.Context, height uint64) error {
 	if hs.Height() >= height {
-		return zero, errElapsedHeight
+		return errElapsedHeight
 	}
 
-	hs.heightReqsLk.Lock()
+	hs.heightSubsLk.Lock()
 	if hs.Height() >= height {
 		// This is a rare case we have to account for.
 		// The lock above can park a goroutine long enough for hs.height to change for a requested height,
 		// leaving the request never fulfilled and the goroutine deadlocked.
-		hs.heightReqsLk.Unlock()
-		return zero, errElapsedHeight
+		hs.heightSubsLk.Unlock()
+		return errElapsedHeight
 	}
-	resp := make(chan H, 1)
-	reqs, ok := hs.heightReqs[height]
+
+	sac, ok := hs.heightSubs[height]
 	if !ok {
-		reqs = make(map[chan H]struct{})
-		hs.heightReqs[height] = reqs
+		sac = &sub{
+			signal: make(chan struct{}, 1),
+		}
+		hs.heightSubs[height] = sac
 	}
-	reqs[resp] = struct{}{}
-	hs.heightReqsLk.Unlock()
+	sac.count++
+	hs.heightSubsLk.Unlock()
 
 	select {
-	case resp := <-resp:
-		return resp, nil
+	case <-sac.signal:
+		return nil
 	case <-ctx.Done():
 		// no need to keep the request, if the op has canceled
-		hs.heightReqsLk.Lock()
-		delete(reqs, resp)
-		if len(reqs) == 0 {
-			delete(hs.heightReqs, height)
-		}
-		hs.heightReqsLk.Unlock()
-		return zero, ctx.Err()
+		hs.heightSubsLk.Lock()
+		hs.notify(height, false)
+		hs.heightSubsLk.Unlock()
+		return ctx.Err()
 	}
 }
 
-// Pub processes all the outstanding subscriptions matching the given headers.
-// Pub is only safe when called from one goroutine.
-// For Pub to work correctly, heightSub has to be initialized with SetHeight
-// so that given headers are contiguous to the height on heightSub.
-func (hs *heightSub[H]) Pub(headers ...H) {
-	ln := len(headers)
-	if ln == 0 {
-		return
-	}
+
+// Notify and release the waiters in [Wait].
+// Note: it does not advance heightSub's height.
+func (hs *heightSub) Notify(heights ...uint64) {
+	hs.heightSubsLk.Lock()
+	defer hs.heightSubsLk.Unlock()
 
-	height := hs.Height()
-	from, to := headers[0].Height(), headers[ln-1].Height()
-	if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1
-		log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from)
-		return
+	for _, h := range heights {
+		hs.notify(h, true)
 	}
-	hs.SetHeight(to)
-
-	hs.heightReqsLk.Lock()
-	defer hs.heightReqsLk.Unlock()
-
-	// there is a common case where we Pub only header
-	// in this case, we shouldn't loop over each heightReqs
-	// and instead read from the map directly
-	if ln == 1 {
-		reqs, ok := hs.heightReqs[from]
-		if ok {
-			for req := range reqs {
-				req <- headers[0] // reqs must always be buffered, so this won't block
-			}
-			delete(hs.heightReqs, from)
-		}
+}
+
+func (hs *heightSub) notify(height uint64, all bool) {
+	sac, ok := hs.heightSubs[height]
+	if !ok {
 		return
 	}
-	// instead of looping over each header in 'headers', we can loop over each request
-	// which will drastically decrease idle iterations, as there will be less requests than headers
-	for height, reqs := range hs.heightReqs {
-		// then we look if any of the requests match the given range of headers
-		if height >= from && height <= to {
-			// and if so, calculate its position and fulfill requests
-			h := headers[height-from]
-			for req := range reqs {
-				req <- h // reqs must always be buffered, so this won't block
-			}
-			delete(hs.heightReqs, height)
-		}
+	sac.count--
+	if all || sac.count == 0 {
+		close(sac.signal)
+		delete(hs.heightSubs, height)
 	}
 }
diff --git a/store/heightsub_test.go b/store/heightsub_test.go
index f5958422..6ef64a64 100644
--- a/store/heightsub_test.go
+++ b/store/heightsub_test.go
@@ -14,18 +14,14 @@ func TestHeightSub(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
 	defer cancel()
 
-	hs := newHeightSub[*headertest.DummyHeader]()
+	hs := newHeightSub()
 
 	// assert subscription returns nil for past heights
 	{
-		h := headertest.RandDummyHeader(t)
-		h.HeightI = 100
-		hs.SetHeight(99)
-		hs.Pub(h)
+		hs.Init(99)
 
-		h, err := hs.Sub(ctx, 10)
+		err := hs.Wait(ctx, 10)
 		assert.ErrorIs(t, err, errElapsedHeight)
-		assert.Nil(t, h)
 	}
 
 	// assert actual subscription works
@@ -34,16 +30,11 @@ func TestHeightSub(t *testing.T) {
 			// fixes flakiness on CI
 			time.Sleep(time.Millisecond)
 
-			h1 := headertest.RandDummyHeader(t)
-			h1.HeightI = 101
-			h2 := headertest.RandDummyHeader(t)
-			h2.HeightI = 102
-			hs.Pub(h1, h2)
+			hs.SetHeight(102)
 		}()
 
-		h, err := hs.Sub(ctx, 101)
+		err := hs.Wait(ctx, 101)
 		assert.NoError(t, err)
-		assert.NotNil(t, h)
 	}
 
 	// assert multiple subscriptions work
@@ -51,16 +42,14 @@ func TestHeightSub(t *testing.T) {
 		ch := make(chan error, 10)
 		for range cap(ch) {
 			go func() {
-				_, err := hs.Sub(ctx, 103)
+				err := hs.Wait(ctx, 103)
 				ch <- err
 			}()
 		}
 
 		time.Sleep(time.Millisecond * 10)
 
-		h3 := headertest.RandDummyHeader(t)
-		h3.HeightI = 103
-		hs.Pub(h3)
+		hs.SetHeight(103)
 
 		for range cap(ch) {
 			assert.NoError(t, <-ch)
@@ -68,18 +57,98 @@ func TestHeightSub(t *testing.T) {
 	}
 }
 
+func TestHeightSub_withWaitCancelled(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	defer cancel()
+
+	hs := newHeightSub()
+	hs.Init(10)
+
+	const waiters = 5
+
+	cancelChs := make([]chan error, waiters)
+	blockedChs := make([]chan error, waiters)
+	for i := range waiters {
+		cancelChs[i] = make(chan error, 1)
+		blockedChs[i] = make(chan error, 1)
+
+		go func() {
+			ctx, cancel := context.WithTimeout(ctx, time.Duration(i+1)*time.Millisecond)
+			defer cancel()
+
+			err := hs.Wait(ctx, 100)
+			cancelChs[i] <- err
+		}()
+
+		go func() {
+			ctx, cancel := context.WithTimeout(ctx, time.Second)
+			defer cancel()
+
+			err := hs.Wait(ctx, 100)
+			blockedChs[i] <- err
+		}()
+	}
+
+	for i := range cancelChs {
+		err := <-cancelChs[i]
+		assert.ErrorIs(t, err, context.DeadlineExceeded)
+	}
+
+	for i := range blockedChs {
+		select {
+		case <-blockedChs[i]:
+			t.Error("channel should be blocked")
+		default:
+		}
+	}
+}
+
+// Test heightSub can accept non-adjacent heights without an error.
+func TestHeightSubNonAdjacency(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	defer cancel()
+
+	hs := newHeightSub()
+	hs.Init(99)
+
+	go func() {
+		// fixes flakiness on CI
+		time.Sleep(time.Millisecond)
+
+		hs.SetHeight(300)
+	}()
+
+	err := hs.Wait(ctx, 200)
+	assert.NoError(t, err)
+}
+
+// Test heightSub's height cannot go down but only up.
+func TestHeightSub_monotonicHeight(t *testing.T) {
+	hs := newHeightSub()
+
+	hs.Init(99)
+	assert.Equal(t, int64(hs.height.Load()), int64(99))
+
+	hs.SetHeight(300)
+	assert.Equal(t, int64(hs.height.Load()), int64(300))
+
+	hs.SetHeight(120)
+	assert.Equal(t, int64(hs.height.Load()), int64(300))
+}
+
 func TestHeightSubCancellation(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
 
 	h := headertest.RandDummyHeader(t)
-	hs := newHeightSub[*headertest.DummyHeader]()
+	h.HeightI %= 1000 // make it a bit lower
+	hs := newHeightSub()
 
-	sub := make(chan *headertest.DummyHeader)
+	sub := make(chan struct{})
 	go func() {
 		// subscribe first time
-		h, _ := hs.Sub(ctx, h.HeightI)
-		sub <- h
+		hs.Wait(ctx, h.Height())
+		sub <- struct{}{}
 	}()
 
 	// give a bit time for subscription to settle
@@ -88,19 +157,18 @@ func TestHeightSubCancellation(t *testing.T) {
 	// subscribe again but with failed canceled context
 	canceledCtx, cancel := context.WithCancel(ctx)
 	cancel()
-	_, err := hs.Sub(canceledCtx, h.HeightI)
-	assert.Error(t, err)
+	err := hs.Wait(canceledCtx, h.Height())
+	assert.ErrorIs(t, err, context.Canceled)
 
-	// publish header
-	hs.Pub(h)
+	// update height
+	hs.SetHeight(h.Height())
 
 	// ensure we still get our header
 	select {
-	case subH := <-sub:
-		assert.Equal(t, h.HeightI, subH.HeightI)
+	case <-sub:
 	case <-ctx.Done():
 		t.Error(ctx.Err())
 	}
 
 	// ensure we don't have any active subscriptions
-	assert.Len(t, hs.heightReqs, 0)
+	assert.Len(t, hs.heightSubs, 0)
 }
diff --git a/store/store.go b/store/store.go
index 83303fd1..55f9ce34 100644
--- a/store/store.go
+++ b/store/store.go
@@ -41,7 +41,7 @@ type Store[H header.Header[H]] struct {
 	heightIndex *heightIndexer[H]
 	// manages current store read head height (1) and
 	// allows callers to wait until header for a height is stored (2)
-	heightSub *heightSub[H]
+	heightSub *heightSub
 
 	// writing to datastore
 	//
 	// queue of headers to be written
 	writes chan []H
 	// signals when writes are finished
 	writesDn chan struct{}
-	// writeHead maintains the current write head
-	writeHead atomic.Pointer[H]
+	// contiguousHead is the highest contiguous header observed
+	contiguousHead atomic.Pointer[H]
 	// pending keeps headers pending to be written in one batch
 	pending *batch[H]
@@ -103,7 +103,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
 		cache:       cache,
 		metrics:     metrics,
 		heightIndex: index,
-		heightSub:   newHeightSub[H](),
+		heightSub:   newHeightSub(),
 		writes:      make(chan []H, 16),
 		writesDn:    make(chan struct{}),
 		pending:     newBatch[H](params.WriteBatchSize),
@@ -115,6 +115,11 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error {
 	if s.heightSub.Height() != 0 {
 		return errors.New("store already initialized")
 	}
+
+	// initialize with the initial head before first flush.
+	s.contiguousHead.Store(&initial)
+	s.heightSub.Init(initial.Height())
+
 	// trust the given header as the initial head
 	err := s.flush(ctx, initial)
 	if err != nil {
@@ -122,11 +127,10 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error {
 	}
 
 	log.Infow("initialized head", "height", initial.Height(), "hash", initial.Hash())
-	s.heightSub.Pub(initial)
 	return nil
 }
 
-func (s *Store[H]) Start(context.Context) error {
+func (s *Store[H]) Start(ctx context.Context) error {
 	// closed s.writesDn means that store was stopped before, recreate chan.
 	select {
 	case <-s.writesDn:
@@ -134,6 +138,13 @@ func (s *Store[H]) Start(context.Context) error {
 	default:
 	}
 
+	if err := s.loadContiguousHead(ctx); err != nil {
+		// we might start on an empty datastore, no key is okay.
+		if !errors.Is(err, datastore.ErrNotFound) {
+			return fmt.Errorf("header/store: cannot load headKey: %w", err)
+		}
+	}
+
 	go s.flushLoop()
 	return nil
 }
@@ -168,6 +179,10 @@ func (s *Store[H]) Height() uint64 {
 }
 
 func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) {
+	if head := s.contiguousHead.Load(); head != nil {
+		return *head, nil
+	}
+
 	head, err := s.GetByHeight(ctx, s.heightSub.Height())
 	if err == nil {
 		return head, nil
@@ -217,22 +232,33 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
 	if height == 0 {
 		return zero, errors.New("header/store: height must be bigger than zero")
 	}
+
+	if h, err := s.getByHeight(ctx, height); err == nil {
+		return h, nil
+	}
+
 	// if the requested 'height' was not yet published
 	// we subscribe to it
-	h, err := s.heightSub.Sub(ctx, height)
-	if !errors.Is(err, errElapsedHeight) {
-		return h, err
+	err := s.heightSub.Wait(ctx, height)
+	if err != nil && !errors.Is(err, errElapsedHeight) {
+		return zero, err
 	}
 	// otherwise, the errElapsedHeight is thrown,
 	// which means the requested 'height' should be present
 	//
 	// check if the requested header is not yet written on disk
+
+	return s.getByHeight(ctx, height)
+}
+
+func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
 	if h := s.pending.GetByHeight(height); !h.IsZero() {
 		return h, nil
 	}
 
 	hash, err := s.heightIndex.HashByHeight(ctx, height)
 	if err != nil {
+		var zero H
 		if errors.Is(err, datastore.ErrNotFound) {
 			return zero, header.ErrNotFound
 		}
@@ -304,23 +330,15 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
 		return nil
 	}
 
-	var err error
-	// take current write head to verify headers against
-	var head H
-	headPtr := s.writeHead.Load()
-	if headPtr == nil {
-		head, err = s.Head(ctx)
-		if err != nil {
-			return err
-		}
-	} else {
-		head = *headPtr
+	// take current contiguous head to verify headers against
+	head, err := s.Head(ctx)
+	if err != nil {
+		return err
 	}
 
 	// collect valid headers
 	verified := make([]H, 0, lh)
 	for i, h := range headers {
-
 		err = head.Verify(h)
 		if err != nil {
 			var verErr *header.VerifyError
@@ -344,19 +362,11 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
 		head = h
 	}
 
-	onWrite := func() {
-		newHead := verified[len(verified)-1]
-		s.writeHead.Store(&newHead)
-		log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
-		s.metrics.newHead(newHead.Height())
-	}
-
 	// queue headers to be written on disk
 	select {
 	case s.writes <- verified:
 		// we return an error here after writing,
 		// as there might be an invalid header in between of a given range
-		onWrite()
 		return err
 	default:
 		s.metrics.writesQueueBlocked(ctx)
 	}
 
 	// if the writes queue is full, we block until it is not
 	select {
 	case s.writes <- verified:
-		onWrite()
 		return err
 	case <-s.writesDn:
 		return errStoppedStore
@@ -383,10 +392,10 @@ func (s *Store[H]) flushLoop() {
 	for headers := range s.writes {
 		// add headers to the pending and ensure they are accessible
 		s.pending.Append(headers...)
-		// and notify waiters if any + increase current read head height
-		// it is important to do Pub after updating pending
-		// so pending is consistent with atomic Height counter on the heightSub
-		s.heightSub.Pub(headers...)
+		// always inform heightSub about new headers seen.
+		s.heightSub.Notify(getHeights(headers...)...)
+		// advance contiguousHead if we don't have gaps.
+		s.advanceContiguousHead(ctx, s.heightSub.Height())
 		// don't flush and continue if pending batch is not grown enough,
 		// and Store is not stopping(headers == nil)
 		if s.pending.Len() < s.Params.WriteBatchSize && headers != nil {
@@ -448,7 +457,8 @@ func (s *Store[H]) flush(ctx context.Context, headers ...H) error {
 	}
 
 	// marshal and add to batch reference to the new head
-	b, err := headers[ln-1].Hash().MarshalJSON()
+	head := *s.contiguousHead.Load()
+	b, err := head.Hash().MarshalJSON()
 	if err != nil {
 		return err
 	}
@@ -467,6 +477,18 @@ func (s *Store[H]) flush(ctx context.Context, headers ...H) error {
 	return batch.Commit(ctx)
 }
 
+// loadContiguousHead loads the head from disk and sets contiguousHead and heightSub.
+func (s *Store[H]) loadContiguousHead(ctx context.Context) error {
+	h, err := s.readHead(ctx)
+	if err != nil {
+		return err
+	}
+
+	s.contiguousHead.Store(&h)
+	s.heightSub.SetHeight(h.Height())
+	return nil
+}
+
 // readHead loads the head from the datastore.
 func (s *Store[H]) readHead(ctx context.Context) (H, error) {
 	var zero H
@@ -499,6 +521,35 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
 	return data, nil
 }
 
+// advanceContiguousHead updates contiguousHead and heightSub if a higher
+// contiguous header exists on disk.
+func (s *Store[H]) advanceContiguousHead(ctx context.Context, height uint64) {
+	newHead := s.nextContiguousHead(ctx, height)
+	if newHead.IsZero() || newHead.Height() <= height {
+		return
+	}
+
+	s.contiguousHead.Store(&newHead)
+	s.heightSub.SetHeight(newHead.Height())
+	log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
+	s.metrics.newHead(newHead.Height())
+}
+
+// nextContiguousHead iterates upward, header by header, until it finds a gap.
+// If the header at height+1 is not found, it returns a zero-value header.
+func (s *Store[H]) nextContiguousHead(ctx context.Context, height uint64) H {
+	var newHead H
+	for {
+		height++
+		h, err := s.getByHeight(ctx, height)
+		if err != nil {
+			break
+		}
+		newHead = h
+	}
+	return newHead
+}
+
 // indexTo saves mapping between header Height and Hash to the given batch.
 func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, headers ...H) error {
 	for _, h := range headers {
@@ -509,3 +560,11 @@ func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, hea
 	}
 	return nil
 }
+
+func getHeights[H header.Header[H]](headers ...H) []uint64 {
+	heights := make([]uint64, len(headers))
+	for i := range headers {
+		heights[i] = headers[i].Height()
+	}
+	return heights
+}
diff --git a/store/store_test.go b/store/store_test.go
index 96f5ff25..a152a42b 100644
--- a/store/store_test.go
+++ b/store/store_test.go
@@ -1,7 +1,10 @@
 package store
 
 import (
+	"bytes"
 	"context"
+	"math/rand"
+	stdsync "sync"
 	"testing"
 	"time"
@@ -10,6 +13,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"github.com/celestiaorg/go-header"
 	"github.com/celestiaorg/go-header/headertest"
 )
@@ -145,6 +149,260 @@ func TestStore_Append_BadHeader(t *testing.T) {
 	require.Error(t, err)
 }
 
+func TestStore_Append(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	t.Cleanup(cancel)
+
+	suite := headertest.NewTestSuite(t)
+
+	ds := sync.MutexWrap(datastore.NewMapDatastore())
+	store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(4))
+
+	head, err := store.Head(ctx)
+	require.NoError(t, err)
+	assert.Equal(t, head.Hash(), suite.Head().Hash())
+
+	const workers = 10
+	const chunk = 5
+	headers := suite.GenDummyHeaders(workers * chunk)
+
+	errCh := make(chan error, workers)
+	var wg stdsync.WaitGroup
+	wg.Add(workers)
+
+	for i := range workers {
+		go func() {
+			defer wg.Done()
+			// make every append happen in random order.
+			time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
+
+			err := store.Append(ctx, headers[i*chunk:(i+1)*chunk]...)
+			errCh <- err
+		}()
+	}
+
+	wg.Wait()
+	close(errCh)
+	for err := range errCh {
+		assert.NoError(t, err)
+	}
+
+	// wait for batch to be written.
+	time.Sleep(100 * time.Millisecond)
+
+	assert.Eventually(t, func() bool {
+		head, err = store.Head(ctx)
+		assert.NoError(t, err)
+		assert.Equal(t, int(head.Height()), int(headers[len(headers)-1].Height()))
+
+		switch {
+		case int(head.Height()) != int(headers[len(headers)-1].Height()):
+			return false
+		case !bytes.Equal(head.Hash(), headers[len(headers)-1].Hash()):
+			return false
+		default:
+			return true
+		}
+	}, time.Second, time.Millisecond)
+}
+
+func TestStore_Append_stableHeadWhenGaps(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	t.Cleanup(cancel)
+
+	suite := headertest.NewTestSuite(t)
+
+	ds := sync.MutexWrap(datastore.NewMapDatastore())
+	store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(4))
+
+	head, err := store.Head(ctx)
+	require.NoError(t, err)
+	assert.Equal(t, head.Hash(), suite.Head().Hash())
+
+	firstChunk := suite.GenDummyHeaders(5)
+	missedChunk := suite.GenDummyHeaders(5)
+	lastChunk := suite.GenDummyHeaders(5)
+
+	wantHead := firstChunk[len(firstChunk)-1]
+	latestHead := lastChunk[len(lastChunk)-1]
+
+	{
+		err := store.Append(ctx, firstChunk...)
+		require.NoError(t, err)
+		// wait for batch to be written.
+		time.Sleep(100 * time.Millisecond)
+
+		// head is advanced to the last known header.
+		head, err := store.Head(ctx)
+		require.NoError(t, err)
+		assert.Equal(t, head.Height(), wantHead.Height())
+		assert.Equal(t, head.Hash(), wantHead.Hash())
+
+		// check that store height is aligned with the head.
+		height := store.Height()
+		assert.Equal(t, height, head.Height())
+	}
+	{
+		err := store.Append(ctx, lastChunk...)
+		require.NoError(t, err)
+		// wait for batch to be written.
+		time.Sleep(100 * time.Millisecond)
+
+		// head is not advanced due to a gap.
+		head, err := store.Head(ctx)
+		require.NoError(t, err)
+		assert.Equal(t, head.Height(), wantHead.Height())
+		assert.Equal(t, head.Hash(), wantHead.Hash())
+
+		// check that store height is aligned with the head.
+		height := store.Height()
+		assert.Equal(t, height, head.Height())
+	}
+	{
+		err := store.Append(ctx, missedChunk...)
+		require.NoError(t, err)
+		// wait for batch to be written.
+		time.Sleep(time.Second)
+
+		// after appending missing headers we're on the latest header.
+		head, err := store.Head(ctx)
+		require.NoError(t, err)
+		assert.Equal(t, head.Height(), latestHead.Height())
+		assert.Equal(t, head.Hash(), latestHead.Hash())
+
+		// check that store height is aligned with the head.
+		height := store.Height()
+		assert.Equal(t, height, head.Height())
+	}
+}
+
+func TestStoreGetByHeight_whenGaps(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	t.Cleanup(cancel)
+
+	suite := headertest.NewTestSuite(t)
+
+	ds := sync.MutexWrap(datastore.NewMapDatastore())
+	store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10))
+
+	head, err := store.Head(ctx)
+	require.NoError(t, err)
+	assert.Equal(t, head.Hash(), suite.Head().Hash())
+
+	{
+		firstChunk := suite.GenDummyHeaders(5)
+		latestHead := firstChunk[len(firstChunk)-1]
+
+		err := store.Append(ctx, firstChunk...)
+		require.NoError(t, err)
+		// wait for batch to be written.
+		time.Sleep(100 * time.Millisecond)
+
+		head, err := store.Head(ctx)
+		require.NoError(t, err)
+		assert.Equal(t, head.Height(), latestHead.Height())
+		assert.Equal(t, head.Hash(), latestHead.Hash())
+	}
+
+	missedChunk := suite.GenDummyHeaders(5)
+	wantMissHead := missedChunk[len(missedChunk)-2]
+
+	errChMiss := make(chan error, 1)
+	go func() {
+		shortCtx, shortCancel := context.WithTimeout(ctx, 3*time.Second)
+		defer shortCancel()
+
+		_, err := store.GetByHeight(shortCtx, wantMissHead.Height())
+		errChMiss <- err
+	}()
+
+	lastChunk := suite.GenDummyHeaders(5)
+	wantLastHead := lastChunk[len(lastChunk)-1]
+
+	errChLast := make(chan error, 1)
+	go func() {
+		shortCtx, shortCancel := context.WithTimeout(ctx, 3*time.Second)
+		defer shortCancel()
+
+		_, err := store.GetByHeight(shortCtx, wantLastHead.Height())
+		errChLast <- err
+	}()
+
+	// wait for goroutines to start
+	time.Sleep(100 * time.Millisecond)
+
+	select {
+	case err := <-errChMiss:
+		t.Fatalf("store.GetByHeight on prelast height MUST be blocked, have error: %v", err)
+	case err := <-errChLast:
+		t.Fatalf("store.GetByHeight on last height MUST be blocked, have error: %v", err)
+	default:
+		// ok
+	}
+
+	{
+		err := store.Append(ctx, lastChunk...)
+		require.NoError(t, err)
+		// wait for batch to be written.
+		time.Sleep(100 * time.Millisecond)
+
+		select {
+		case err := <-errChMiss:
+			t.Fatalf("store.GetByHeight on prelast height MUST be blocked, have error: %v", err)
+		case err := <-errChLast:
+			require.NoError(t, err)
+		default:
+			t.Fatalf("store.GetByHeight on last height MUST NOT be blocked, have error: %v", err)
+		}
+	}
+
+	{
+		err := store.Append(ctx, missedChunk...)
+		require.NoError(t, err)
+		// wait for batch to be written.
+		time.Sleep(100 * time.Millisecond)
+
+		select {
+		case err := <-errChMiss:
+			require.NoError(t, err)
+
+			head, err := store.GetByHeight(ctx, wantLastHead.Height())
+			require.NoError(t, err)
+			require.Equal(t, head, wantLastHead)
+		default:
+			t.Fatal("store.GetByHeight on last height MUST NOT be blocked")
+		}
+	}
+}
+
+func TestStoreGetByHeight_earlyAvailable(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	t.Cleanup(cancel)
+
+	suite := headertest.NewTestSuite(t)
+
+	ds := sync.MutexWrap(datastore.NewMapDatastore())
+	store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10))
+
+	const skippedHeaders = 15
+	suite.GenDummyHeaders(skippedHeaders)
+	lastChunk := suite.GenDummyHeaders(1)
+
+	{
+		err := store.Append(ctx, lastChunk...)
+		require.NoError(t, err)
+
+		// wait for batch to be written.
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	{
+		h, err := store.GetByHeight(ctx, lastChunk[0].Height())
+		require.NoError(t, err)
+		require.Equal(t, h, lastChunk[0])
+	}
+}
+
 // TestStore_GetRange tests possible combinations of requests and ensures that
 // the store can handle them adequately (even malformed requests)
 func TestStore_GetRange(t *testing.T) {
@@ -253,6 +511,7 @@ func TestBatch_GetByHeightBeforeInit(t *testing.T) {
 	t.Cleanup(cancel)
 
 	suite := headertest.NewTestSuite(t)
+	suite.Head().HeightI = 1_000_000
 
 	ds := sync.MutexWrap(datastore.NewMapDatastore())
 	store, err := NewStore[*headertest.DummyHeader](ds)
@@ -265,9 +524,8 @@ func TestBatch_GetByHeightBeforeInit(t *testing.T) {
 		_ = store.Init(ctx, suite.Head())
 	}()
 
-	h, err := store.GetByHeight(ctx, 1)
-	require.NoError(t, err)
-	require.NotNil(t, h)
+	_, err = store.GetByHeight(ctx, 1)
+	require.ErrorIs(t, err, header.ErrNotFound)
 }
 
 func TestStoreInit(t *testing.T) {
diff --git a/sync/sync_test.go b/sync/sync_test.go
index b9acb2d3..b8d98b7d 100644
--- a/sync/sync_test.go
+++ b/sync/sync_test.go
@@ -47,19 +47,37 @@ func TestSyncSimpleRequestingHead(t *testing.T) {
 	err = syncer.SyncWait(ctx)
 	require.NoError(t, err)
 
-	exp, err := remoteStore.Head(ctx)
-	require.NoError(t, err)
-
-	have, err := localStore.Head(ctx)
-	require.NoError(t, err)
-	assert.Equal(t, exp.Height(), have.Height())
-	assert.Empty(t, syncer.pending.Head())
-
-	state := syncer.State()
-	assert.Equal(t, uint64(exp.Height()), state.Height)
-	assert.Equal(t, uint64(2), state.FromHeight)
-	assert.Equal(t, uint64(exp.Height()), state.ToHeight)
-	assert.True(t, state.Finished(), state)
+	// force sync to update underlying stores.
+	syncer.wantSync()
+
+	// we need to wait for a flush
+	assert.Eventually(t, func() bool {
+		exp, err := remoteStore.Head(ctx)
+		require.NoError(t, err)
+
+		have, err := localStore.Head(ctx)
+		require.NoError(t, err)
+
+		state := syncer.State()
+		switch {
+		case exp.Height() != have.Height():
+			return false
+		case syncer.pending.Head() != nil:
+			return false
+
+		case uint64(exp.Height()) != state.Height:
+			return false
+		case uint64(2) != state.FromHeight:
+			return false
+
+		case uint64(exp.Height()) != state.ToHeight:
+			return false
+		case !state.Finished():
+			return false
+		default:
+			return true
+		}
+	}, 2*time.Second, 100*time.Millisecond)
 }
 
 func TestDoSyncFullRangeFromExternalPeer(t *testing.T) {
@@ -206,11 +224,20 @@ func TestSyncPendingRangesWithMisses(t *testing.T) {
 	exp, err := remoteStore.Head(ctx)
 	require.NoError(t, err)
 
-	have, err := localStore.Head(ctx)
-	require.NoError(t, err)
-
-	assert.Equal(t, exp.Height(), have.Height())
-	assert.Empty(t, syncer.pending.Head()) // assert all cache from pending is used
+	// we need to wait for a flush
+	assert.Eventually(t, func() bool {
+		have, err := localStore.Head(ctx)
+		require.NoError(t, err)
+
+		switch {
+		case exp.Height() != have.Height():
+			return false
+		case !syncer.pending.Head().IsZero():
+			return false
+		default:
+			return true
+		}
+	}, 2*time.Second, 100*time.Millisecond)
 }
 
 // TestSyncer_FindHeadersReturnsCorrectRange ensures that `findHeaders` returns
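For illustration only, a minimal sketch of how the reworked heightSub above is meant to be used by a reader: the helper name waitAndRead is hypothetical and not part of the patch; it only combines Wait and the errElapsedHeight convention that Store.GetByHeight relies on.

package store

import (
	"context"
	"errors"
)

// waitAndRead blocks until `height` becomes available and reports whether the
// caller may now read it (from the pending batch or from disk).
func waitAndRead(ctx context.Context, hs *heightSub, height uint64) error {
	err := hs.Wait(ctx, height)
	if err != nil && !errors.Is(err, errElapsedHeight) {
		// Context cancellation or deadline; Wait already deregistered the waiter.
		return err
	}
	// nil means the height was just published; errElapsedHeight means it was
	// already seen earlier — in both cases the header is readable now.
	return nil
}

On the write path, as the flushLoop change above shows, the store first calls Notify with the exact heights it has just buffered (they are readable from pending) and only advances heightSub's height via SetHeight once the contiguous head actually moves, so waiters are never woken before their header is readable.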