From 46f8fadcb39f04ed4d2861592f30322501d25d8a Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 8 Jan 2025 12:25:57 +0100 Subject: [PATCH 01/44] fix(store): track store's contiguous head --- store/metrics.go | 10 +++++++++- store/store.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/store/metrics.go b/store/metrics.go index e5f14211..0c096408 100644 --- a/store/metrics.go +++ b/store/metrics.go @@ -13,7 +13,9 @@ import ( var meter = otel.Meter("header/store") type metrics struct { - headHeight atomic.Int64 + headHeight atomic.Int64 + contiguousHeadHeight atomic.Int64 + headHeightInst metric.Int64ObservableGauge headHeightReg metric.Registration @@ -66,6 +68,12 @@ func (m *metrics) newHead(height uint64) { }) } +func (m *metrics) newContiguousHead(height uint64) { + m.observe(context.Background(), func(ctx context.Context) { + m.contiguousHeadHeight.Store(int64(height)) + }) +} + func (m *metrics) observeHeight(_ context.Context, obs metric.Observer) error { obs.ObserveInt64(m.headHeightInst, m.headHeight.Load()) return nil diff --git a/store/store.go b/store/store.go index 83303fd1..6eab8357 100644 --- a/store/store.go +++ b/store/store.go @@ -51,6 +51,8 @@ type Store[H header.Header[H]] struct { writesDn chan struct{} // writeHead maintains the current write head writeHead atomic.Pointer[H] + // contiguousHead is the highest contiguous header observed + contiguousHead atomic.Pointer[H] // pending keeps headers pending to be written in one batch pending *batch[H] @@ -123,6 +125,7 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error { log.Infow("initialized head", "height", initial.Height(), "hash", initial.Hash()) s.heightSub.Pub(initial) + s.contiguousHead.Store(&initial) return nil } @@ -168,6 +171,10 @@ func (s *Store[H]) Height() uint64 { } func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) { + // if head := s.contiguousHead.Load(); head != nil { + // return *head, nil + // } + head, err := s.GetByHeight(ctx, s.heightSub.Height()) if err == nil { return head, nil @@ -231,8 +238,13 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { return h, nil } + return s.getByHeight(ctx, height) +} + +func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) { hash, err := s.heightIndex.HashByHeight(ctx, height) if err != nil { + var zero H if errors.Is(err, datastore.ErrNotFound) { return zero, header.ErrNotFound } @@ -387,6 +399,7 @@ func (s *Store[H]) flushLoop() { // it is important to do Pub after updating pending // so pending is consistent with atomic Height counter on the heightSub s.heightSub.Pub(headers...) + s.advanceContiguousHead(ctx) // don't flush and continue if pending batch is not grown enough, // and Store is not stopping(headers == nil) if s.pending.Len() < s.Params.WriteBatchSize && headers != nil { @@ -499,6 +512,36 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { return data, nil } +// try advance contiguous head based on already written headers. +func (s *Store[H]) advanceContiguousHead(ctx context.Context) { + currHead := s.contiguousHead.Load() + if currHead == nil { + return + } + currHeight := (*currHead).Height() + prevHeight := currHeight + + // TODO(cristaloleg): benchmark this timeout or make it dynamic. + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + var newHead H + for { + h, err := s.getByHeight(ctx, currHeight+1) + if err != nil { + break + } + newHead = h + currHeight++ + } + + if currHeight > prevHeight { + s.contiguousHead.Store(&newHead) + log.Infow("new contiguous head", "height", newHead.Height(), "hash", newHead.Hash()) + s.metrics.newContiguousHead(newHead.Height()) + } +} + // indexTo saves mapping between header Height and Hash to the given batch. func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, headers ...H) error { for _, h := range headers { From cbcb738062025662035aeeca8d8c8530cd396ef4 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 8 Jan 2025 13:24:54 +0100 Subject: [PATCH 02/44] more fixes --- store/heightsub.go | 17 ++++-- store/heightsub_test.go | 62 ++++++++++++++++++++ store/metrics.go | 10 +--- store/store.go | 81 ++++++++++++++------------ store/store_test.go | 123 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 242 insertions(+), 51 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 2335001d..96c1dd33 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -3,6 +3,7 @@ package store import ( "context" "errors" + "fmt" "sync" "sync/atomic" @@ -35,7 +36,15 @@ func (hs *heightSub[H]) Height() uint64 { // SetHeight sets the new head height for heightSub. func (hs *heightSub[H]) SetHeight(height uint64) { - hs.height.Store(height) + for { + curr := hs.height.Load() + if curr >= height { + return + } + if hs.height.CompareAndSwap(curr, height) { + return + } + } } // Sub subscribes for a header of a given height. @@ -89,11 +98,9 @@ func (hs *heightSub[H]) Pub(headers ...H) { return } - height := hs.Height() from, to := headers[0].Height(), headers[ln-1].Height() - if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1 - log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from) - return + if from > to { + panic(fmt.Sprintf("from must be lower than to, have: %d and %d", from, to)) } hs.SetHeight(to) diff --git a/store/heightsub_test.go b/store/heightsub_test.go index f5958422..ccd6c24d 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -68,6 +68,68 @@ func TestHeightSub(t *testing.T) { } } +// Test heightSub can accept non-adj headers without an error. +func TestHeightSubNonAdjacement(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + hs := newHeightSub[*headertest.DummyHeader]() + + { + h := headertest.RandDummyHeader(t) + h.HeightI = 100 + hs.SetHeight(99) + hs.Pub(h) + } + + { + go func() { + // fixes flakiness on CI + time.Sleep(time.Millisecond) + + h1 := headertest.RandDummyHeader(t) + h1.HeightI = 200 + h2 := headertest.RandDummyHeader(t) + h2.HeightI = 300 + hs.Pub(h1, h2) + }() + + h, err := hs.Sub(ctx, 200) + assert.NoError(t, err) + assert.NotNil(t, h) + } +} + +// Test heightSub's height cannot go down but only up. +func TestHeightSub_monotonicHeight(t *testing.T) { + hs := newHeightSub[*headertest.DummyHeader]() + + { + h := headertest.RandDummyHeader(t) + h.HeightI = 100 + hs.SetHeight(99) + hs.Pub(h) + } + + { + h1 := headertest.RandDummyHeader(t) + h1.HeightI = 200 + h2 := headertest.RandDummyHeader(t) + h2.HeightI = 300 + hs.Pub(h1, h2) + } + + { + h1 := headertest.RandDummyHeader(t) + h1.HeightI = 120 + h2 := headertest.RandDummyHeader(t) + h2.HeightI = 130 + hs.Pub(h1, h2) + } + + assert.Equal(t, hs.height.Load(), uint64(300)) +} + func TestHeightSubCancellation(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/store/metrics.go b/store/metrics.go index 0c096408..e5f14211 100644 --- a/store/metrics.go +++ b/store/metrics.go @@ -13,9 +13,7 @@ import ( var meter = otel.Meter("header/store") type metrics struct { - headHeight atomic.Int64 - contiguousHeadHeight atomic.Int64 - + headHeight atomic.Int64 headHeightInst metric.Int64ObservableGauge headHeightReg metric.Registration @@ -68,12 +66,6 @@ func (m *metrics) newHead(height uint64) { }) } -func (m *metrics) newContiguousHead(height uint64) { - m.observe(context.Background(), func(ctx context.Context) { - m.contiguousHeadHeight.Store(int64(height)) - }) -} - func (m *metrics) observeHeight(_ context.Context, obs metric.Observer) error { obs.ObserveInt64(m.headHeightInst, m.headHeight.Load()) return nil diff --git a/store/store.go b/store/store.go index 6eab8357..97b1544c 100644 --- a/store/store.go +++ b/store/store.go @@ -49,8 +49,6 @@ type Store[H header.Header[H]] struct { writes chan []H // signals when writes are finished writesDn chan struct{} - // writeHead maintains the current write head - writeHead atomic.Pointer[H] // contiguousHead is the highest contiguous header observed contiguousHead atomic.Pointer[H] // pending keeps headers pending to be written in one batch @@ -124,8 +122,8 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error { } log.Infow("initialized head", "height", initial.Height(), "hash", initial.Hash()) - s.heightSub.Pub(initial) s.contiguousHead.Store(&initial) + s.heightSub.Pub(initial) return nil } @@ -167,27 +165,35 @@ func (s *Store[H]) Stop(ctx context.Context) error { } func (s *Store[H]) Height() uint64 { - return s.heightSub.Height() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + head, err := s.Head(ctx) + if err != nil { + if errors.Is(err, context.Canceled) || + errors.Is(err, context.DeadlineExceeded) || + errors.Is(err, datastore.ErrNotFound) { + return 0 + } + panic(err) + } + return head.Height() } func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) { - // if head := s.contiguousHead.Load(); head != nil { - // return *head, nil - // } - - head, err := s.GetByHeight(ctx, s.heightSub.Height()) - if err == nil { - return head, nil + if head := s.contiguousHead.Load(); head != nil { + return *head, nil } var zero H - head, err = s.readHead(ctx) + head, err := s.readHead(ctx) switch { default: return zero, err case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound): return zero, header.ErrNoHead case err == nil: + s.contiguousHead.Store(&head) s.heightSub.SetHeight(head.Height()) log.Infow("loaded head", "height", head.Height(), "hash", head.Hash()) return head, nil @@ -234,14 +240,15 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { // which means the requested 'height' should be present // // check if the requested header is not yet written on disk - if h := s.pending.GetByHeight(height); !h.IsZero() { - return h, nil - } return s.getByHeight(ctx, height) } func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) { + if h := s.pending.GetByHeight(height); !h.IsZero() { + return h, nil + } + hash, err := s.heightIndex.HashByHeight(ctx, height) if err != nil { var zero H @@ -316,17 +323,10 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { return nil } - var err error - // take current write head to verify headers against - var head H - headPtr := s.writeHead.Load() - if headPtr == nil { - head, err = s.Head(ctx) - if err != nil { - return err - } - } else { - head = *headPtr + // take current contiguous head to verify headers against + head, err := s.Head(ctx) + if err != nil { + return err } // collect valid headers @@ -356,19 +356,11 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { head = h } - onWrite := func() { - newHead := verified[len(verified)-1] - s.writeHead.Store(&newHead) - log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) - s.metrics.newHead(newHead.Height()) - } - // queue headers to be written on disk select { case s.writes <- verified: // we return an error here after writing, // as there might be an invalid header in between of a given range - onWrite() return err default: s.metrics.writesQueueBlocked(ctx) @@ -376,7 +368,6 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { // if the writes queue is full, we block until it is not select { case s.writes <- verified: - onWrite() return err case <-s.writesDn: return errStoppedStore @@ -399,6 +390,7 @@ func (s *Store[H]) flushLoop() { // it is important to do Pub after updating pending // so pending is consistent with atomic Height counter on the heightSub s.heightSub.Pub(headers...) + // try to advance contiguousHead if we don't have gaps. s.advanceContiguousHead(ctx) // don't flush and continue if pending batch is not grown enough, // and Store is not stopping(headers == nil) @@ -537,8 +529,23 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context) { if currHeight > prevHeight { s.contiguousHead.Store(&newHead) - log.Infow("new contiguous head", "height", newHead.Height(), "hash", newHead.Hash()) - s.metrics.newContiguousHead(newHead.Height()) + s.heightSub.SetHeight(currHeight) + log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) + s.metrics.newHead(newHead.Height()) + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + b, err := newHead.Hash().MarshalJSON() + if err != nil { + log.Errorw("cannot marshal new head", + "height", newHead.Height(), "hash", newHead.Hash(), "err", err) + } + + if err := s.ds.Put(ctx, headKey, b); err != nil { + log.Errorw("cannot put new head", + "height", newHead.Height(), "hash", newHead.Hash(), "err", err) + } } } diff --git a/store/store_test.go b/store/store_test.go index 96f5ff25..19daed35 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -2,6 +2,8 @@ package store import ( "context" + "math/rand" + stdsync "sync" "testing" "time" @@ -145,6 +147,127 @@ func TestStore_Append_BadHeader(t *testing.T) { require.Error(t, err) } +func TestStore_Append(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(4)) + + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), suite.Head().Hash()) + + const workers = 10 + const chunk = 5 + headers := suite.GenDummyHeaders(workers * chunk) + + errCh := make(chan error, workers) + var wg stdsync.WaitGroup + wg.Add(workers) + + for i := range workers { + go func() { + defer wg.Done() + // make every append happened in random order. + time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) + + err := store.Append(ctx, headers[i*chunk:(i+1)*chunk]...) + errCh <- err + }() + } + + wg.Wait() + close(errCh) + for err := range errCh { + assert.NoError(t, err) + } + + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + + // assert.Eventually(t, func() bool { + head, err = store.Head(ctx) + assert.NoError(t, err) + assert.Equal(t, int(head.Height()), int(headers[len(headers)-1].Height())) + + // return int(head.Height()) == int(headers[len(headers)-1].Height()) + // }, time.Second, time.Millisecond) + assert.Equal(t, head.Hash(), headers[len(headers)-1].Hash()) +} + +func TestStore_Append_stableHeadWhenGaps(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(4)) + + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), suite.Head().Hash()) + + firstChunk := suite.GenDummyHeaders(5) + missedChunk := suite.GenDummyHeaders(5) + lastChunk := suite.GenDummyHeaders(5) + + wantHead := firstChunk[len(firstChunk)-1] + latestHead := lastChunk[len(lastChunk)-1] + + { + err := store.Append(ctx, firstChunk...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + + // head is advanced to the last known header. + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Height(), wantHead.Height()) + assert.Equal(t, head.Hash(), wantHead.Hash()) + + // check that store height is aligned with the head. + height := store.Height() + assert.Equal(t, height, head.Height()) + } + { + err := store.Append(ctx, lastChunk...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + + // head is not advanced due to a gap. + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Height(), wantHead.Height()) + assert.Equal(t, head.Hash(), wantHead.Hash()) + + // check that store height is aligned with the head. + height := store.Height() + assert.Equal(t, height, head.Height()) + } + { + err := store.Append(ctx, missedChunk...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(time.Second) + + // after appending missing headers we're on the latest header. + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Height(), latestHead.Height()) + assert.Equal(t, head.Hash(), latestHead.Hash()) + + // check that store height is aligned with the head. + height := store.Height() + assert.Equal(t, height, head.Height()) + } +} + // TestStore_GetRange tests possible combinations of requests and ensures that // the store can handle them adequately (even malformed requests) func TestStore_GetRange(t *testing.T) { From 4bc4aa0b8b5f7f8f54a74ad02526ed700f1d5943 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 8 Jan 2025 13:28:40 +0100 Subject: [PATCH 03/44] fix test --- p2p/server_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/server_test.go b/p2p/server_test.go index 1e896b2e..4b19ede9 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -17,6 +17,7 @@ func TestExchangeServer_handleRequestTimeout(t *testing.T) { peer := createMocknet(t, 1) s, err := store.NewStore[*headertest.DummyHeader](datastore.NewMapDatastore()) require.NoError(t, err) + s.Init(context.Background(), headertest.RandDummyHeader(t)) server, err := NewExchangeServer[*headertest.DummyHeader]( peer[0], s, From 9e1984042e80105f5bf476d0e8543b522f606c65 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 8 Jan 2025 13:41:05 +0100 Subject: [PATCH 04/44] fix sync test --- sync/sync_test.go | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/sync/sync_test.go b/sync/sync_test.go index b9acb2d3..f3f14a06 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -47,19 +47,37 @@ func TestSyncSimpleRequestingHead(t *testing.T) { err = syncer.SyncWait(ctx) require.NoError(t, err) - exp, err := remoteStore.Head(ctx) - require.NoError(t, err) - - have, err := localStore.Head(ctx) - require.NoError(t, err) - assert.Equal(t, exp.Height(), have.Height()) - assert.Empty(t, syncer.pending.Head()) - - state := syncer.State() - assert.Equal(t, uint64(exp.Height()), state.Height) - assert.Equal(t, uint64(2), state.FromHeight) - assert.Equal(t, uint64(exp.Height()), state.ToHeight) - assert.True(t, state.Finished(), state) + // force sync to update underlying stores. + syncer.wantSync() + + // we need to wait for a flush + assert.Eventually(t, func() bool { + exp, err := remoteStore.Head(ctx) + require.NoError(t, err) + + have, err := localStore.Head(ctx) + require.NoError(t, err) + + state := syncer.State() + switch { + case exp.Height() != have.Height(): + return false + case syncer.pending.Head() != nil: + return false + + case uint64(exp.Height()) != state.Height: + return false + case uint64(2) != state.FromHeight: + return false + + case uint64(exp.Height()) != state.ToHeight: + return false + case !state.Finished(): + return false + default: + return true + } + }, 2*time.Second, 100*time.Millisecond) } func TestDoSyncFullRangeFromExternalPeer(t *testing.T) { From cc8894ce817c48ab299449584ab54669482ce018 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 8 Jan 2025 13:48:09 +0100 Subject: [PATCH 05/44] fix sync test --- sync/sync_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sync/sync_test.go b/sync/sync_test.go index f3f14a06..de767412 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -224,11 +224,16 @@ func TestSyncPendingRangesWithMisses(t *testing.T) { exp, err := remoteStore.Head(ctx) require.NoError(t, err) - have, err := localStore.Head(ctx) - require.NoError(t, err) + // we need to wait for a flush + assert.Eventually(t, func() bool { + have, err := localStore.Head(ctx) + require.NoError(t, err) - assert.Equal(t, exp.Height(), have.Height()) - assert.Empty(t, syncer.pending.Head()) // assert all cache from pending is used + // assert.Equal(t, exp.Height(), have.Height()) + // assert.Empty(t, syncer.pending.Head()) // assert all cache from pending is used + + return exp.Height() == have.Height() + }, 2*time.Second, 100*time.Millisecond) } // TestSyncer_FindHeadersReturnsCorrectRange ensures that `findHeaders` returns From 73b65f2a9c73080f6c77ff2a7f8a095b417dbdf8 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 9 Jan 2025 15:06:53 +0100 Subject: [PATCH 06/44] test heightSub --- store/heightsub.go | 1 - store/store_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/store/heightsub.go b/store/heightsub.go index 96c1dd33..8d15e243 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -102,7 +102,6 @@ func (hs *heightSub[H]) Pub(headers ...H) { if from > to { panic(fmt.Sprintf("from must be lower than to, have: %d and %d", from, to)) } - hs.SetHeight(to) hs.heightReqsLk.Lock() defer hs.heightReqsLk.Unlock() diff --git a/store/store_test.go b/store/store_test.go index 19daed35..30c1f137 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -268,6 +268,51 @@ func TestStore_Append_stableHeadWhenGaps(t *testing.T) { } } +func TestStoreGetByHeight_whenGaps(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Hash(), suite.Head().Hash()) + + firstChunk := suite.GenDummyHeaders(5) + _ = suite.GenDummyHeaders(5) + lastChunk := suite.GenDummyHeaders(5) + + wantHead := firstChunk[len(firstChunk)-1] + + { + err = store.Append(ctx, lastChunk...) + require.NoError(t, err) + + shortCtx, shortCancel := context.WithTimeout(ctx, 10*time.Millisecond) + defer shortCancel() + + head, err = store.GetByHeight(shortCtx, wantHead.Height()) + require.Error(t, err) + } + + { + go func() { + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + + err := store.Append(ctx, firstChunk...) + require.NoError(t, err) + }() + + head, err = store.GetByHeight(ctx, wantHead.Height()) + require.NoError(t, err) + assert.Equal(t, head.Hash(), wantHead.Hash()) + } +} + // TestStore_GetRange tests possible combinations of requests and ensures that // the store can handle them adequately (even malformed requests) func TestStore_GetRange(t *testing.T) { From b74e825c28686282866cd5a3d527530236b70521 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 9 Jan 2025 15:06:59 +0100 Subject: [PATCH 07/44] simplify test --- store/heightsub_test.go | 28 ++++++---------------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/store/heightsub_test.go b/store/heightsub_test.go index ccd6c24d..26c11744 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -104,30 +104,14 @@ func TestHeightSubNonAdjacement(t *testing.T) { func TestHeightSub_monotonicHeight(t *testing.T) { hs := newHeightSub[*headertest.DummyHeader]() - { - h := headertest.RandDummyHeader(t) - h.HeightI = 100 - hs.SetHeight(99) - hs.Pub(h) - } + hs.SetHeight(99) + assert.Equal(t, int64(hs.height.Load()), int64(99)) - { - h1 := headertest.RandDummyHeader(t) - h1.HeightI = 200 - h2 := headertest.RandDummyHeader(t) - h2.HeightI = 300 - hs.Pub(h1, h2) - } - - { - h1 := headertest.RandDummyHeader(t) - h1.HeightI = 120 - h2 := headertest.RandDummyHeader(t) - h2.HeightI = 130 - hs.Pub(h1, h2) - } + hs.SetHeight(300) + assert.Equal(t, int64(hs.height.Load()), int64(300)) - assert.Equal(t, hs.height.Load(), uint64(300)) + hs.SetHeight(120) + assert.Equal(t, int64(hs.height.Load()), int64(300)) } func TestHeightSubCancellation(t *testing.T) { From 2a317aded5e1e7318af8bbbd4bc9f273d52cb37d Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Fri, 10 Jan 2025 14:09:25 +0100 Subject: [PATCH 08/44] wip --- store/batch.go | 6 ++++- store/heightsub.go | 17 ++++++++++++- store/store.go | 9 ++++++- store/store_test.go | 61 ++++++++++++++++++++++++++++++++++----------- 4 files changed, 75 insertions(+), 18 deletions(-) diff --git a/store/batch.go b/store/batch.go index 7785953f..04b41a01 100644 --- a/store/batch.go +++ b/store/batch.go @@ -76,7 +76,11 @@ func (b *batch[H]) getByHeight(height uint64) H { return zero } - return b.headers[height-base-1] + h := b.headers[height-base-1] + if h.Height() == height { + return h + } + return zero } // Append appends new headers to the batch. diff --git a/store/heightsub.go b/store/heightsub.go index 8d15e243..3432f75c 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -42,6 +42,18 @@ func (hs *heightSub[H]) SetHeight(height uint64) { return } if hs.height.CompareAndSwap(curr, height) { + println("CAS", curr, height) + hs.heightReqsLk.Lock() + for ; curr <= height; curr++ { + reqs, ok := hs.heightReqs[curr] + if ok { + for k := range reqs { + close(k) + } + delete(hs.heightReqs, curr) + } + } + hs.heightReqsLk.Unlock() return } } @@ -74,7 +86,10 @@ func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) { hs.heightReqsLk.Unlock() select { - case resp := <-resp: + case resp, ok := <-resp: + if !ok { + return zero, errElapsedHeight + } return resp, nil case <-ctx.Done(): // no need to keep the request, if the op has canceled diff --git a/store/store.go b/store/store.go index 97b1544c..0ae51b46 100644 --- a/store/store.go +++ b/store/store.go @@ -246,11 +246,13 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) { if h := s.pending.GetByHeight(height); !h.IsZero() { + println("pending", height) return h, nil } hash, err := s.heightIndex.HashByHeight(ctx, height) if err != nil { + println("index", height, err.Error()) var zero H if errors.Is(err, datastore.ErrNotFound) { return zero, header.ErrNotFound @@ -323,6 +325,8 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { return nil } + println("APPEND", headers[0].Height(), headers[len(headers)-1].Height()) + // take current contiguous head to verify headers against head, err := s.Head(ctx) if err != nil { @@ -389,7 +393,7 @@ func (s *Store[H]) flushLoop() { // and notify waiters if any + increase current read head height // it is important to do Pub after updating pending // so pending is consistent with atomic Height counter on the heightSub - s.heightSub.Pub(headers...) + // s.heightSub.Pub(headers...) // try to advance contiguousHead if we don't have gaps. s.advanceContiguousHead(ctx) // don't flush and continue if pending batch is not grown enough, @@ -519,8 +523,10 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context) { var newHead H for { + println("TRY", currHeight+1) h, err := s.getByHeight(ctx, currHeight+1) if err != nil { + println("FAIL", currHeight+1, err.Error()) break } newHead = h @@ -528,6 +534,7 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context) { } if currHeight > prevHeight { + println("UPD", currHeight, prevHeight) s.contiguousHead.Store(&newHead) s.heightSub.SetHeight(currHeight) log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) diff --git a/store/store_test.go b/store/store_test.go index 30c1f137..8fd1a780 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -282,34 +282,65 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { assert.Equal(t, head.Hash(), suite.Head().Hash()) firstChunk := suite.GenDummyHeaders(5) - _ = suite.GenDummyHeaders(5) + missedChunk := suite.GenDummyHeaders(5) lastChunk := suite.GenDummyHeaders(5) - wantHead := firstChunk[len(firstChunk)-1] - { - err = store.Append(ctx, lastChunk...) + latestHead := firstChunk[len(firstChunk)-1] + + err := store.Append(ctx, firstChunk...) require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) - shortCtx, shortCancel := context.WithTimeout(ctx, 10*time.Millisecond) + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, head.Height(), latestHead.Height()) + assert.Equal(t, head.Hash(), latestHead.Hash()) + } + + errCh := make(chan error, 1) + go func() { + wantHead := lastChunk[len(lastChunk)-1] + + shortCtx, shortCancel := context.WithTimeout(ctx, 3*time.Second) defer shortCancel() - head, err = store.GetByHeight(shortCtx, wantHead.Height()) - require.Error(t, err) + _, err := store.GetByHeight(shortCtx, wantHead.Height()) + errCh <- err + }() + + select { + case err := <-errCh: + t.Fatalf("store.GetByHeight must be blocked, have error: %v", err) + default: } { - go func() { - // wait for batch to be written. - time.Sleep(100 * time.Millisecond) + err := store.Append(ctx, lastChunk...) + require.NoError(t, err) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + } - err := store.Append(ctx, firstChunk...) - require.NoError(t, err) - }() + select { + case err := <-errCh: + t.Fatalf("store.GetByHeight must be still blocked, have error: %v", err) + default: + } - head, err = store.GetByHeight(ctx, wantHead.Height()) + { + err := store.Append(ctx, missedChunk...) require.NoError(t, err) - assert.Equal(t, head.Hash(), wantHead.Hash()) + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + } + + select { + case err := <-errCh: + require.NoError(t, err) + default: + t.Fatal("store.GetByHeight must not be blocked") } } From 1903695fcb9aea334bd49cafc59f656496cd452c Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Fri, 10 Jan 2025 15:01:58 +0100 Subject: [PATCH 09/44] cleanup --- store/heightsub.go | 1 - store/store.go | 16 ++++++---------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 3432f75c..baa633a7 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -42,7 +42,6 @@ func (hs *heightSub[H]) SetHeight(height uint64) { return } if hs.height.CompareAndSwap(curr, height) { - println("CAS", curr, height) hs.heightReqsLk.Lock() for ; curr <= height; curr++ { reqs, ok := hs.heightReqs[curr] diff --git a/store/store.go b/store/store.go index 0ae51b46..e28af3a5 100644 --- a/store/store.go +++ b/store/store.go @@ -230,11 +230,14 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { if height == 0 { return zero, errors.New("header/store: height must be bigger than zero") } + // if the requested 'height' was not yet published // we subscribe to it - h, err := s.heightSub.Sub(ctx, height) - if !errors.Is(err, errElapsedHeight) { - return h, err + if head := s.contiguousHead.Load(); head == nil || height > (*head).Height() { + h, err := s.heightSub.Sub(ctx, height) + if !errors.Is(err, errElapsedHeight) { + return h, err + } } // otherwise, the errElapsedHeight is thrown, // which means the requested 'height' should be present @@ -246,13 +249,11 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) { if h := s.pending.GetByHeight(height); !h.IsZero() { - println("pending", height) return h, nil } hash, err := s.heightIndex.HashByHeight(ctx, height) if err != nil { - println("index", height, err.Error()) var zero H if errors.Is(err, datastore.ErrNotFound) { return zero, header.ErrNotFound @@ -325,8 +326,6 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { return nil } - println("APPEND", headers[0].Height(), headers[len(headers)-1].Height()) - // take current contiguous head to verify headers against head, err := s.Head(ctx) if err != nil { @@ -523,10 +522,8 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context) { var newHead H for { - println("TRY", currHeight+1) h, err := s.getByHeight(ctx, currHeight+1) if err != nil { - println("FAIL", currHeight+1, err.Error()) break } newHead = h @@ -534,7 +531,6 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context) { } if currHeight > prevHeight { - println("UPD", currHeight, prevHeight) s.contiguousHead.Store(&newHead) s.heightSub.SetHeight(currHeight) log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) From 268afc3887165b32aee287c70ccd605c267dd18e Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Fri, 10 Jan 2025 16:49:05 +0100 Subject: [PATCH 10/44] simplify heightSub --- store/heightsub.go | 94 ++++++++--------------------------------- store/heightsub_test.go | 63 +++++++++------------------ store/store.go | 9 ++-- 3 files changed, 43 insertions(+), 123 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index baa633a7..0d1cfc0e 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -3,7 +3,6 @@ package store import ( "context" "errors" - "fmt" "sync" "sync/atomic" @@ -19,13 +18,13 @@ type heightSub[H header.Header[H]] struct { // that has been fully verified and inserted into the subjective chain height atomic.Uint64 heightReqsLk sync.Mutex - heightReqs map[uint64]map[chan H]struct{} + heightSubs map[uint64]chan struct{} } // newHeightSub instantiates new heightSub. func newHeightSub[H header.Header[H]]() *heightSub[H] { return &heightSub[H]{ - heightReqs: make(map[uint64]map[chan H]struct{}), + heightSubs: make(map[uint64]chan struct{}), } } @@ -44,12 +43,10 @@ func (hs *heightSub[H]) SetHeight(height uint64) { if hs.height.CompareAndSwap(curr, height) { hs.heightReqsLk.Lock() for ; curr <= height; curr++ { - reqs, ok := hs.heightReqs[curr] + sub, ok := hs.heightSubs[curr] if ok { - for k := range reqs { - close(k) - } - delete(hs.heightReqs, curr) + close(sub) + delete(hs.heightSubs, curr) } } hs.heightReqsLk.Unlock() @@ -58,13 +55,9 @@ func (hs *heightSub[H]) SetHeight(height uint64) { } } -// Sub subscribes for a header of a given height. -// It can return errElapsedHeight, which means a requested header was already provided -// and caller should get it elsewhere. -func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) { - var zero H +func (hs *heightSub[H]) Wait(ctx context.Context, height uint64) error { if hs.Height() >= height { - return zero, errElapsedHeight + return errElapsedHeight } hs.heightReqsLk.Lock() @@ -73,78 +66,25 @@ func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) { // The lock above can park a goroutine long enough for hs.height to change for a requested height, // leaving the request never fulfilled and the goroutine deadlocked. hs.heightReqsLk.Unlock() - return zero, errElapsedHeight + return errElapsedHeight } - resp := make(chan H, 1) - reqs, ok := hs.heightReqs[height] + + sub, ok := hs.heightSubs[height] if !ok { - reqs = make(map[chan H]struct{}) - hs.heightReqs[height] = reqs + sub = make(chan struct{}, 1) + hs.heightSubs[height] = sub } - reqs[resp] = struct{}{} hs.heightReqsLk.Unlock() select { - case resp, ok := <-resp: - if !ok { - return zero, errElapsedHeight - } - return resp, nil + case <-sub: + return nil case <-ctx.Done(): // no need to keep the request, if the op has canceled hs.heightReqsLk.Lock() - delete(reqs, resp) - if len(reqs) == 0 { - delete(hs.heightReqs, height) - } + close(sub) + delete(hs.heightSubs, height) hs.heightReqsLk.Unlock() - return zero, ctx.Err() - } -} - -// Pub processes all the outstanding subscriptions matching the given headers. -// Pub is only safe when called from one goroutine. -// For Pub to work correctly, heightSub has to be initialized with SetHeight -// so that given headers are contiguous to the height on heightSub. -func (hs *heightSub[H]) Pub(headers ...H) { - ln := len(headers) - if ln == 0 { - return - } - - from, to := headers[0].Height(), headers[ln-1].Height() - if from > to { - panic(fmt.Sprintf("from must be lower than to, have: %d and %d", from, to)) - } - - hs.heightReqsLk.Lock() - defer hs.heightReqsLk.Unlock() - - // there is a common case where we Pub only header - // in this case, we shouldn't loop over each heightReqs - // and instead read from the map directly - if ln == 1 { - reqs, ok := hs.heightReqs[from] - if ok { - for req := range reqs { - req <- headers[0] // reqs must always be buffered, so this won't block - } - delete(hs.heightReqs, from) - } - return - } - - // instead of looping over each header in 'headers', we can loop over each request - // which will drastically decrease idle iterations, as there will be less requests than headers - for height, reqs := range hs.heightReqs { - // then we look if any of the requests match the given range of headers - if height >= from && height <= to { - // and if so, calculate its position and fulfill requests - h := headers[height-from] - for req := range reqs { - req <- h // reqs must always be buffered, so this won't block - } - delete(hs.heightReqs, height) - } + return ctx.Err() } } diff --git a/store/heightsub_test.go b/store/heightsub_test.go index 26c11744..67788f8c 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -18,14 +18,10 @@ func TestHeightSub(t *testing.T) { // assert subscription returns nil for past heights { - h := headertest.RandDummyHeader(t) - h.HeightI = 100 hs.SetHeight(99) - hs.Pub(h) - h, err := hs.Sub(ctx, 10) + err := hs.Wait(ctx, 10) assert.ErrorIs(t, err, errElapsedHeight) - assert.Nil(t, h) } // assert actual subscription works @@ -34,16 +30,11 @@ func TestHeightSub(t *testing.T) { // fixes flakiness on CI time.Sleep(time.Millisecond) - h1 := headertest.RandDummyHeader(t) - h1.HeightI = 101 - h2 := headertest.RandDummyHeader(t) - h2.HeightI = 102 - hs.Pub(h1, h2) + hs.SetHeight(102) }() - h, err := hs.Sub(ctx, 101) + err := hs.Wait(ctx, 101) assert.NoError(t, err) - assert.NotNil(t, h) } // assert multiple subscriptions work @@ -75,29 +66,17 @@ func TestHeightSubNonAdjacement(t *testing.T) { hs := newHeightSub[*headertest.DummyHeader]() - { - h := headertest.RandDummyHeader(t) - h.HeightI = 100 - hs.SetHeight(99) - hs.Pub(h) - } + hs.SetHeight(99) - { - go func() { - // fixes flakiness on CI - time.Sleep(time.Millisecond) + go func() { + // fixes flakiness on CI + time.Sleep(time.Millisecond) - h1 := headertest.RandDummyHeader(t) - h1.HeightI = 200 - h2 := headertest.RandDummyHeader(t) - h2.HeightI = 300 - hs.Pub(h1, h2) - }() + hs.SetHeight(300) + }() - h, err := hs.Sub(ctx, 200) - assert.NoError(t, err) - assert.NotNil(t, h) - } + err := hs.Wait(ctx, 200) + assert.NoError(t, err) } // Test heightSub's height cannot go down but only up. @@ -119,13 +98,14 @@ func TestHeightSubCancellation(t *testing.T) { defer cancel() h := headertest.RandDummyHeader(t) + h.HeightI %= 100 // make it a bit lower hs := newHeightSub[*headertest.DummyHeader]() - sub := make(chan *headertest.DummyHeader) + sub := make(chan struct{}) go func() { // subscribe first time - h, _ := hs.Sub(ctx, h.HeightI) - sub <- h + hs.Wait(ctx, h.Height()) + sub <- struct{}{} }() // give a bit time for subscription to settle @@ -134,19 +114,18 @@ func TestHeightSubCancellation(t *testing.T) { // subscribe again but with failed canceled context canceledCtx, cancel := context.WithCancel(ctx) cancel() - _, err := hs.Sub(canceledCtx, h.HeightI) - assert.Error(t, err) + err := hs.Wait(canceledCtx, h.Height()) + assert.ErrorIs(t, err, context.Canceled) - // publish header - hs.Pub(h) + // update height + hs.SetHeight(h.Height()) // ensure we still get our header select { - case subH := <-sub: - assert.Equal(t, h.HeightI, subH.HeightI) + case <-sub: case <-ctx.Done(): t.Error(ctx.Err()) } // ensure we don't have any active subscriptions - assert.Len(t, hs.heightReqs, 0) + assert.Len(t, hs.heightSubs, 0) } diff --git a/store/store.go b/store/store.go index e28af3a5..8d2ff28a 100644 --- a/store/store.go +++ b/store/store.go @@ -123,7 +123,7 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error { log.Infow("initialized head", "height", initial.Height(), "hash", initial.Hash()) s.contiguousHead.Store(&initial) - s.heightSub.Pub(initial) + s.heightSub.SetHeight(initial.Height()) return nil } @@ -234,9 +234,10 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { // if the requested 'height' was not yet published // we subscribe to it if head := s.contiguousHead.Load(); head == nil || height > (*head).Height() { - h, err := s.heightSub.Sub(ctx, height) - if !errors.Is(err, errElapsedHeight) { - return h, err + err := s.heightSub.Wait(ctx, height) + if err != nil && !errors.Is(err, errElapsedHeight) { + var zero H + return zero, err } } // otherwise, the errElapsedHeight is thrown, From 674e4614f18bc80d59f7fddb3efa718f5b1c49e8 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 13 Jan 2025 14:07:22 +0100 Subject: [PATCH 11/44] fix tests --- p2p/server_test.go | 4 +++- store/heightsub_test.go | 2 +- store/store_test.go | 7 +++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/p2p/server_test.go b/p2p/server_test.go index 4b19ede9..db2ab575 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -17,7 +17,9 @@ func TestExchangeServer_handleRequestTimeout(t *testing.T) { peer := createMocknet(t, 1) s, err := store.NewStore[*headertest.DummyHeader](datastore.NewMapDatastore()) require.NoError(t, err) - s.Init(context.Background(), headertest.RandDummyHeader(t)) + head := headertest.RandDummyHeader(t) + head.HeightI %= 1000 // make it a bit lower + s.Init(context.Background(), head) server, err := NewExchangeServer[*headertest.DummyHeader]( peer[0], s, diff --git a/store/heightsub_test.go b/store/heightsub_test.go index 67788f8c..f579ad76 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -98,7 +98,7 @@ func TestHeightSubCancellation(t *testing.T) { defer cancel() h := headertest.RandDummyHeader(t) - h.HeightI %= 100 // make it a bit lower + h.HeightI %= 1000 // make it a bit lower hs := newHeightSub[*headertest.DummyHeader]() sub := make(chan struct{}) diff --git a/store/store_test.go b/store/store_test.go index 8fd1a780..c6cbe1ea 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -284,6 +284,7 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { firstChunk := suite.GenDummyHeaders(5) missedChunk := suite.GenDummyHeaders(5) lastChunk := suite.GenDummyHeaders(5) + wantHead := lastChunk[len(lastChunk)-1] { latestHead := firstChunk[len(firstChunk)-1] @@ -301,8 +302,6 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { errCh := make(chan error, 1) go func() { - wantHead := lastChunk[len(lastChunk)-1] - shortCtx, shortCancel := context.WithTimeout(ctx, 3*time.Second) defer shortCancel() @@ -339,6 +338,10 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { select { case err := <-errCh: require.NoError(t, err) + + head, err := store.GetByHeight(ctx, wantHead.Height()) + require.NoError(t, err) + require.Equal(t, head, wantHead) default: t.Fatal("store.GetByHeight must not be blocked") } From a68fd8a771ddb670c43f917e1edd106492daa3ab Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 13 Jan 2025 15:06:48 +0100 Subject: [PATCH 12/44] fixes --- store/heightsub.go | 1 + sync/sync_test.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 0d1cfc0e..853b2337 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -34,6 +34,7 @@ func (hs *heightSub[H]) Height() uint64 { } // SetHeight sets the new head height for heightSub. +// Unblocks all awaiting [Wait] calls in range from [heightSub.Height] to height. func (hs *heightSub[H]) SetHeight(height uint64) { for { curr := hs.height.Load() diff --git a/sync/sync_test.go b/sync/sync_test.go index de767412..b8d98b7d 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -229,10 +229,14 @@ func TestSyncPendingRangesWithMisses(t *testing.T) { have, err := localStore.Head(ctx) require.NoError(t, err) - // assert.Equal(t, exp.Height(), have.Height()) - // assert.Empty(t, syncer.pending.Head()) // assert all cache from pending is used - - return exp.Height() == have.Height() + switch { + case exp.Height() != have.Height(): + return false + case !syncer.pending.Head().IsZero(): + return false + default: + return true + } }, 2*time.Second, 100*time.Millisecond) } From 7c3cc9b55a518271489b519843c19c7fd5dbe390 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 13 Jan 2025 17:32:33 +0100 Subject: [PATCH 13/44] fix ctx --- store/store.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/store/store.go b/store/store.go index 8d2ff28a..a7815d83 100644 --- a/store/store.go +++ b/store/store.go @@ -518,12 +518,12 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context) { prevHeight := currHeight // TODO(cristaloleg): benchmark this timeout or make it dynamic. - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() + advCtx, advCancel := context.WithTimeout(ctx, 10*time.Second) + defer advCancel() var newHead H for { - h, err := s.getByHeight(ctx, currHeight+1) + h, err := s.getByHeight(advCtx, currHeight+1) if err != nil { break } @@ -537,7 +537,7 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context) { log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) s.metrics.newHead(newHead.Height()) - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() b, err := newHead.Hash().MarshalJSON() From 5d893813eb3985488ec2eafb7cb40236060bdb31 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 14 Jan 2025 13:06:44 +0100 Subject: [PATCH 14/44] review suggestions --- store/heightsub.go | 26 ++++++++++++++++---------- store/store.go | 12 ++++++++---- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 853b2337..623c5d88 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -41,21 +41,27 @@ func (hs *heightSub[H]) SetHeight(height uint64) { if curr >= height { return } - if hs.height.CompareAndSwap(curr, height) { - hs.heightReqsLk.Lock() - for ; curr <= height; curr++ { - sub, ok := hs.heightSubs[curr] - if ok { - close(sub) - delete(hs.heightSubs, curr) - } + if !hs.height.CompareAndSwap(curr, height) { + continue + } + + hs.heightReqsLk.Lock() + defer hs.heightReqsLk.Unlock() //nolint:gocritic we have a return below + + for ; curr <= height; curr++ { + sub, ok := hs.heightSubs[curr] + if ok { + close(sub) + delete(hs.heightSubs, curr) } - hs.heightReqsLk.Unlock() - return } + return } } +// Wait for a given height to be published. +// It can return errElapsedHeight, which means a requested height was already seen +// and caller should get it elsewhere. func (hs *heightSub[H]) Wait(ctx context.Context, height uint64) error { if hs.Height() >= height { return errElapsedHeight diff --git a/store/store.go b/store/store.go index a7815d83..d686ecee 100644 --- a/store/store.go +++ b/store/store.go @@ -231,6 +231,13 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { return zero, errors.New("header/store: height must be bigger than zero") } + // switch h, err := s.getByHeight(ctx, height); { + // case err == nil: + // return h, nil + // case ctx.Err() != nil: + // return zero, ctx.Err() + // } + // if the requested 'height' was not yet published // we subscribe to it if head := s.contiguousHead.Load(); head == nil || height > (*head).Height() { @@ -390,11 +397,8 @@ func (s *Store[H]) flushLoop() { for headers := range s.writes { // add headers to the pending and ensure they are accessible s.pending.Append(headers...) - // and notify waiters if any + increase current read head height - // it is important to do Pub after updating pending - // so pending is consistent with atomic Height counter on the heightSub - // s.heightSub.Pub(headers...) // try to advance contiguousHead if we don't have gaps. + // and notify waiters in heightSub. s.advanceContiguousHead(ctx) // don't flush and continue if pending batch is not grown enough, // and Store is not stopping(headers == nil) From b267831aba1f13763ebfdbc8d6c7e03cc4d807fe Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 14 Jan 2025 14:51:05 +0100 Subject: [PATCH 15/44] rebase --- store/heightsub_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/store/heightsub_test.go b/store/heightsub_test.go index f579ad76..b1decce0 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -42,16 +42,14 @@ func TestHeightSub(t *testing.T) { ch := make(chan error, 10) for range cap(ch) { go func() { - _, err := hs.Sub(ctx, 103) + err := hs.Wait(ctx, 103) ch <- err }() } time.Sleep(time.Millisecond * 10) - h3 := headertest.RandDummyHeader(t) - h3.HeightI = 103 - hs.Pub(h3) + hs.SetHeight(103) for range cap(ch) { assert.NoError(t, <-ch) From beda8d14a3a058380845961bf6ca6b4427cbef87 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 15 Jan 2025 13:20:15 +0100 Subject: [PATCH 16/44] tests --- store/heightsub_test.go | 46 +++++++++++++++++++++++++++++++++++++++++ store/store.go | 12 +++++------ store/store_test.go | 39 +++++++++++++++++++++++----------- 3 files changed, 79 insertions(+), 18 deletions(-) diff --git a/store/heightsub_test.go b/store/heightsub_test.go index b1decce0..894b62a1 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -57,6 +57,52 @@ func TestHeightSub(t *testing.T) { } } +func TestHeightSub_withWaitCancelled(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + hs := newHeightSub[*headertest.DummyHeader]() + hs.SetHeight(10) + + const waiters = 5 + + cancelChs := make([]chan error, waiters) + blockedChs := make([]chan error, waiters) + for i := range waiters { + cancelChs[i] = make(chan error, 1) + blockedChs[i] = make(chan error, 1) + + go func() { + ctx, cancel := context.WithTimeout(ctx, time.Duration(i+1)*time.Millisecond) + defer cancel() + + err := hs.Wait(ctx, 100) + cancelChs[i] <- err + }() + + go func() { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + err := hs.Wait(ctx, 100) + blockedChs[i] <- err + }() + } + + for i := range cancelChs { + err := <-cancelChs[i] + assert.ErrorIs(t, err, context.DeadlineExceeded) + } + + for i := range blockedChs { + select { + case <-blockedChs[i]: + t.Error("channel should be blocked") + default: + } + } +} + // Test heightSub can accept non-adj headers without an error. func TestHeightSubNonAdjacement(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/store/store.go b/store/store.go index d686ecee..d8e653ab 100644 --- a/store/store.go +++ b/store/store.go @@ -231,12 +231,12 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { return zero, errors.New("header/store: height must be bigger than zero") } - // switch h, err := s.getByHeight(ctx, height); { - // case err == nil: - // return h, nil - // case ctx.Err() != nil: - // return zero, ctx.Err() - // } + switch h, err := s.getByHeight(ctx, height); { + case err == nil: + return h, nil + case ctx.Err() != nil: + return zero, ctx.Err() + } // if the requested 'height' was not yet published // we subscribe to it diff --git a/store/store_test.go b/store/store_test.go index c6cbe1ea..fff62e50 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -284,7 +284,8 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { firstChunk := suite.GenDummyHeaders(5) missedChunk := suite.GenDummyHeaders(5) lastChunk := suite.GenDummyHeaders(5) - wantHead := lastChunk[len(lastChunk)-1] + wantPreLastHead := lastChunk[len(lastChunk)-2] + wantLastHead := lastChunk[len(lastChunk)-1] { latestHead := firstChunk[len(firstChunk)-1] @@ -300,18 +301,29 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { assert.Equal(t, head.Hash(), latestHead.Hash()) } - errCh := make(chan error, 1) + errChPreLast := make(chan error, 1) go func() { shortCtx, shortCancel := context.WithTimeout(ctx, 3*time.Second) defer shortCancel() - _, err := store.GetByHeight(shortCtx, wantHead.Height()) - errCh <- err + _, err := store.GetByHeight(shortCtx, wantPreLastHead.Height()) + errChPreLast <- err + }() + + errChLast := make(chan error, 1) + go func() { + shortCtx, shortCancel := context.WithTimeout(ctx, 3*time.Second) + defer shortCancel() + + _, err := store.GetByHeight(shortCtx, wantLastHead.Height()) + errChLast <- err }() select { - case err := <-errCh: - t.Fatalf("store.GetByHeight must be blocked, have error: %v", err) + case err := <-errChPreLast: + t.Fatalf("store.GetByHeight on prelast height MUST be blocked, have error: %v", err) + case err := <-errChLast: + t.Fatalf("store.GetByHeight on last height MUST be blocked, have error: %v", err) default: } @@ -323,9 +335,12 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { } select { - case err := <-errCh: - t.Fatalf("store.GetByHeight must be still blocked, have error: %v", err) + case err := <-errChPreLast: + t.Fatalf("store.GetByHeight on prelast height MUST be blocked, have error: %v", err) + case err := <-errChLast: + require.NoError(t, err) default: + t.Fatalf("store.GetByHeight on last height MUST NOT be blocked, have error: %v", err) } { @@ -336,14 +351,14 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { } select { - case err := <-errCh: + case err := <-errChPreLast: require.NoError(t, err) - head, err := store.GetByHeight(ctx, wantHead.Height()) + head, err := store.GetByHeight(ctx, wantLastHead.Height()) require.NoError(t, err) - require.Equal(t, head, wantHead) + require.Equal(t, head, wantLastHead) default: - t.Fatal("store.GetByHeight must not be blocked") + t.Fatal("store.GetByHeight on last height MUST NOT be blocked") } } From 311561b6e06f3088467c631d905b88e97a44f9c6 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 15 Jan 2025 15:47:29 +0100 Subject: [PATCH 17/44] more tests --- store/heightsub.go | 63 ++++++++++++++++++++++++++++++++------------- store/store.go | 51 ++++++++++++++++++------------------ store/store_test.go | 18 ++++++++----- 3 files changed, 82 insertions(+), 50 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 623c5d88..4789726c 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -17,14 +17,19 @@ type heightSub[H header.Header[H]] struct { // height refers to the latest locally available header height // that has been fully verified and inserted into the subjective chain height atomic.Uint64 - heightReqsLk sync.Mutex - heightSubs map[uint64]chan struct{} + heightSubsLk sync.Mutex + heightSubs map[uint64]*signalAndCounter +} + +type signalAndCounter struct { + signal chan struct{} + count int } // newHeightSub instantiates new heightSub. func newHeightSub[H header.Header[H]]() *heightSub[H] { return &heightSub[H]{ - heightSubs: make(map[uint64]chan struct{}), + heightSubs: make(map[uint64]*signalAndCounter), } } @@ -45,13 +50,13 @@ func (hs *heightSub[H]) SetHeight(height uint64) { continue } - hs.heightReqsLk.Lock() - defer hs.heightReqsLk.Unlock() //nolint:gocritic we have a return below + hs.heightSubsLk.Lock() + defer hs.heightSubsLk.Unlock() //nolint:gocritic we have a return below for ; curr <= height; curr++ { - sub, ok := hs.heightSubs[curr] + sac, ok := hs.heightSubs[curr] if ok { - close(sub) + close(sac.signal) delete(hs.heightSubs, curr) } } @@ -67,31 +72,53 @@ func (hs *heightSub[H]) Wait(ctx context.Context, height uint64) error { return errElapsedHeight } - hs.heightReqsLk.Lock() + hs.heightSubsLk.Lock() if hs.Height() >= height { // This is a rare case we have to account for. // The lock above can park a goroutine long enough for hs.height to change for a requested height, // leaving the request never fulfilled and the goroutine deadlocked. - hs.heightReqsLk.Unlock() + hs.heightSubsLk.Unlock() return errElapsedHeight } - sub, ok := hs.heightSubs[height] + sac, ok := hs.heightSubs[height] if !ok { - sub = make(chan struct{}, 1) - hs.heightSubs[height] = sub + sac = &signalAndCounter{ + signal: make(chan struct{}, 1), + } + hs.heightSubs[height] = sac } - hs.heightReqsLk.Unlock() + sac.count++ + hs.heightSubsLk.Unlock() select { - case <-sub: + case <-sac.signal: return nil case <-ctx.Done(): // no need to keep the request, if the op has canceled - hs.heightReqsLk.Lock() - close(sub) - delete(hs.heightSubs, height) - hs.heightReqsLk.Unlock() + hs.heightSubsLk.Lock() + sac.count-- + if sac.count == 0 { + close(sac.signal) + delete(hs.heightSubs, height) + } + hs.heightSubsLk.Unlock() return ctx.Err() } } + +// UnblockHeight and release the waiters in [Wait]. +// Note: do not advance heightSub's height. +func (hs *heightSub[H]) UnblockHeight(height uint64) { + hs.heightSubsLk.Lock() + defer hs.heightSubsLk.Unlock() + + sac, ok := hs.heightSubs[height] + if ok { + sac.count-- + if sac.count == 0 { + close(sac.signal) + } + delete(hs.heightSubs, height) + } +} diff --git a/store/store.go b/store/store.go index d8e653ab..bc0d04bd 100644 --- a/store/store.go +++ b/store/store.go @@ -231,19 +231,11 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { return zero, errors.New("header/store: height must be bigger than zero") } - switch h, err := s.getByHeight(ctx, height); { - case err == nil: - return h, nil - case ctx.Err() != nil: - return zero, ctx.Err() - } - // if the requested 'height' was not yet published // we subscribe to it if head := s.contiguousHead.Load(); head == nil || height > (*head).Height() { err := s.heightSub.Wait(ctx, height) if err != nil && !errors.Is(err, errElapsedHeight) { - var zero H return zero, err } } @@ -399,7 +391,7 @@ func (s *Store[H]) flushLoop() { s.pending.Append(headers...) // try to advance contiguousHead if we don't have gaps. // and notify waiters in heightSub. - s.advanceContiguousHead(ctx) + s.advanceContiguousHead(ctx, headers...) // don't flush and continue if pending batch is not grown enough, // and Store is not stopping(headers == nil) if s.pending.Len() < s.Params.WriteBatchSize && headers != nil { @@ -513,7 +505,12 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { } // try advance contiguous head based on already written headers. -func (s *Store[H]) advanceContiguousHead(ctx context.Context) { +func (s *Store[H]) advanceContiguousHead(ctx context.Context, headers ...H) { + // always inform heightSub about new headers seen + for _, h := range headers { + s.heightSub.UnblockHeight(h.Height()) + } + currHead := s.contiguousHead.Load() if currHead == nil { return @@ -536,24 +533,28 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context) { } if currHeight > prevHeight { - s.contiguousHead.Store(&newHead) - s.heightSub.SetHeight(currHeight) - log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) - s.metrics.newHead(newHead.Height()) + s.updateContiguousHead(newHead, currHeight) + } +} - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() +func (s *Store[H]) updateContiguousHead(newHead H, newHeight uint64) { + s.contiguousHead.Store(&newHead) + s.heightSub.UnblockHeight(newHeight) + log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) + s.metrics.newHead(newHead.Height()) - b, err := newHead.Hash().MarshalJSON() - if err != nil { - log.Errorw("cannot marshal new head", - "height", newHead.Height(), "hash", newHead.Hash(), "err", err) - } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - if err := s.ds.Put(ctx, headKey, b); err != nil { - log.Errorw("cannot put new head", - "height", newHead.Height(), "hash", newHead.Hash(), "err", err) - } + b, err := newHead.Hash().MarshalJSON() + if err != nil { + log.Errorw("cannot marshal new head", + "height", newHead.Height(), "hash", newHead.Hash(), "err", err) + } + + if err := s.ds.Put(ctx, headKey, b); err != nil { + log.Errorw("cannot put new head", + "height", newHead.Height(), "hash", newHead.Hash(), "err", err) } } diff --git a/store/store_test.go b/store/store_test.go index fff62e50..a660c8a1 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -284,7 +284,8 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { firstChunk := suite.GenDummyHeaders(5) missedChunk := suite.GenDummyHeaders(5) lastChunk := suite.GenDummyHeaders(5) - wantPreLastHead := lastChunk[len(lastChunk)-2] + + wantMissHead := missedChunk[len(missedChunk)-2] wantLastHead := lastChunk[len(lastChunk)-1] { @@ -301,13 +302,13 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { assert.Equal(t, head.Hash(), latestHead.Hash()) } - errChPreLast := make(chan error, 1) + errChMiss := make(chan error, 1) go func() { shortCtx, shortCancel := context.WithTimeout(ctx, 3*time.Second) defer shortCancel() - _, err := store.GetByHeight(shortCtx, wantPreLastHead.Height()) - errChPreLast <- err + _, err := store.GetByHeight(shortCtx, wantMissHead.Height()) + errChMiss <- err }() errChLast := make(chan error, 1) @@ -319,8 +320,11 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { errChLast <- err }() + // wait for goroutines start + time.Sleep(100 * time.Millisecond) + select { - case err := <-errChPreLast: + case err := <-errChMiss: t.Fatalf("store.GetByHeight on prelast height MUST be blocked, have error: %v", err) case err := <-errChLast: t.Fatalf("store.GetByHeight on last height MUST be blocked, have error: %v", err) @@ -335,7 +339,7 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { } select { - case err := <-errChPreLast: + case err := <-errChMiss: t.Fatalf("store.GetByHeight on prelast height MUST be blocked, have error: %v", err) case err := <-errChLast: require.NoError(t, err) @@ -351,7 +355,7 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { } select { - case err := <-errChPreLast: + case err := <-errChMiss: require.NoError(t, err) head, err := store.GetByHeight(ctx, wantLastHead.Height()) From 22a4b2fb94b27cc71fdfa48c192d86cef1fc17a9 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 22 Jan 2025 11:29:52 +0100 Subject: [PATCH 18/44] review suggestions --- store/store_test.go | 72 +++++++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/store/store_test.go b/store/store_test.go index a660c8a1..d845ff91 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -1,6 +1,7 @@ package store import ( + "bytes" "context" "math/rand" stdsync "sync" @@ -188,14 +189,20 @@ func TestStore_Append(t *testing.T) { // wait for batch to be written. time.Sleep(100 * time.Millisecond) - // assert.Eventually(t, func() bool { - head, err = store.Head(ctx) - assert.NoError(t, err) - assert.Equal(t, int(head.Height()), int(headers[len(headers)-1].Height())) - - // return int(head.Height()) == int(headers[len(headers)-1].Height()) - // }, time.Second, time.Millisecond) - assert.Equal(t, head.Hash(), headers[len(headers)-1].Hash()) + assert.Eventually(t, func() bool { + head, err = store.Head(ctx) + assert.NoError(t, err) + assert.Equal(t, int(head.Height()), int(headers[len(headers)-1].Height())) + + switch { + case int(head.Height()) != int(headers[len(headers)-1].Height()): + return false + case !bytes.Equal(head.Hash(), headers[len(headers)-1].Hash()): + return false + default: + return true + } + }, time.Second, time.Millisecond) } func TestStore_Append_stableHeadWhenGaps(t *testing.T) { @@ -281,14 +288,8 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { require.NoError(t, err) assert.Equal(t, head.Hash(), suite.Head().Hash()) - firstChunk := suite.GenDummyHeaders(5) - missedChunk := suite.GenDummyHeaders(5) - lastChunk := suite.GenDummyHeaders(5) - - wantMissHead := missedChunk[len(missedChunk)-2] - wantLastHead := lastChunk[len(lastChunk)-1] - { + firstChunk := suite.GenDummyHeaders(5) latestHead := firstChunk[len(firstChunk)-1] err := store.Append(ctx, firstChunk...) @@ -302,6 +303,9 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { assert.Equal(t, head.Hash(), latestHead.Hash()) } + missedChunk := suite.GenDummyHeaders(5) + wantMissHead := missedChunk[len(missedChunk)-2] + errChMiss := make(chan error, 1) go func() { shortCtx, shortCancel := context.WithTimeout(ctx, 3*time.Second) @@ -311,6 +315,9 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { errChMiss <- err }() + lastChunk := suite.GenDummyHeaders(5) + wantLastHead := lastChunk[len(lastChunk)-1] + errChLast := make(chan error, 1) go func() { shortCtx, shortCancel := context.WithTimeout(ctx, 3*time.Second) @@ -329,6 +336,7 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { case err := <-errChLast: t.Fatalf("store.GetByHeight on last height MUST be blocked, have error: %v", err) default: + // ok } { @@ -336,15 +344,15 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { require.NoError(t, err) // wait for batch to be written. time.Sleep(100 * time.Millisecond) - } - select { - case err := <-errChMiss: - t.Fatalf("store.GetByHeight on prelast height MUST be blocked, have error: %v", err) - case err := <-errChLast: - require.NoError(t, err) - default: - t.Fatalf("store.GetByHeight on last height MUST NOT be blocked, have error: %v", err) + select { + case err := <-errChMiss: + t.Fatalf("store.GetByHeight on prelast height MUST be blocked, have error: %v", err) + case err := <-errChLast: + require.NoError(t, err) + default: + t.Fatalf("store.GetByHeight on last height MUST NOT be blocked, have error: %v", err) + } } { @@ -352,17 +360,17 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { require.NoError(t, err) // wait for batch to be written. time.Sleep(100 * time.Millisecond) - } - select { - case err := <-errChMiss: - require.NoError(t, err) + select { + case err := <-errChMiss: + require.NoError(t, err) - head, err := store.GetByHeight(ctx, wantLastHead.Height()) - require.NoError(t, err) - require.Equal(t, head, wantLastHead) - default: - t.Fatal("store.GetByHeight on last height MUST NOT be blocked") + head, err := store.GetByHeight(ctx, wantLastHead.Height()) + require.NoError(t, err) + require.Equal(t, head, wantLastHead) + default: + t.Fatal("store.GetByHeight on last height MUST NOT be blocked") + } } } From 26e66215389c730346b3d29b4e300f0e4a0274a3 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 22 Jan 2025 11:48:02 +0100 Subject: [PATCH 19/44] small refactoring --- store/heightsub.go | 28 +++++++++++++--------------- store/store.go | 2 +- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 4789726c..aa40b651 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -54,11 +54,7 @@ func (hs *heightSub[H]) SetHeight(height uint64) { defer hs.heightSubsLk.Unlock() //nolint:gocritic we have a return below for ; curr <= height; curr++ { - sac, ok := hs.heightSubs[curr] - if ok { - close(sac.signal) - delete(hs.heightSubs, curr) - } + hs.unblockHeight(curr, true) } return } @@ -97,11 +93,7 @@ func (hs *heightSub[H]) Wait(ctx context.Context, height uint64) error { case <-ctx.Done(): // no need to keep the request, if the op has canceled hs.heightSubsLk.Lock() - sac.count-- - if sac.count == 0 { - close(sac.signal) - delete(hs.heightSubs, height) - } + hs.unblockHeight(height, false) hs.heightSubsLk.Unlock() return ctx.Err() } @@ -113,12 +105,18 @@ func (hs *heightSub[H]) UnblockHeight(height uint64) { hs.heightSubsLk.Lock() defer hs.heightSubsLk.Unlock() + hs.unblockHeight(height, true) +} + +func (hs *heightSub[H]) unblockHeight(height uint64, all bool) { sac, ok := hs.heightSubs[height] - if ok { - sac.count-- - if sac.count == 0 { - close(sac.signal) - } + if !ok { + return + } + + sac.count-- + if all || sac.count == 0 { + close(sac.signal) delete(hs.heightSubs, height) } } diff --git a/store/store.go b/store/store.go index bc0d04bd..caec3320 100644 --- a/store/store.go +++ b/store/store.go @@ -539,7 +539,7 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context, headers ...H) { func (s *Store[H]) updateContiguousHead(newHead H, newHeight uint64) { s.contiguousHead.Store(&newHead) - s.heightSub.UnblockHeight(newHeight) + s.heightSub.SetHeight(newHeight) log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) s.metrics.newHead(newHead.Height()) From 06e73c132e895ac02753091283ac636bb9c20cd7 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 23 Jan 2025 16:20:12 +0100 Subject: [PATCH 20/44] add heightSub.Init --- store/heightsub.go | 15 +++++++++++++++ store/heightsub_test.go | 9 ++++----- store/store.go | 2 +- store/store_test.go | 7 ++++--- 4 files changed, 24 insertions(+), 9 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index aa40b651..80d81666 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -33,6 +33,21 @@ func newHeightSub[H header.Header[H]]() *heightSub[H] { } } +// Init the heightSub with a given height. +// Unblocks all awaiting [Wait] calls lower than height. +func (hs *heightSub[H]) Init(height uint64) { + hs.height.Store(height) + + hs.heightSubsLk.Lock() + defer hs.heightSubsLk.Unlock() + + for h := range hs.heightSubs { + if h < height { + hs.unblockHeight(h, true) + } + } +} + // Height reports current height. func (hs *heightSub[H]) Height() uint64 { return hs.height.Load() diff --git a/store/heightsub_test.go b/store/heightsub_test.go index 894b62a1..1c5118ff 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -18,7 +18,7 @@ func TestHeightSub(t *testing.T) { // assert subscription returns nil for past heights { - hs.SetHeight(99) + hs.Init(99) err := hs.Wait(ctx, 10) assert.ErrorIs(t, err, errElapsedHeight) @@ -62,7 +62,7 @@ func TestHeightSub_withWaitCancelled(t *testing.T) { defer cancel() hs := newHeightSub[*headertest.DummyHeader]() - hs.SetHeight(10) + hs.Init(10) const waiters = 5 @@ -109,8 +109,7 @@ func TestHeightSubNonAdjacement(t *testing.T) { defer cancel() hs := newHeightSub[*headertest.DummyHeader]() - - hs.SetHeight(99) + hs.Init(99) go func() { // fixes flakiness on CI @@ -127,7 +126,7 @@ func TestHeightSubNonAdjacement(t *testing.T) { func TestHeightSub_monotonicHeight(t *testing.T) { hs := newHeightSub[*headertest.DummyHeader]() - hs.SetHeight(99) + hs.Init(99) assert.Equal(t, int64(hs.height.Load()), int64(99)) hs.SetHeight(300) diff --git a/store/store.go b/store/store.go index caec3320..287acef1 100644 --- a/store/store.go +++ b/store/store.go @@ -123,7 +123,7 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error { log.Infow("initialized head", "height", initial.Height(), "hash", initial.Hash()) s.contiguousHead.Store(&initial) - s.heightSub.SetHeight(initial.Height()) + s.heightSub.Init(initial.Height()) return nil } diff --git a/store/store_test.go b/store/store_test.go index d845ff91..804d4c25 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/celestiaorg/go-header" "github.com/celestiaorg/go-header/headertest" ) @@ -482,6 +483,7 @@ func TestBatch_GetByHeightBeforeInit(t *testing.T) { t.Cleanup(cancel) suite := headertest.NewTestSuite(t) + suite.Head().HeightI = 1_000_000 ds := sync.MutexWrap(datastore.NewMapDatastore()) store, err := NewStore[*headertest.DummyHeader](ds) @@ -494,9 +496,8 @@ func TestBatch_GetByHeightBeforeInit(t *testing.T) { _ = store.Init(ctx, suite.Head()) }() - h, err := store.GetByHeight(ctx, 1) - require.NoError(t, err) - require.NotNil(t, h) + _, err = store.GetByHeight(ctx, 1) + require.ErrorIs(t, err, header.ErrNotFound) } func TestStoreInit(t *testing.T) { From b5060847be0aad8a76be7cad27a2b2cab742383d Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 23 Jan 2025 16:34:02 +0100 Subject: [PATCH 21/44] do 1 more fetch before subscribe --- store/store.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/store/store.go b/store/store.go index 287acef1..d7962bc8 100644 --- a/store/store.go +++ b/store/store.go @@ -231,6 +231,10 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { return zero, errors.New("header/store: height must be bigger than zero") } + if h, err := s.getByHeight(ctx, height); err == nil || ctx.Err() != nil { + return h, err + } + // if the requested 'height' was not yet published // we subscribe to it if head := s.contiguousHead.Load(); head == nil || height > (*head).Height() { From 7b5b2826a755a66e9c2c4a41bd93749bebebe36f Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Fri, 24 Jan 2025 11:15:36 +0100 Subject: [PATCH 22/44] review suggestions --- store/store.go | 24 ++++++------------------ store/store_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/store/store.go b/store/store.go index d7962bc8..4dd0fe10 100644 --- a/store/store.go +++ b/store/store.go @@ -165,19 +165,7 @@ func (s *Store[H]) Stop(ctx context.Context) error { } func (s *Store[H]) Height() uint64 { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - head, err := s.Head(ctx) - if err != nil { - if errors.Is(err, context.Canceled) || - errors.Is(err, context.DeadlineExceeded) || - errors.Is(err, datastore.ErrNotFound) { - return 0 - } - panic(err) - } - return head.Height() + return s.heightSub.Height() } func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) { @@ -231,8 +219,8 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { return zero, errors.New("header/store: height must be bigger than zero") } - if h, err := s.getByHeight(ctx, height); err == nil || ctx.Err() != nil { - return h, err + if h, err := s.getByHeight(ctx, height); err == nil { + return h, nil } // if the requested 'height' was not yet published @@ -537,17 +525,17 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context, headers ...H) { } if currHeight > prevHeight { - s.updateContiguousHead(newHead, currHeight) + s.updateContiguousHead(ctx, newHead, currHeight) } } -func (s *Store[H]) updateContiguousHead(newHead H, newHeight uint64) { +func (s *Store[H]) updateContiguousHead(ctx context.Context, newHead H, newHeight uint64) { s.contiguousHead.Store(&newHead) s.heightSub.SetHeight(newHeight) log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) s.metrics.newHead(newHead.Height()) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() b, err := newHead.Hash().MarshalJSON() diff --git a/store/store_test.go b/store/store_test.go index 804d4c25..a152a42b 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -375,6 +375,34 @@ func TestStoreGetByHeight_whenGaps(t *testing.T) { } } +func TestStoreGetByHeight_earlyAvailable(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + const skippedHeaders = 15 + suite.GenDummyHeaders(skippedHeaders) + lastChunk := suite.GenDummyHeaders(1) + + { + err := store.Append(ctx, lastChunk...) + require.NoError(t, err) + + // wait for batch to be written. + time.Sleep(100 * time.Millisecond) + } + + { + h, err := store.GetByHeight(ctx, lastChunk[0].Height()) + require.NoError(t, err) + require.Equal(t, h, lastChunk[0]) + } +} + // TestStore_GetRange tests possible combinations of requests and ensures that // the store can handle them adequately (even malformed requests) func TestStore_GetRange(t *testing.T) { From ac9eb8c0eee8b8b75c55402361f0091e3ea6557c Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Fri, 24 Jan 2025 16:19:34 +0100 Subject: [PATCH 23/44] do advance at start --- store/store.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/store/store.go b/store/store.go index 4dd0fe10..74e2d487 100644 --- a/store/store.go +++ b/store/store.go @@ -376,6 +376,9 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { // (1) Appends not to be blocked on long disk IO writes and underlying DB compactions // (2) Batching header writes func (s *Store[H]) flushLoop() { + // advance based on what we have on disk. + s.doAdvanceContiguousHead(context.Background(), s.Height()) + defer close(s.writesDn) ctx := context.Background() for headers := range s.writes { @@ -383,7 +386,7 @@ func (s *Store[H]) flushLoop() { s.pending.Append(headers...) // try to advance contiguousHead if we don't have gaps. // and notify waiters in heightSub. - s.advanceContiguousHead(ctx, headers...) + s.tryAdvanceContiguousHead(ctx, headers...) // don't flush and continue if pending batch is not grown enough, // and Store is not stopping(headers == nil) if s.pending.Len() < s.Params.WriteBatchSize && headers != nil { @@ -497,17 +500,19 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { } // try advance contiguous head based on already written headers. -func (s *Store[H]) advanceContiguousHead(ctx context.Context, headers ...H) { +func (s *Store[H]) tryAdvanceContiguousHead(ctx context.Context, headers ...H) { // always inform heightSub about new headers seen for _, h := range headers { s.heightSub.UnblockHeight(h.Height()) } currHead := s.contiguousHead.Load() - if currHead == nil { - return + if currHead != nil { + s.doAdvanceContiguousHead(ctx, (*currHead).Height()) } - currHeight := (*currHead).Height() +} + +func (s *Store[H]) doAdvanceContiguousHead(ctx context.Context, currHeight uint64) { prevHeight := currHeight // TODO(cristaloleg): benchmark this timeout or make it dynamic. From 7a53bc79f2eb3838359d535a378345f4508fd1af Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 27 Jan 2025 11:04:06 +0100 Subject: [PATCH 24/44] load key on start --- store/store.go | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/store/store.go b/store/store.go index 74e2d487..8a1832ec 100644 --- a/store/store.go +++ b/store/store.go @@ -174,18 +174,7 @@ func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, erro } var zero H - head, err := s.readHead(ctx) - switch { - default: - return zero, err - case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound): - return zero, header.ErrNoHead - case err == nil: - s.contiguousHead.Store(&head) - s.heightSub.SetHeight(head.Height()) - log.Infow("loaded head", "height", head.Height(), "hash", head.Hash()) - return head, nil - } + return zero, header.ErrNoHead } func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) { @@ -376,8 +365,7 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { // (1) Appends not to be blocked on long disk IO writes and underlying DB compactions // (2) Batching header writes func (s *Store[H]) flushLoop() { - // advance based on what we have on disk. - s.doAdvanceContiguousHead(context.Background(), s.Height()) + s.loadHeadKey(context.Background()) defer close(s.writesDn) ctx := context.Background() @@ -467,6 +455,16 @@ func (s *Store[H]) flush(ctx context.Context, headers ...H) error { return batch.Commit(ctx) } +func (s *Store[H]) loadHeadKey(ctx context.Context) error { + h, err := s.readHead(ctx) + if err != nil { + return err + } + + s.doAdvanceContiguousHead(ctx, h.Height()) + return nil +} + // readHead loads the head from the datastore. func (s *Store[H]) readHead(ctx context.Context) (H, error) { var zero H @@ -513,12 +511,11 @@ func (s *Store[H]) tryAdvanceContiguousHead(ctx context.Context, headers ...H) { } func (s *Store[H]) doAdvanceContiguousHead(ctx context.Context, currHeight uint64) { - prevHeight := currHeight - // TODO(cristaloleg): benchmark this timeout or make it dynamic. advCtx, advCancel := context.WithTimeout(ctx, 10*time.Second) defer advCancel() + prevHeight := currHeight var newHead H for { h, err := s.getByHeight(advCtx, currHeight+1) From 3089d68f9e7a66e65d19e28d78cbf689e4692a7b Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 27 Jan 2025 11:29:53 +0100 Subject: [PATCH 25/44] fix finding --- store/store.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/store/store.go b/store/store.go index 8a1832ec..74ce96cc 100644 --- a/store/store.go +++ b/store/store.go @@ -135,6 +135,10 @@ func (s *Store[H]) Start(context.Context) error { default: } + if err := s.loadHeadKey(context.Background()); err != nil { + log.Errorw("cannot load headKey", "err", err) + } + go s.flushLoop() return nil } @@ -365,8 +369,6 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { // (1) Appends not to be blocked on long disk IO writes and underlying DB compactions // (2) Batching header writes func (s *Store[H]) flushLoop() { - s.loadHeadKey(context.Background()) - defer close(s.writesDn) ctx := context.Background() for headers := range s.writes { @@ -461,7 +463,11 @@ func (s *Store[H]) loadHeadKey(ctx context.Context) error { return err } - s.doAdvanceContiguousHead(ctx, h.Height()) + newHeight := s.doAdvanceContiguousHead(ctx, h.Height()) + if newHeight >= h.Height() { + s.contiguousHead.Store(&h) + s.heightSub.SetHeight(h.Height()) + } return nil } @@ -510,7 +516,9 @@ func (s *Store[H]) tryAdvanceContiguousHead(ctx context.Context, headers ...H) { } } -func (s *Store[H]) doAdvanceContiguousHead(ctx context.Context, currHeight uint64) { +// doAdvanceContiguousHead return a new highest contiguous height +// or a given if not found. +func (s *Store[H]) doAdvanceContiguousHead(ctx context.Context, currHeight uint64) uint64 { // TODO(cristaloleg): benchmark this timeout or make it dynamic. advCtx, advCancel := context.WithTimeout(ctx, 10*time.Second) defer advCancel() @@ -529,6 +537,7 @@ func (s *Store[H]) doAdvanceContiguousHead(ctx context.Context, currHeight uint6 if currHeight > prevHeight { s.updateContiguousHead(ctx, newHead, currHeight) } + return currHeight } func (s *Store[H]) updateContiguousHead(ctx context.Context, newHead H, newHeight uint64) { From 75b01413b0f05803d6b2e5bb5328c7bb20f6a9b7 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 27 Jan 2025 12:15:07 +0100 Subject: [PATCH 26/44] review suggestions --- store/heightsub.go | 8 ++++---- store/store.go | 9 ++++++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 80d81666..148cb32d 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -18,10 +18,10 @@ type heightSub[H header.Header[H]] struct { // that has been fully verified and inserted into the subjective chain height atomic.Uint64 heightSubsLk sync.Mutex - heightSubs map[uint64]*signalAndCounter + heightSubs map[uint64]*sub } -type signalAndCounter struct { +type sub struct { signal chan struct{} count int } @@ -29,7 +29,7 @@ type signalAndCounter struct { // newHeightSub instantiates new heightSub. func newHeightSub[H header.Header[H]]() *heightSub[H] { return &heightSub[H]{ - heightSubs: make(map[uint64]*signalAndCounter), + heightSubs: make(map[uint64]*sub), } } @@ -94,7 +94,7 @@ func (hs *heightSub[H]) Wait(ctx context.Context, height uint64) error { sac, ok := hs.heightSubs[height] if !ok { - sac = &signalAndCounter{ + sac = &sub{ signal: make(chan struct{}, 1), } hs.heightSubs[height] = sac diff --git a/store/store.go b/store/store.go index 74ce96cc..6a9e6b9b 100644 --- a/store/store.go +++ b/store/store.go @@ -127,7 +127,7 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error { return nil } -func (s *Store[H]) Start(context.Context) error { +func (s *Store[H]) Start(ctx context.Context) error { // closed s.writesDn means that store was stopped before, recreate chan. select { case <-s.writesDn: @@ -135,8 +135,11 @@ func (s *Store[H]) Start(context.Context) error { default: } - if err := s.loadHeadKey(context.Background()); err != nil { - log.Errorw("cannot load headKey", "err", err) + if err := s.loadHeadKey(ctx); err != nil { + // we might start on an empty datastore, no key is okay. + if !errors.Is(err, datastore.ErrNotFound) { + return fmt.Errorf("header/store: cannot load headKey: %w", err) + } } go s.flushLoop() From 8194f52db78934cfa28fda1daa09f33f20e0f833 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 27 Jan 2025 16:02:39 +0100 Subject: [PATCH 27/44] rename to a better name --- store/store.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/store/store.go b/store/store.go index 6a9e6b9b..c36a835c 100644 --- a/store/store.go +++ b/store/store.go @@ -377,9 +377,9 @@ func (s *Store[H]) flushLoop() { for headers := range s.writes { // add headers to the pending and ensure they are accessible s.pending.Append(headers...) - // try to advance contiguousHead if we don't have gaps. - // and notify waiters in heightSub. - s.tryAdvanceContiguousHead(ctx, headers...) + // notify waiters in heightSub and advance contiguousHead + // if we don't have gaps. + s.unblockAndAdvance(ctx, headers...) // don't flush and continue if pending batch is not grown enough, // and Store is not stopping(headers == nil) if s.pending.Len() < s.Params.WriteBatchSize && headers != nil { @@ -466,7 +466,7 @@ func (s *Store[H]) loadHeadKey(ctx context.Context) error { return err } - newHeight := s.doAdvanceContiguousHead(ctx, h.Height()) + newHeight := s.advanceContiguousHead(ctx, h.Height()) if newHeight >= h.Height() { s.contiguousHead.Store(&h) s.heightSub.SetHeight(h.Height()) @@ -506,8 +506,9 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { return data, nil } -// try advance contiguous head based on already written headers. -func (s *Store[H]) tryAdvanceContiguousHead(ctx context.Context, headers ...H) { +// unblockAndAdvance will notify waiters in heightSub and advance contiguousHead +// based on already written headers. +func (s *Store[H]) unblockAndAdvance(ctx context.Context, headers ...H) { // always inform heightSub about new headers seen for _, h := range headers { s.heightSub.UnblockHeight(h.Height()) @@ -515,13 +516,13 @@ func (s *Store[H]) tryAdvanceContiguousHead(ctx context.Context, headers ...H) { currHead := s.contiguousHead.Load() if currHead != nil { - s.doAdvanceContiguousHead(ctx, (*currHead).Height()) + s.advanceContiguousHead(ctx, (*currHead).Height()) } } -// doAdvanceContiguousHead return a new highest contiguous height +// advanceContiguousHead return a new highest contiguous height // or a given if not found. -func (s *Store[H]) doAdvanceContiguousHead(ctx context.Context, currHeight uint64) uint64 { +func (s *Store[H]) advanceContiguousHead(ctx context.Context, currHeight uint64) uint64 { // TODO(cristaloleg): benchmark this timeout or make it dynamic. advCtx, advCancel := context.WithTimeout(ctx, 10*time.Second) defer advCancel() From 1375bd399623274e29923cc797f6a5656469dc09 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 27 Jan 2025 16:14:21 +0100 Subject: [PATCH 28/44] even better names --- store/heightsub.go | 18 +++++++++--------- store/store.go | 8 ++++---- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 148cb32d..7ca90194 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -34,7 +34,7 @@ func newHeightSub[H header.Header[H]]() *heightSub[H] { } // Init the heightSub with a given height. -// Unblocks all awaiting [Wait] calls lower than height. +// Notifies all awaiting [Wait] calls lower than height. func (hs *heightSub[H]) Init(height uint64) { hs.height.Store(height) @@ -43,7 +43,7 @@ func (hs *heightSub[H]) Init(height uint64) { for h := range hs.heightSubs { if h < height { - hs.unblockHeight(h, true) + hs.notifyHeight(h, true) } } } @@ -54,7 +54,7 @@ func (hs *heightSub[H]) Height() uint64 { } // SetHeight sets the new head height for heightSub. -// Unblocks all awaiting [Wait] calls in range from [heightSub.Height] to height. +// Notifies all awaiting [Wait] calls in range from [heightSub.Height] to height. func (hs *heightSub[H]) SetHeight(height uint64) { for { curr := hs.height.Load() @@ -69,7 +69,7 @@ func (hs *heightSub[H]) SetHeight(height uint64) { defer hs.heightSubsLk.Unlock() //nolint:gocritic we have a return below for ; curr <= height; curr++ { - hs.unblockHeight(curr, true) + hs.notifyHeight(curr, true) } return } @@ -108,22 +108,22 @@ func (hs *heightSub[H]) Wait(ctx context.Context, height uint64) error { case <-ctx.Done(): // no need to keep the request, if the op has canceled hs.heightSubsLk.Lock() - hs.unblockHeight(height, false) + hs.notifyHeight(height, false) hs.heightSubsLk.Unlock() return ctx.Err() } } -// UnblockHeight and release the waiters in [Wait]. +// NotifyHeight and release the waiters in [Wait]. // Note: do not advance heightSub's height. -func (hs *heightSub[H]) UnblockHeight(height uint64) { +func (hs *heightSub[H]) NotifyHeight(height uint64) { hs.heightSubsLk.Lock() defer hs.heightSubsLk.Unlock() - hs.unblockHeight(height, true) + hs.notifyHeight(height, true) } -func (hs *heightSub[H]) unblockHeight(height uint64, all bool) { +func (hs *heightSub[H]) notifyHeight(height uint64, all bool) { sac, ok := hs.heightSubs[height] if !ok { return diff --git a/store/store.go b/store/store.go index c36a835c..93a34ee3 100644 --- a/store/store.go +++ b/store/store.go @@ -379,7 +379,7 @@ func (s *Store[H]) flushLoop() { s.pending.Append(headers...) // notify waiters in heightSub and advance contiguousHead // if we don't have gaps. - s.unblockAndAdvance(ctx, headers...) + s.notifyAndAdvance(ctx, headers...) // don't flush and continue if pending batch is not grown enough, // and Store is not stopping(headers == nil) if s.pending.Len() < s.Params.WriteBatchSize && headers != nil { @@ -506,12 +506,12 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { return data, nil } -// unblockAndAdvance will notify waiters in heightSub and advance contiguousHead +// notifyAndAdvance will notify waiters in heightSub and advance contiguousHead // based on already written headers. -func (s *Store[H]) unblockAndAdvance(ctx context.Context, headers ...H) { +func (s *Store[H]) notifyAndAdvance(ctx context.Context, headers ...H) { // always inform heightSub about new headers seen for _, h := range headers { - s.heightSub.UnblockHeight(h.Height()) + s.heightSub.NotifyHeight(h.Height()) } currHead := s.contiguousHead.Load() From e9fbde6fcb454932502808e90c61eab46afdfc28 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 27 Jan 2025 16:18:41 +0100 Subject: [PATCH 29/44] sky is the limit --- store/heightsub.go | 10 +++++----- store/heightsub_test.go | 16 ++++++++-------- store/store.go | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 7ca90194..2c9d2c13 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -34,7 +34,7 @@ func newHeightSub[H header.Header[H]]() *heightSub[H] { } // Init the heightSub with a given height. -// Notifies all awaiting [Wait] calls lower than height. +// Notifies all awaiting [WaitHeight] calls lower than height. func (hs *heightSub[H]) Init(height uint64) { hs.height.Store(height) @@ -54,7 +54,7 @@ func (hs *heightSub[H]) Height() uint64 { } // SetHeight sets the new head height for heightSub. -// Notifies all awaiting [Wait] calls in range from [heightSub.Height] to height. +// Notifies all awaiting [WaitHeight] calls in range from [heightSub.Height] to height. func (hs *heightSub[H]) SetHeight(height uint64) { for { curr := hs.height.Load() @@ -75,10 +75,10 @@ func (hs *heightSub[H]) SetHeight(height uint64) { } } -// Wait for a given height to be published. +// WaitHeight for a given height to be published. // It can return errElapsedHeight, which means a requested height was already seen // and caller should get it elsewhere. -func (hs *heightSub[H]) Wait(ctx context.Context, height uint64) error { +func (hs *heightSub[H]) WaitHeight(ctx context.Context, height uint64) error { if hs.Height() >= height { return errElapsedHeight } @@ -114,7 +114,7 @@ func (hs *heightSub[H]) Wait(ctx context.Context, height uint64) error { } } -// NotifyHeight and release the waiters in [Wait]. +// NotifyHeight and release the waiters in [WaitHeight]. // Note: do not advance heightSub's height. func (hs *heightSub[H]) NotifyHeight(height uint64) { hs.heightSubsLk.Lock() diff --git a/store/heightsub_test.go b/store/heightsub_test.go index 1c5118ff..30567393 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -20,7 +20,7 @@ func TestHeightSub(t *testing.T) { { hs.Init(99) - err := hs.Wait(ctx, 10) + err := hs.WaitHeight(ctx, 10) assert.ErrorIs(t, err, errElapsedHeight) } @@ -33,7 +33,7 @@ func TestHeightSub(t *testing.T) { hs.SetHeight(102) }() - err := hs.Wait(ctx, 101) + err := hs.WaitHeight(ctx, 101) assert.NoError(t, err) } @@ -42,7 +42,7 @@ func TestHeightSub(t *testing.T) { ch := make(chan error, 10) for range cap(ch) { go func() { - err := hs.Wait(ctx, 103) + err := hs.WaitHeight(ctx, 103) ch <- err }() } @@ -76,7 +76,7 @@ func TestHeightSub_withWaitCancelled(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, time.Duration(i+1)*time.Millisecond) defer cancel() - err := hs.Wait(ctx, 100) + err := hs.WaitHeight(ctx, 100) cancelChs[i] <- err }() @@ -84,7 +84,7 @@ func TestHeightSub_withWaitCancelled(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - err := hs.Wait(ctx, 100) + err := hs.WaitHeight(ctx, 100) blockedChs[i] <- err }() } @@ -118,7 +118,7 @@ func TestHeightSubNonAdjacement(t *testing.T) { hs.SetHeight(300) }() - err := hs.Wait(ctx, 200) + err := hs.WaitHeight(ctx, 200) assert.NoError(t, err) } @@ -147,7 +147,7 @@ func TestHeightSubCancellation(t *testing.T) { sub := make(chan struct{}) go func() { // subscribe first time - hs.Wait(ctx, h.Height()) + hs.WaitHeight(ctx, h.Height()) sub <- struct{}{} }() @@ -157,7 +157,7 @@ func TestHeightSubCancellation(t *testing.T) { // subscribe again but with failed canceled context canceledCtx, cancel := context.WithCancel(ctx) cancel() - err := hs.Wait(canceledCtx, h.Height()) + err := hs.WaitHeight(canceledCtx, h.Height()) assert.ErrorIs(t, err, context.Canceled) // update height diff --git a/store/store.go b/store/store.go index 93a34ee3..19554fc4 100644 --- a/store/store.go +++ b/store/store.go @@ -222,7 +222,7 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { // if the requested 'height' was not yet published // we subscribe to it if head := s.contiguousHead.Load(); head == nil || height > (*head).Height() { - err := s.heightSub.Wait(ctx, height) + err := s.heightSub.WaitHeight(ctx, height) if err != nil && !errors.Is(err, errElapsedHeight) { return zero, err } From 07babe791345bb4a7e0f72849568c7f1dbeea6cc Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 27 Jan 2025 16:21:50 +0100 Subject: [PATCH 30/44] hehe --- store/heightsub.go | 36 +++++++++++++++++------------------- store/heightsub_test.go | 26 +++++++++++++------------- store/store.go | 8 ++++---- 3 files changed, 34 insertions(+), 36 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 2c9d2c13..d9f40db2 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -5,15 +5,13 @@ import ( "errors" "sync" "sync/atomic" - - "github.com/celestiaorg/go-header" ) // errElapsedHeight is thrown when a requested height was already provided to heightSub. var errElapsedHeight = errors.New("elapsed height") // heightSub provides a minimalistic mechanism to wait till header for a height becomes available. -type heightSub[H header.Header[H]] struct { +type heightSub struct { // height refers to the latest locally available header height // that has been fully verified and inserted into the subjective chain height atomic.Uint64 @@ -27,15 +25,15 @@ type sub struct { } // newHeightSub instantiates new heightSub. -func newHeightSub[H header.Header[H]]() *heightSub[H] { - return &heightSub[H]{ +func newHeightSub() *heightSub { + return &heightSub{ heightSubs: make(map[uint64]*sub), } } // Init the heightSub with a given height. -// Notifies all awaiting [WaitHeight] calls lower than height. -func (hs *heightSub[H]) Init(height uint64) { +// Notifies all awaiting [Wait] calls lower than height. +func (hs *heightSub) Init(height uint64) { hs.height.Store(height) hs.heightSubsLk.Lock() @@ -43,19 +41,19 @@ func (hs *heightSub[H]) Init(height uint64) { for h := range hs.heightSubs { if h < height { - hs.notifyHeight(h, true) + hs.notify(h, true) } } } // Height reports current height. -func (hs *heightSub[H]) Height() uint64 { +func (hs *heightSub) Height() uint64 { return hs.height.Load() } // SetHeight sets the new head height for heightSub. -// Notifies all awaiting [WaitHeight] calls in range from [heightSub.Height] to height. -func (hs *heightSub[H]) SetHeight(height uint64) { +// Notifies all awaiting [Wait] calls in range from [heightSub.Height] to height. +func (hs *heightSub) SetHeight(height uint64) { for { curr := hs.height.Load() if curr >= height { @@ -69,16 +67,16 @@ func (hs *heightSub[H]) SetHeight(height uint64) { defer hs.heightSubsLk.Unlock() //nolint:gocritic we have a return below for ; curr <= height; curr++ { - hs.notifyHeight(curr, true) + hs.notify(curr, true) } return } } -// WaitHeight for a given height to be published. +// Wait for a given height to be published. // It can return errElapsedHeight, which means a requested height was already seen // and caller should get it elsewhere. -func (hs *heightSub[H]) WaitHeight(ctx context.Context, height uint64) error { +func (hs *heightSub) Wait(ctx context.Context, height uint64) error { if hs.Height() >= height { return errElapsedHeight } @@ -108,22 +106,22 @@ func (hs *heightSub[H]) WaitHeight(ctx context.Context, height uint64) error { case <-ctx.Done(): // no need to keep the request, if the op has canceled hs.heightSubsLk.Lock() - hs.notifyHeight(height, false) + hs.notify(height, false) hs.heightSubsLk.Unlock() return ctx.Err() } } -// NotifyHeight and release the waiters in [WaitHeight]. +// Notify and release the waiters in [Wait]. // Note: do not advance heightSub's height. -func (hs *heightSub[H]) NotifyHeight(height uint64) { +func (hs *heightSub) Notify(height uint64) { hs.heightSubsLk.Lock() defer hs.heightSubsLk.Unlock() - hs.notifyHeight(height, true) + hs.notify(height, true) } -func (hs *heightSub[H]) notifyHeight(height uint64, all bool) { +func (hs *heightSub) notify(height uint64, all bool) { sac, ok := hs.heightSubs[height] if !ok { return diff --git a/store/heightsub_test.go b/store/heightsub_test.go index 30567393..98ebef04 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -14,13 +14,13 @@ func TestHeightSub(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - hs := newHeightSub[*headertest.DummyHeader]() + hs := newHeightSub() // assert subscription returns nil for past heights { hs.Init(99) - err := hs.WaitHeight(ctx, 10) + err := hs.Wait(ctx, 10) assert.ErrorIs(t, err, errElapsedHeight) } @@ -33,7 +33,7 @@ func TestHeightSub(t *testing.T) { hs.SetHeight(102) }() - err := hs.WaitHeight(ctx, 101) + err := hs.Wait(ctx, 101) assert.NoError(t, err) } @@ -42,7 +42,7 @@ func TestHeightSub(t *testing.T) { ch := make(chan error, 10) for range cap(ch) { go func() { - err := hs.WaitHeight(ctx, 103) + err := hs.Wait(ctx, 103) ch <- err }() } @@ -61,7 +61,7 @@ func TestHeightSub_withWaitCancelled(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - hs := newHeightSub[*headertest.DummyHeader]() + hs := newHeightSub() hs.Init(10) const waiters = 5 @@ -76,7 +76,7 @@ func TestHeightSub_withWaitCancelled(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, time.Duration(i+1)*time.Millisecond) defer cancel() - err := hs.WaitHeight(ctx, 100) + err := hs.Wait(ctx, 100) cancelChs[i] <- err }() @@ -84,7 +84,7 @@ func TestHeightSub_withWaitCancelled(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - err := hs.WaitHeight(ctx, 100) + err := hs.Wait(ctx, 100) blockedChs[i] <- err }() } @@ -108,7 +108,7 @@ func TestHeightSubNonAdjacement(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - hs := newHeightSub[*headertest.DummyHeader]() + hs := newHeightSub() hs.Init(99) go func() { @@ -118,13 +118,13 @@ func TestHeightSubNonAdjacement(t *testing.T) { hs.SetHeight(300) }() - err := hs.WaitHeight(ctx, 200) + err := hs.Wait(ctx, 200) assert.NoError(t, err) } // Test heightSub's height cannot go down but only up. func TestHeightSub_monotonicHeight(t *testing.T) { - hs := newHeightSub[*headertest.DummyHeader]() + hs := newHeightSub() hs.Init(99) assert.Equal(t, int64(hs.height.Load()), int64(99)) @@ -142,12 +142,12 @@ func TestHeightSubCancellation(t *testing.T) { h := headertest.RandDummyHeader(t) h.HeightI %= 1000 // make it a bit lower - hs := newHeightSub[*headertest.DummyHeader]() + hs := newHeightSub() sub := make(chan struct{}) go func() { // subscribe first time - hs.WaitHeight(ctx, h.Height()) + hs.Wait(ctx, h.Height()) sub <- struct{}{} }() @@ -157,7 +157,7 @@ func TestHeightSubCancellation(t *testing.T) { // subscribe again but with failed canceled context canceledCtx, cancel := context.WithCancel(ctx) cancel() - err := hs.WaitHeight(canceledCtx, h.Height()) + err := hs.Wait(canceledCtx, h.Height()) assert.ErrorIs(t, err, context.Canceled) // update height diff --git a/store/store.go b/store/store.go index 19554fc4..6b226f7d 100644 --- a/store/store.go +++ b/store/store.go @@ -41,7 +41,7 @@ type Store[H header.Header[H]] struct { heightIndex *heightIndexer[H] // manages current store read head height (1) and // allows callers to wait until header for a height is stored (2) - heightSub *heightSub[H] + heightSub *heightSub // writing to datastore // @@ -103,7 +103,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store cache: cache, metrics: metrics, heightIndex: index, - heightSub: newHeightSub[H](), + heightSub: newHeightSub(), writes: make(chan []H, 16), writesDn: make(chan struct{}), pending: newBatch[H](params.WriteBatchSize), @@ -222,7 +222,7 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { // if the requested 'height' was not yet published // we subscribe to it if head := s.contiguousHead.Load(); head == nil || height > (*head).Height() { - err := s.heightSub.WaitHeight(ctx, height) + err := s.heightSub.Wait(ctx, height) if err != nil && !errors.Is(err, errElapsedHeight) { return zero, err } @@ -511,7 +511,7 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { func (s *Store[H]) notifyAndAdvance(ctx context.Context, headers ...H) { // always inform heightSub about new headers seen for _, h := range headers { - s.heightSub.NotifyHeight(h.Height()) + s.heightSub.Notify(h.Height()) } currHead := s.contiguousHead.Load() From 459baad6c49e70bf77a634007d9772061fb4593b Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 27 Jan 2025 16:35:05 +0100 Subject: [PATCH 31/44] drop param --- store/store.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/store/store.go b/store/store.go index 6b226f7d..b52b9ecf 100644 --- a/store/store.go +++ b/store/store.go @@ -539,14 +539,14 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context, currHeight uint64) } if currHeight > prevHeight { - s.updateContiguousHead(ctx, newHead, currHeight) + s.updateContiguousHead(ctx, newHead) } return currHeight } -func (s *Store[H]) updateContiguousHead(ctx context.Context, newHead H, newHeight uint64) { +func (s *Store[H]) updateContiguousHead(ctx context.Context, newHead H) { s.contiguousHead.Store(&newHead) - s.heightSub.SetHeight(newHeight) + s.heightSub.SetHeight(newHead.Height()) log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) s.metrics.newHead(newHead.Height()) From 9f1c92ba9b397cd88cca9485424034a914e0048b Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 30 Jan 2025 10:23:10 +0100 Subject: [PATCH 32/44] review suggestions --- store/heightsub_test.go | 2 +- store/store.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/store/heightsub_test.go b/store/heightsub_test.go index 98ebef04..6ef64a64 100644 --- a/store/heightsub_test.go +++ b/store/heightsub_test.go @@ -104,7 +104,7 @@ func TestHeightSub_withWaitCancelled(t *testing.T) { } // Test heightSub can accept non-adj headers without an error. -func TestHeightSubNonAdjacement(t *testing.T) { +func TestHeightSubNonAdjacency(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() diff --git a/store/store.go b/store/store.go index b52b9ecf..5649e6f0 100644 --- a/store/store.go +++ b/store/store.go @@ -521,7 +521,7 @@ func (s *Store[H]) notifyAndAdvance(ctx context.Context, headers ...H) { } // advanceContiguousHead return a new highest contiguous height -// or a given if not found. +// or returns a given height if not found. func (s *Store[H]) advanceContiguousHead(ctx context.Context, currHeight uint64) uint64 { // TODO(cristaloleg): benchmark this timeout or make it dynamic. advCtx, advCancel := context.WithTimeout(ctx, 10*time.Second) @@ -557,11 +557,13 @@ func (s *Store[H]) updateContiguousHead(ctx context.Context, newHead H) { if err != nil { log.Errorw("cannot marshal new head", "height", newHead.Height(), "hash", newHead.Hash(), "err", err) + return } if err := s.ds.Put(ctx, headKey, b); err != nil { log.Errorw("cannot put new head", "height", newHead.Height(), "hash", newHead.Hash(), "err", err) + return } } From d554c5552381e07db39a7103fa51fa6fb433ff33 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Fri, 31 Jan 2025 12:04:38 +0100 Subject: [PATCH 33/44] fix --- store/store.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/store/store.go b/store/store.go index 5649e6f0..e529ac7b 100644 --- a/store/store.go +++ b/store/store.go @@ -466,11 +466,7 @@ func (s *Store[H]) loadHeadKey(ctx context.Context) error { return err } - newHeight := s.advanceContiguousHead(ctx, h.Height()) - if newHeight >= h.Height() { - s.contiguousHead.Store(&h) - s.heightSub.SetHeight(h.Height()) - } + s.advanceContiguousHead(ctx, h.Height()) return nil } @@ -521,8 +517,8 @@ func (s *Store[H]) notifyAndAdvance(ctx context.Context, headers ...H) { } // advanceContiguousHead return a new highest contiguous height -// or returns a given height if not found. -func (s *Store[H]) advanceContiguousHead(ctx context.Context, currHeight uint64) uint64 { +// or returns the given height if not found. +func (s *Store[H]) advanceContiguousHead(ctx context.Context, currHeight uint64) { // TODO(cristaloleg): benchmark this timeout or make it dynamic. advCtx, advCancel := context.WithTimeout(ctx, 10*time.Second) defer advCancel() @@ -538,10 +534,9 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context, currHeight uint64) currHeight++ } - if currHeight > prevHeight { + if currHeight >= prevHeight { s.updateContiguousHead(ctx, newHead) } - return currHeight } func (s *Store[H]) updateContiguousHead(ctx context.Context, newHead H) { From 034e61821d6ebc9671d957a22c65f8f3661ca262 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Fri, 31 Jan 2025 12:09:47 +0100 Subject: [PATCH 34/44] revert --- store/store.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/store/store.go b/store/store.go index e529ac7b..26a4173d 100644 --- a/store/store.go +++ b/store/store.go @@ -466,7 +466,11 @@ func (s *Store[H]) loadHeadKey(ctx context.Context) error { return err } - s.advanceContiguousHead(ctx, h.Height()) + newHeight := s.advanceContiguousHead(ctx, h.Height()) + if newHeight >= h.Height() { + s.contiguousHead.Store(&h) + s.heightSub.SetHeight(h.Height()) + } return nil } @@ -518,7 +522,7 @@ func (s *Store[H]) notifyAndAdvance(ctx context.Context, headers ...H) { // advanceContiguousHead return a new highest contiguous height // or returns the given height if not found. -func (s *Store[H]) advanceContiguousHead(ctx context.Context, currHeight uint64) { +func (s *Store[H]) advanceContiguousHead(ctx context.Context, currHeight uint64) uint64 { // TODO(cristaloleg): benchmark this timeout or make it dynamic. advCtx, advCancel := context.WithTimeout(ctx, 10*time.Second) defer advCancel() @@ -534,9 +538,10 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context, currHeight uint64) currHeight++ } - if currHeight >= prevHeight { + if currHeight > prevHeight { s.updateContiguousHead(ctx, newHead) } + return currHeight } func (s *Store[H]) updateContiguousHead(ctx context.Context, newHead H) { From aa97c3927d7e4a64b547d42ab35c52b14236aa9e Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 4 Feb 2025 13:19:10 +0100 Subject: [PATCH 35/44] more review suggestions --- store/batch.go | 2 +- store/heightsub.go | 6 ++++-- store/store.go | 50 ++++++++++++++++------------------------------ 3 files changed, 22 insertions(+), 36 deletions(-) diff --git a/store/batch.go b/store/batch.go index 04b41a01..55bb529a 100644 --- a/store/batch.go +++ b/store/batch.go @@ -6,7 +6,7 @@ import ( "github.com/celestiaorg/go-header" ) -// batch keeps an adjacent range of headers and loosely mimics the Store +// batch keeps a range of headers and loosely mimics the Store // interface. NOTE: Can fully implement Store for a use case. // // It keeps a mapping 'height -> header' and 'hash -> height' diff --git a/store/heightsub.go b/store/heightsub.go index d9f40db2..7f83f275 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -114,11 +114,13 @@ func (hs *heightSub) Wait(ctx context.Context, height uint64) error { // Notify and release the waiters in [Wait]. // Note: do not advance heightSub's height. -func (hs *heightSub) Notify(height uint64) { +func (hs *heightSub) Notify(heights ...uint64) { hs.heightSubsLk.Lock() defer hs.heightSubsLk.Unlock() - hs.notify(height, true) + for _, h := range heights { + hs.notify(h, true) + } } func (hs *heightSub) notify(height uint64, all bool) { diff --git a/store/store.go b/store/store.go index 26a4173d..edb0468c 100644 --- a/store/store.go +++ b/store/store.go @@ -221,11 +221,9 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { // if the requested 'height' was not yet published // we subscribe to it - if head := s.contiguousHead.Load(); head == nil || height > (*head).Height() { - err := s.heightSub.Wait(ctx, height) - if err != nil && !errors.Is(err, errElapsedHeight) { - return zero, err - } + err := s.heightSub.Wait(ctx, height) + if err != nil && !errors.Is(err, errElapsedHeight) { + return zero, err } // otherwise, the errElapsedHeight is thrown, // which means the requested 'height' should be present @@ -441,7 +439,11 @@ func (s *Store[H]) flush(ctx context.Context, headers ...H) error { } // marshal and add to batch reference to the new head - b, err := headers[ln-1].Hash().MarshalJSON() + head := headers[ln-1] + if h := s.contiguousHead.Load(); h != nil { + head = *h + } + b, err := head.Hash().MarshalJSON() if err != nil { return err } @@ -510,13 +512,15 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { // based on already written headers. func (s *Store[H]) notifyAndAdvance(ctx context.Context, headers ...H) { // always inform heightSub about new headers seen - for _, h := range headers { - s.heightSub.Notify(h.Height()) + heights := make([]uint64, len(headers)) + for i := range headers { + heights[i] = headers[i].Height() } + s.heightSub.Notify(heights...) currHead := s.contiguousHead.Load() if currHead != nil { - s.advanceContiguousHead(ctx, (*currHead).Height()) + s.advanceContiguousHead(ctx, s.heightSub.Height()) } } @@ -539,34 +543,14 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context, currHeight uint64) } if currHeight > prevHeight { - s.updateContiguousHead(ctx, newHead) + s.contiguousHead.Store(&newHead) + s.heightSub.SetHeight(newHead.Height()) + log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) + s.metrics.newHead(newHead.Height()) } return currHeight } -func (s *Store[H]) updateContiguousHead(ctx context.Context, newHead H) { - s.contiguousHead.Store(&newHead) - s.heightSub.SetHeight(newHead.Height()) - log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) - s.metrics.newHead(newHead.Height()) - - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - b, err := newHead.Hash().MarshalJSON() - if err != nil { - log.Errorw("cannot marshal new head", - "height", newHead.Height(), "hash", newHead.Hash(), "err", err) - return - } - - if err := s.ds.Put(ctx, headKey, b); err != nil { - log.Errorw("cannot put new head", - "height", newHead.Height(), "hash", newHead.Hash(), "err", err) - return - } -} - // indexTo saves mapping between header Height and Hash to the given batch. func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, headers ...H) error { for _, h := range headers { From f2d860c20cf04191152a808d24ed6692cb717616 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 4 Feb 2025 13:54:47 +0100 Subject: [PATCH 36/44] simplify --- store/store.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/store/store.go b/store/store.go index edb0468c..1364d7af 100644 --- a/store/store.go +++ b/store/store.go @@ -115,6 +115,11 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error { if s.heightSub.Height() != 0 { return errors.New("store already initialized") } + + // initialize with the initial head before first flush. + s.contiguousHead.Store(&initial) + s.heightSub.Init(initial.Height()) + // trust the given header as the initial head err := s.flush(ctx, initial) if err != nil { @@ -122,8 +127,6 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error { } log.Infow("initialized head", "height", initial.Height(), "hash", initial.Hash()) - s.contiguousHead.Store(&initial) - s.heightSub.Init(initial.Height()) return nil } @@ -439,11 +442,8 @@ func (s *Store[H]) flush(ctx context.Context, headers ...H) error { } // marshal and add to batch reference to the new head - head := headers[ln-1] - if h := s.contiguousHead.Load(); h != nil { - head = *h - } - b, err := head.Hash().MarshalJSON() + head := s.contiguousHead.Load() + b, err := (*head).Hash().MarshalJSON() if err != nil { return err } From 5518222385193d0a50aa440ef046be1eda5d954c Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 4 Feb 2025 14:01:47 +0100 Subject: [PATCH 37/44] simplify again --- store/store.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/store/store.go b/store/store.go index 1364d7af..11d646f6 100644 --- a/store/store.go +++ b/store/store.go @@ -518,10 +518,7 @@ func (s *Store[H]) notifyAndAdvance(ctx context.Context, headers ...H) { } s.heightSub.Notify(heights...) - currHead := s.contiguousHead.Load() - if currHead != nil { - s.advanceContiguousHead(ctx, s.heightSub.Height()) - } + s.advanceContiguousHead(ctx, s.heightSub.Height()) } // advanceContiguousHead return a new highest contiguous height From aef55c4e9fdc370a48cb25cc0637af914c282910 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 4 Feb 2025 14:06:47 +0100 Subject: [PATCH 38/44] simplify --- store/store.go | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/store/store.go b/store/store.go index 11d646f6..bc97f40f 100644 --- a/store/store.go +++ b/store/store.go @@ -378,9 +378,10 @@ func (s *Store[H]) flushLoop() { for headers := range s.writes { // add headers to the pending and ensure they are accessible s.pending.Append(headers...) - // notify waiters in heightSub and advance contiguousHead - // if we don't have gaps. - s.notifyAndAdvance(ctx, headers...) + // always inform heightSub about new headers seen. + s.heightSub.Notify(getHeights(headers...)...) + // advance contiguousHead if we don't have gaps. + s.advanceContiguousHead(ctx, s.heightSub.Height()) // don't flush and continue if pending batch is not grown enough, // and Store is not stopping(headers == nil) if s.pending.Len() < s.Params.WriteBatchSize && headers != nil { @@ -508,19 +509,6 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { return data, nil } -// notifyAndAdvance will notify waiters in heightSub and advance contiguousHead -// based on already written headers. -func (s *Store[H]) notifyAndAdvance(ctx context.Context, headers ...H) { - // always inform heightSub about new headers seen - heights := make([]uint64, len(headers)) - for i := range headers { - heights[i] = headers[i].Height() - } - s.heightSub.Notify(heights...) - - s.advanceContiguousHead(ctx, s.heightSub.Height()) -} - // advanceContiguousHead return a new highest contiguous height // or returns the given height if not found. func (s *Store[H]) advanceContiguousHead(ctx context.Context, currHeight uint64) uint64 { @@ -558,3 +546,11 @@ func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, hea } return nil } + +func getHeights[H header.Header[H]](headers ...H) []uint64 { + heights := make([]uint64, len(headers)) + for i := range headers { + heights[i] = headers[i].Height() + } + return heights +} From ac051e4bb5081cef9ac87903be64d84642a6bcc8 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 4 Feb 2025 14:10:53 +0100 Subject: [PATCH 39/44] linter --- store/heightsub.go | 2 +- store/store.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/store/heightsub.go b/store/heightsub.go index 7f83f275..1bed1d3f 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -64,7 +64,7 @@ func (hs *heightSub) SetHeight(height uint64) { } hs.heightSubsLk.Lock() - defer hs.heightSubsLk.Unlock() //nolint:gocritic we have a return below + defer hs.heightSubsLk.Unlock() //nolint:gocritic // we have a return below for ; curr <= height; curr++ { hs.notify(curr, true) diff --git a/store/store.go b/store/store.go index bc97f40f..3204649f 100644 --- a/store/store.go +++ b/store/store.go @@ -178,7 +178,7 @@ func (s *Store[H]) Height() uint64 { return s.heightSub.Height() } -func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) { +func (s *Store[H]) Head(_ context.Context, _ ...header.HeadOption[H]) (H, error) { if head := s.contiguousHead.Load(); head != nil { return *head, nil } @@ -324,7 +324,6 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error { // collect valid headers verified := make([]H, 0, lh) for i, h := range headers { - err = head.Verify(h) if err != nil { var verErr *header.VerifyError From 35856eb4644cd69cf36e674fd550d40ce9d0edd4 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Tue, 4 Feb 2025 17:47:08 +0100 Subject: [PATCH 40/44] think different --- store/store.go | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/store/store.go b/store/store.go index 3204649f..3f810c13 100644 --- a/store/store.go +++ b/store/store.go @@ -468,11 +468,8 @@ func (s *Store[H]) loadHeadKey(ctx context.Context) error { return err } - newHeight := s.advanceContiguousHead(ctx, h.Height()) - if newHeight >= h.Height() { - s.contiguousHead.Store(&h) - s.heightSub.SetHeight(h.Height()) - } + s.contiguousHead.Store(&h) + s.heightSub.SetHeight(h.Height()) return nil } @@ -510,29 +507,28 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { // advanceContiguousHead return a new highest contiguous height // or returns the given height if not found. -func (s *Store[H]) advanceContiguousHead(ctx context.Context, currHeight uint64) uint64 { - // TODO(cristaloleg): benchmark this timeout or make it dynamic. - advCtx, advCancel := context.WithTimeout(ctx, 10*time.Second) - defer advCancel() +func (s *Store[H]) advanceContiguousHead(ctx context.Context, height uint64) { + newHead, ok := s.nextContiguous(ctx, height) + if ok && newHead.Height() > height { + s.contiguousHead.Store(&newHead) + s.heightSub.SetHeight(newHead.Height()) + log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) + s.metrics.newHead(newHead.Height()) + } +} - prevHeight := currHeight +func (s *Store[H]) nextContiguous(ctx context.Context, height uint64) (H, bool) { var newHead H + newHeight := height for { - h, err := s.getByHeight(advCtx, currHeight+1) + h, err := s.getByHeight(ctx, newHeight+1) if err != nil { break } newHead = h - currHeight++ - } - - if currHeight > prevHeight { - s.contiguousHead.Store(&newHead) - s.heightSub.SetHeight(newHead.Height()) - log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) - s.metrics.newHead(newHead.Height()) + newHeight++ } - return currHeight + return newHead, newHeight != height } // indexTo saves mapping between header Height and Hash to the given batch. From 73b5c893777ff6601d19ca5c00392226334fa685 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 5 Feb 2025 10:44:49 +0100 Subject: [PATCH 41/44] more suggestions --- store/store.go | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/store/store.go b/store/store.go index 3f810c13..b98acd90 100644 --- a/store/store.go +++ b/store/store.go @@ -138,7 +138,7 @@ func (s *Store[H]) Start(ctx context.Context) error { default: } - if err := s.loadHeadKey(ctx); err != nil { + if err := s.loadContiguousHead(ctx); err != nil { // we might start on an empty datastore, no key is okay. if !errors.Is(err, datastore.ErrNotFound) { return fmt.Errorf("header/store: cannot load headKey: %w", err) @@ -442,8 +442,8 @@ func (s *Store[H]) flush(ctx context.Context, headers ...H) error { } // marshal and add to batch reference to the new head - head := s.contiguousHead.Load() - b, err := (*head).Hash().MarshalJSON() + head := *s.contiguousHead.Load() + b, err := head.Hash().MarshalJSON() if err != nil { return err } @@ -462,7 +462,8 @@ func (s *Store[H]) flush(ctx context.Context, headers ...H) error { return batch.Commit(ctx) } -func (s *Store[H]) loadHeadKey(ctx context.Context) error { +// loadContiguousHead from the disk and sets contiguousHead and heightSub. +func (s *Store[H]) loadContiguousHead(ctx context.Context) error { h, err := s.readHead(ctx) if err != nil { return err @@ -505,30 +506,32 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) { return data, nil } -// advanceContiguousHead return a new highest contiguous height -// or returns the given height if not found. +// advanceContiguousHead updates contiguousHead and heightSub if a higher +// contiguous header exists on a disk. func (s *Store[H]) advanceContiguousHead(ctx context.Context, height uint64) { - newHead, ok := s.nextContiguous(ctx, height) - if ok && newHead.Height() > height { - s.contiguousHead.Store(&newHead) - s.heightSub.SetHeight(newHead.Height()) - log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) - s.metrics.newHead(newHead.Height()) + newHead := s.nextContiguousHead(ctx, height) + if newHead.IsZero() || newHead.Height() <= height { + return } + + s.contiguousHead.Store(&newHead) + s.heightSub.SetHeight(newHead.Height()) + log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash()) + s.metrics.newHead(newHead.Height()) } -func (s *Store[H]) nextContiguous(ctx context.Context, height uint64) (H, bool) { +// nextContiguousHead returns a next contiguous header if any. +func (s *Store[H]) nextContiguousHead(ctx context.Context, height uint64) H { var newHead H - newHeight := height for { - h, err := s.getByHeight(ctx, newHeight+1) + height++ + h, err := s.getByHeight(ctx, height) if err != nil { break } newHead = h - newHeight++ } - return newHead, newHeight != height + return newHead } // indexTo saves mapping between header Height and Hash to the given batch. From 84a70905e67993e889e9d6e3f7cdbdda881cb87a Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 5 Feb 2025 13:29:11 +0100 Subject: [PATCH 42/44] upd cmnt --- store/store.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/store/store.go b/store/store.go index b98acd90..397f9f34 100644 --- a/store/store.go +++ b/store/store.go @@ -520,7 +520,8 @@ func (s *Store[H]) advanceContiguousHead(ctx context.Context, height uint64) { s.metrics.newHead(newHead.Height()) } -// nextContiguousHead returns a next contiguous header if any. +// nextContiguousHead iterates up header by header until it finds a gap. +// if height+1 header not found returns a default header. func (s *Store[H]) nextContiguousHead(ctx context.Context, height uint64) H { var newHead H for { From 921de51acd1cbd5bfc598fa994357962413e84a0 Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 13 Feb 2025 11:40:54 +0100 Subject: [PATCH 43/44] revert head method --- store/store.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/store/store.go b/store/store.go index 397f9f34..3b8d7246 100644 --- a/store/store.go +++ b/store/store.go @@ -178,13 +178,24 @@ func (s *Store[H]) Height() uint64 { return s.heightSub.Height() } -func (s *Store[H]) Head(_ context.Context, _ ...header.HeadOption[H]) (H, error) { - if head := s.contiguousHead.Load(); head != nil { - return *head, nil +func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) { + head, err := s.GetByHeight(ctx, s.heightSub.Height()) + if err == nil { + return head, nil } var zero H - return zero, header.ErrNoHead + head, err = s.readHead(ctx) + switch { + default: + return zero, err + case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound): + return zero, header.ErrNoHead + case err == nil: + s.heightSub.SetHeight(head.Height()) + log.Infow("loaded head", "height", head.Height(), "hash", head.Hash()) + return head, nil + } } func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) { From dc6d5bf711568f7c5a9c049a6f50ae71c0f7ed2c Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 13 Feb 2025 12:01:01 +0100 Subject: [PATCH 44/44] revert but a bit --- store/store.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/store/store.go b/store/store.go index 3b8d7246..55f9ce34 100644 --- a/store/store.go +++ b/store/store.go @@ -179,6 +179,10 @@ func (s *Store[H]) Height() uint64 { } func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) { + if head := s.contiguousHead.Load(); head != nil { + return *head, nil + } + head, err := s.GetByHeight(ctx, s.heightSub.Height()) if err == nil { return head, nil