From 115af456c09e4a024c366962738429bd341a3faf Mon Sep 17 00:00:00 2001 From: Wondertan Date: Mon, 31 Mar 2025 23:11:08 +0200 Subject: [PATCH 01/23] feat(sync): subjectiveTail --- sync/options.go | 18 ++++++++ sync/sync_head.go | 98 +++++++++++++++++++++++++++++++++++++++--- sync/sync_head_test.go | 33 ++++++++++++++ 3 files changed, 142 insertions(+), 7 deletions(-) diff --git a/sync/options.go b/sync/options.go index 62d06250..b89e9cf0 100644 --- a/sync/options.go +++ b/sync/options.go @@ -3,6 +3,8 @@ package sync import ( "fmt" "time" + + "github.com/celestiaorg/go-header" ) // Option is the functional option that is applied to the Syner instance @@ -20,6 +22,22 @@ type Parameters struct { // needed to report and punish misbehavior should be less than the unbonding // period. TrustingPeriod time.Duration + // SyncFromHash is the hash of the header from which the syncer should start syncing. + // + // By default, the syncer will start syncing from Tail, height of which is identified by the + // network head time minus TrustingPeriod. SyncFromHash overrides this default, allowing + // user to specify a custom starting point. + // + // SyncFromHash has higher priority than SyncFromHeight. + SyncFromHash header.Hash + // SyncFromHeight is the height of the header from which the syncer should start syncing. + // + // By default, the syncer will start syncing from Tail, height of which is identified by the + // network head time minus TrustingPeriod. SyncFromHeight overrides this default, allowing + // user to specify a custom starting point. + // + // SyncFromHeight has lower priority than SyncFromHash. + SyncFromHeight uint64 // blockTime provides a reference point for the Syncer to determine // whether its subjective head is outdated. // Keeping it private to disable serialization for it. diff --git a/sync/sync_head.go b/sync/sync_head.go index 3126d0d0..056a0016 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -1,6 +1,7 @@ package sync import ( + "bytes" "context" "errors" "fmt" @@ -56,6 +57,80 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err return s.subjectiveHead(ctx) } +func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { + tail, err := s.store.Tail(ctx) + switch { + case errors.Is(err, header.ErrEmptyStore): + switch { + case s.Params.SyncFromHash != nil: + tail, err = s.getter.Get(ctx, s.Params.SyncFromHash) + if err != nil { + return tail, fmt.Errorf("getting tail header by hash(%s): %w", s.Params.SyncFromHash, err) + } + case s.Params.SyncFromHeight != 0: + tail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight) + if err != nil { + return tail, fmt.Errorf("getting tail header(%d): %w", s.Params.SyncFromHeight, err) + } + default: + head, err := s.Head(ctx) + if err != nil { + return head, err + } + + tailHeight := estimateTail(head, s.Params.blockTime, s.Params.TrustingPeriod) + tail, err = s.getter.GetByHeight(ctx, tailHeight) + if err != nil { + return tail, fmt.Errorf("getting estimated tail header(%d): %w", tailHeight, err) + } + } + + err = s.store.Store.Append(ctx, tail) + if err != nil { + return tail, fmt.Errorf("appending tail header: %w", err) + } + + case !s.isTailActual(tail): + if s.Params.SyncFromHash != nil { + tail, err = s.getter.Get(ctx, s.Params.SyncFromHash) + if err != nil { + return tail, fmt.Errorf("getting tail header by hash(%s): %w", s.Params.SyncFromHash, err) + } + } else if s.Params.SyncFromHeight != 0 { + tail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight) + if err != nil { + return tail, fmt.Errorf("getting tail header(%d): %w", s.Params.SyncFromHeight, err) + } + } + + // TODO: Delete or sync up the diff + + case err != nil: + return tail, err + } + + return tail, nil +} + +// isTailActual checks if the given tail is actual based on the sync parameters. +func (s *Syncer[H]) isTailActual(tail H) bool { + if tail.IsZero() { + return false + } + + switch { + case s.Params.SyncFromHash == nil && s.Params.SyncFromHeight == 0: + // if both overrides are zero value, then we good with whatever tail there is + return true + case s.Params.SyncFromHash != nil && bytes.Equal(s.Params.SyncFromHash, tail.Hash()): + return true + case s.Params.SyncFromHeight != 0 && s.Params.SyncFromHeight == tail.Height(): + return true + default: + return false + } +} + // subjectiveHead returns the latest known local header that is not expired(within trusting period). // If the header is expired, it is retrieved from a trusted peer without validation; // in other words, an automatic subjective initialization is performed. @@ -70,15 +145,14 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { } // if pending is empty - get the latest stored/synced head storeHead, err := s.store.Head(ctx) - if err != nil { + switch { + case errors.Is(err, header.ErrEmptyStore): + log.Infow("no stored head, initializing...", "height") + case !storeHead.IsZero() && isExpired(storeHead, s.Params.TrustingPeriod): + log.Infow("stored head header expired", "height", storeHead.Height()) + default: return storeHead, err } - // check if the stored header is not expired and use it - if !isExpired(storeHead, s.Params.TrustingPeriod) { - return storeHead, nil - } - // otherwise, request head from a trusted peer - log.Infow("stored head header expired", "height", storeHead.Height()) trustHead, err := s.head.Head(ctx) if err != nil { @@ -257,3 +331,13 @@ func isRecent[H header.Header[H]](header H, blockTime, recencyThreshold time.Dur } return time.Since(header.Time()) <= recencyThreshold } + +func estimateTail[H header.Header[H]](head H, blockTime, trustingPeriod time.Duration) (height uint64) { + headersToRetain := uint64(trustingPeriod / blockTime) + + if headersToRetain >= head.Height() { + return 1 + } + tail := head.Height() - headersToRetain + return tail +} diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 8d0dcf1c..dfe56fa7 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -8,14 +8,47 @@ import ( "testing" "time" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/celestiaorg/go-header" "github.com/celestiaorg/go-header/headertest" "github.com/celestiaorg/go-header/local" + "github.com/celestiaorg/go-header/store" ) +func TestSyncer_Tail(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + remoteStore := headertest.NewDummyStore(t) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + localStore, err := store.NewStore[*headertest.DummyHeader](ds) + require.NoError(t, err) + err = localStore.Start(ctx) + require.NoError(t, err) + + syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore, + localStore, + headertest.NewDummySubscriber(), + WithRecencyThreshold(time.Nanosecond), // force recent requests + WithBlockTime(time.Second*6), + ) + require.NoError(t, err) + + tail, err := syncer.Tail(ctx) + require.NoError(t, err) + assert.NotNil(t, tail) + + storeTail, err := localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, tail.Height(), storeTail.Height()) +} + func TestSyncer_HeadConcurrencyError(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) From c698d74a50263a2cb1877a1ca11aa8064758ecce Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 6 May 2025 23:31:01 +0200 Subject: [PATCH 02/23] fix head --- sync/sync_head.go | 19 +++++++++++++------ sync/sync_head_test.go | 12 ++++++++++-- sync/sync_store.go | 18 ++++++++++-------- 3 files changed, 33 insertions(+), 16 deletions(-) diff --git a/sync/sync_head.go b/sync/sync_head.go index 056a0016..a85cf1e3 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -61,6 +61,13 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { tail, err := s.store.Tail(ctx) switch { case errors.Is(err, header.ErrEmptyStore): + // TODO(@Wondertan): This is a temporary solution requesting the head directly from the network instead of + // calling general Head path. This is needed to ensure Tail is written to the store first. + head, err := s.head.Head(ctx) + if err != nil { + return head, err + } + switch { case s.Params.SyncFromHash != nil: tail, err = s.getter.Get(ctx, s.Params.SyncFromHash) @@ -73,11 +80,6 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { return tail, fmt.Errorf("getting tail header(%d): %w", s.Params.SyncFromHeight, err) } default: - head, err := s.Head(ctx) - if err != nil { - return head, err - } - tailHeight := estimateTail(head, s.Params.blockTime, s.Params.TrustingPeriod) tail, err = s.getter.GetByHeight(ctx, tailHeight) if err != nil { @@ -85,11 +87,16 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { } } - err = s.store.Store.Append(ctx, tail) + err = s.store.Append(ctx, tail) if err != nil { return tail, fmt.Errorf("appending tail header: %w", err) } + err = s.incomingNetworkHead(ctx, head) + if err != nil { + return tail, fmt.Errorf("applying head from trusted peers: %w", err) + } + case !s.isTailActual(tail): if s.Params.SyncFromHash != nil { tail, err = s.getter.Get(ctx, s.Params.SyncFromHash) diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index dfe56fa7..2939a187 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -23,7 +23,7 @@ func TestSyncer_Tail(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) - remoteStore := headertest.NewDummyStore(t) + remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) ds := dssync.MutexWrap(datastore.NewMapDatastore()) localStore, err := store.NewStore[*headertest.DummyHeader](ds) @@ -44,9 +44,17 @@ func TestSyncer_Tail(t *testing.T) { require.NoError(t, err) assert.NotNil(t, tail) + err = syncer.Start(ctx) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 100) storeTail, err := localStore.Tail(ctx) require.NoError(t, err) assert.EqualValues(t, tail.Height(), storeTail.Height()) + + storeHead, err := localStore.Head(ctx) + require.NoError(t, err) + assert.EqualValues(t, remoteStore.Height(), storeHead.Height()) } func TestSyncer_HeadConcurrencyError(t *testing.T) { @@ -435,7 +443,7 @@ type errorGetter struct{} func (e errorGetter) Head( context.Context, - ...header.HeadOption[*headertest.DummyHeader], +...header.HeadOption[*headertest.DummyHeader], ) (*headertest.DummyHeader, error) { time.Sleep(time.Millisecond * 1) return nil, fmt.Errorf("error") diff --git a/sync/sync_store.go b/sync/sync_store.go index 8254a6cb..fd838abc 100644 --- a/sync/sync_store.go +++ b/sync/sync_store.go @@ -48,18 +48,20 @@ func (s *syncStore[H]) Append(ctx context.Context, headers ...H) error { } head, err := s.Head(ctx) - if err != nil && !errors.Is(err, context.Canceled) { - panic(err) + if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, header.ErrEmptyStore) { + return err } - for _, h := range headers { - if h.Height() != head.Height()+1 { - return &errNonAdjacent{ - Head: head.Height(), - Attempted: h.Height(), + if !errors.Is(err, header.ErrEmptyStore) { + for _, h := range headers { + if h.Height() != head.Height()+1 { + return &errNonAdjacent{ + Head: head.Height(), + Attempted: h.Height(), + } } + head = h } - head = h } if err := s.Store.Append(ctx, headers...); err != nil { From f87e8b4a7cdac6638df495c3fb7a8f01154e890c Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 6 May 2025 23:39:31 +0200 Subject: [PATCH 03/23] lint --- sync/sync_head.go | 21 ++++++++++++++++----- sync/sync_head_test.go | 2 +- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/sync/sync_head.go b/sync/sync_head.go index a85cf1e3..33a72c43 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -72,7 +72,11 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { case s.Params.SyncFromHash != nil: tail, err = s.getter.Get(ctx, s.Params.SyncFromHash) if err != nil { - return tail, fmt.Errorf("getting tail header by hash(%s): %w", s.Params.SyncFromHash, err) + return tail, fmt.Errorf( + "getting tail header by hash(%s): %w", + s.Params.SyncFromHash, + err, + ) } case s.Params.SyncFromHeight != 0: tail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight) @@ -101,7 +105,11 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { if s.Params.SyncFromHash != nil { tail, err = s.getter.Get(ctx, s.Params.SyncFromHash) if err != nil { - return tail, fmt.Errorf("getting tail header by hash(%s): %w", s.Params.SyncFromHash, err) + return tail, fmt.Errorf( + "getting tail header by hash(%s): %w", + s.Params.SyncFromHash, + err, + ) } } else if s.Params.SyncFromHeight != 0 { tail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight) @@ -154,7 +162,7 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { storeHead, err := s.store.Head(ctx) switch { case errors.Is(err, header.ErrEmptyStore): - log.Infow("no stored head, initializing...", "height") + log.Info("no stored head, initializing...") case !storeHead.IsZero() && isExpired(storeHead, s.Params.TrustingPeriod): log.Infow("stored head header expired", "height", storeHead.Height()) default: @@ -339,8 +347,11 @@ func isRecent[H header.Header[H]](header H, blockTime, recencyThreshold time.Dur return time.Since(header.Time()) <= recencyThreshold } -func estimateTail[H header.Header[H]](head H, blockTime, trustingPeriod time.Duration) (height uint64) { - headersToRetain := uint64(trustingPeriod / blockTime) +func estimateTail[H header.Header[H]]( + head H, + blockTime, trustingPeriod time.Duration, +) (height uint64) { + headersToRetain := uint64(trustingPeriod / blockTime) //nolint:gosec if headersToRetain >= head.Height() { return 1 diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 2939a187..1fcb132c 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -443,7 +443,7 @@ type errorGetter struct{} func (e errorGetter) Head( context.Context, -...header.HeadOption[*headertest.DummyHeader], + ...header.HeadOption[*headertest.DummyHeader], ) (*headertest.DummyHeader, error) { time.Sleep(time.Millisecond * 1) return nil, fmt.Errorf("error") From f97fe87c5f3f7feeca24bf55ce5f0ea7b9c588cb Mon Sep 17 00:00:00 2001 From: Wondertan Date: Thu, 8 May 2025 00:53:36 +0200 Subject: [PATCH 04/23] add diff handling and minor other improvements --- sync/options.go | 16 +++++++ sync/sync.go | 6 ++- sync/sync_head.go | 80 ++++++++++++++++++++++++++-------- sync/sync_head_test.go | 98 +++++++++++++++++++++++++++++++++++++++++- 4 files changed, 180 insertions(+), 20 deletions(-) diff --git a/sync/options.go b/sync/options.go index b89e9cf0..0282bf90 100644 --- a/sync/options.go +++ b/sync/options.go @@ -101,3 +101,19 @@ func WithParams(params Parameters) Option { *old = params } } + +// WithSyncFromHash sets given header hash a starting point for syncing. +// See [Parameters.SyncFromHash] for details. +func WithSyncFromHash(hash header.Hash) Option { + return func(p *Parameters) { + p.SyncFromHash = hash + } +} + +// WithSyncFromHeight sets given height a starting point for syncing. +// See [Parameters.SyncFromHeight] for details. +func WithSyncFromHeight(height uint64) Option { + return func(p *Parameters) { + p.SyncFromHeight = height + } +} diff --git a/sync/sync.go b/sync/sync.go index dd3bf451..ee151bb5 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -98,7 +98,11 @@ func (s *Syncer[H]) Start(ctx context.Context) error { if err != nil { return err } - // gets the latest head and kicks off syncing if necessary + // gets the latest tail and then head, kicking off syncing if necessary + _, err = s.Tail(ctx) + if err != nil { + return fmt.Errorf("error getting tail during Start: %w", err) + } _, err = s.Head(ctx) if err != nil { return fmt.Errorf("error getting latest head during Start: %w", err) diff --git a/sync/sync_head.go b/sync/sync_head.go index 33a72c43..14229747 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -57,15 +57,25 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err return s.subjectiveHead(ctx) } +// Tail returns the current Tail header. +// +// If the underlying header store is not initialized/empty, it lazily performs subjective initialization +// based on either estimated or preconfigured Tail. +// +// If the preconfigured Tail(SyncFromHash/Header) has changed upon Syncer restarts, it lazily sets the new Tail +// and resolves the difference. func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { tail, err := s.store.Tail(ctx) switch { case errors.Is(err, header.ErrEmptyStore): - // TODO(@Wondertan): This is a temporary solution requesting the head directly from the network instead of - // calling general Head path. This is needed to ensure Tail is written to the store first. + // Store is empty, likely the first start - initialize. + log.Info("empty store, initializing...") + // TODO(@Wondertan): Requesting the head directly from the network instead of + // calling general Head path. This is a temporary solution needed to ensure Tail is written to the store first + // before Head. To be reworked by bsync. head, err := s.head.Head(ctx) if err != nil { - return head, err + return head, fmt.Errorf("requesting network head: %w", err) } switch { @@ -101,24 +111,64 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { return tail, fmt.Errorf("applying head from trusted peers: %w", err) } - case !s.isTailActual(tail): + log.Infof("initialized with Tail %d and Head %d", tail.Height(), head.Height()) + + // TODO: Make sure all the metrics for this alternative subjective init path are added + + case !tail.IsZero() && !s.isTailActual(tail): + // Configured Tail has changed - get a new one and resolve the diff + + currentTail, newTail := tail, tail + if s.Params.SyncFromHash != nil { - tail, err = s.getter.Get(ctx, s.Params.SyncFromHash) + // check first locally if the new Tail exists + newTail, err = s.store.Get(ctx, s.Params.SyncFromHash) if err != nil { - return tail, fmt.Errorf( - "getting tail header by hash(%s): %w", - s.Params.SyncFromHash, - err, - ) + // if for whatever reason Tail is not available locally, request the new one from the network. + newTail, err = s.getter.Get(ctx, s.Params.SyncFromHash) + if err != nil { + return tail, fmt.Errorf( + "getting tail header by hash(%s): %w", + s.Params.SyncFromHash, + err, + ) + } } } else if s.Params.SyncFromHeight != 0 { - tail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight) + // check first locally if the new Tail exists + newTail, err = s.store.GetByHeight(ctx, s.Params.SyncFromHeight) if err != nil { - return tail, fmt.Errorf("getting tail header(%d): %w", s.Params.SyncFromHeight, err) + // if for whatever reason Tail is not available locally, request the new one from the network. + newTail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight) + if err != nil { + return tail, fmt.Errorf( + "getting tail header by hash(%s): %w", + s.Params.SyncFromHash, + err, + ) + } } } - // TODO: Delete or sync up the diff + if currentTail.Height() > newTail.Height() { + log.Infow("tail header changed from %d to %d, syncing the diff...", currentTail, newTail) + // TODO(@Wondertan): This works but it assumes this code is only run before syncing routine starts. + // If run after, it may race with other in prog syncs. + // To be reworked by bsync. + err := s.doSync(ctx, newTail, currentTail) + if err != nil { + return tail, fmt.Errorf("syncing the diff between old and new Tail: %w", err) + } + } else if currentTail.Height() < newTail.Height() { + log.Infow("Tail header changed from %d to %d, pruning the diff...", currentTail, newTail) + err := s.store.DeleteTo(ctx, newTail.Height()) + if err != nil { + return tail, fmt.Errorf("deleting headers up to newly configured Tail(%d): %w", newTail.Height(), err) + } + } else { + // equals case, must not happen + panic("currentTail == newTail") + } case err != nil: return tail, err @@ -129,10 +179,6 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { // isTailActual checks if the given tail is actual based on the sync parameters. func (s *Syncer[H]) isTailActual(tail H) bool { - if tail.IsZero() { - return false - } - switch { case s.Params.SyncFromHash == nil && s.Params.SyncFromHeight == 0: // if both overrides are zero value, then we good with whatever tail there is diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 1fcb132c..3df6d405 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -31,23 +31,117 @@ func TestSyncer_Tail(t *testing.T) { err = localStore.Start(ctx) require.NoError(t, err) + syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Second*6), + ) + require.NoError(t, err) + + tail, err := syncer.Tail(ctx) + require.NoError(t, err) + assert.NotNil(t, tail) + + time.Sleep(time.Millisecond * 10) + + err = syncer.Start(ctx) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 10) + + storeTail, err := localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, tail.Height(), storeTail.Height()) + + storeHead, err := localStore.Head(ctx) + require.NoError(t, err) + assert.EqualValues(t, remoteStore.Height(), storeHead.Height()) +} + +func TestSyncer_TailInitFromHash(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + localStore, err := store.NewStore[*headertest.DummyHeader](ds) + require.NoError(t, err) + err = localStore.Start(ctx) + require.NoError(t, err) + + expectedTail, err := remoteStore.GetByHeight(ctx, 69) + require.NoError(t, err) + syncer, err := NewSyncer[*headertest.DummyHeader]( remoteStore, localStore, headertest.NewDummySubscriber(), WithRecencyThreshold(time.Nanosecond), // force recent requests WithBlockTime(time.Second*6), + WithSyncFromHash(expectedTail.Hash()), ) require.NoError(t, err) tail, err := syncer.Tail(ctx) require.NoError(t, err) assert.NotNil(t, tail) + assert.EqualValues(t, tail.Height(), expectedTail.Height()) + + time.Sleep(time.Millisecond * 10) err = syncer.Start(ctx) require.NoError(t, err) - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 10) + + storeTail, err := localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, tail.Height(), storeTail.Height()) + + storeHead, err := localStore.Head(ctx) + require.NoError(t, err) + assert.EqualValues(t, remoteStore.Height(), storeHead.Height()) +} + +func TestSyncer_TailInitFromHeight(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + localStore, err := store.NewStore[*headertest.DummyHeader](ds) + require.NoError(t, err) + err = localStore.Start(ctx) + require.NoError(t, err) + + expectedTail, err := remoteStore.GetByHeight(ctx, 69) + require.NoError(t, err) + + syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore, + localStore, + headertest.NewDummySubscriber(), + WithRecencyThreshold(time.Nanosecond), // force recent requests + WithBlockTime(time.Second*6), + WithSyncFromHeight(expectedTail.Height()), + ) + require.NoError(t, err) + + tail, err := syncer.Tail(ctx) + require.NoError(t, err) + assert.NotNil(t, tail) + assert.EqualValues(t, tail.Height(), expectedTail.Height()) + + time.Sleep(time.Millisecond * 10) + + err = syncer.Start(ctx) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 10) + storeTail, err := localStore.Tail(ctx) require.NoError(t, err) assert.EqualValues(t, tail.Height(), storeTail.Height()) @@ -443,7 +537,7 @@ type errorGetter struct{} func (e errorGetter) Head( context.Context, - ...header.HeadOption[*headertest.DummyHeader], +...header.HeadOption[*headertest.DummyHeader], ) (*headertest.DummyHeader, error) { time.Sleep(time.Millisecond * 1) return nil, fmt.Errorf("error") From 9faaccdbfc04a54ffd291bfb51419d6b23b07646 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Thu, 8 May 2025 01:18:26 +0200 Subject: [PATCH 05/23] lint + combine tests --- headertest/store.go | 3 +- sync/sync_head.go | 25 ++++-- sync/sync_head_test.go | 174 ++++++++++++++--------------------------- 3 files changed, 79 insertions(+), 123 deletions(-) diff --git a/headertest/store.go b/headertest/store.go index 10444278..dbadba60 100644 --- a/headertest/store.go +++ b/headertest/store.go @@ -28,7 +28,8 @@ func NewDummyStore(t *testing.T) *Store[*DummyHeader] { func NewStore[H header.Header[H]](_ *testing.T, gen Generator[H], numHeaders int) *Store[H] { store := &Store[H]{ Headers: make(map[uint64]H), - HeadHeight: 0, + HeadHeight: 1, + TailHeight: 1, } for i := 0; i < numHeaders; i++ { diff --git a/sync/sync_head.go b/sync/sync_head.go index 14229747..e44c0667 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -150,8 +150,13 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { } } - if currentTail.Height() > newTail.Height() { - log.Infow("tail header changed from %d to %d, syncing the diff...", currentTail, newTail) + switch { + case currentTail.Height() > newTail.Height(): + log.Infow( + "tail header changed from %d to %d, syncing the diff...", + currentTail, + newTail, + ) // TODO(@Wondertan): This works but it assumes this code is only run before syncing routine starts. // If run after, it may race with other in prog syncs. // To be reworked by bsync. @@ -159,13 +164,21 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { if err != nil { return tail, fmt.Errorf("syncing the diff between old and new Tail: %w", err) } - } else if currentTail.Height() < newTail.Height() { - log.Infow("Tail header changed from %d to %d, pruning the diff...", currentTail, newTail) + case currentTail.Height() < newTail.Height(): + log.Infow( + "Tail header changed from %d to %d, pruning the diff...", + currentTail, + newTail, + ) err := s.store.DeleteTo(ctx, newTail.Height()) if err != nil { - return tail, fmt.Errorf("deleting headers up to newly configured Tail(%d): %w", newTail.Height(), err) + return tail, fmt.Errorf( + "deleting headers up to newly configured Tail(%d): %w", + newTail.Height(), + err, + ) } - } else { + default: // equals case, must not happen panic("currentTail == newTail") } diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 3df6d405..bc62f7ba 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -19,136 +19,78 @@ import ( "github.com/celestiaorg/go-header/store" ) -func TestSyncer_Tail(t *testing.T) { +func TestSyncer_TailInit(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - localStore, err := store.NewStore[*headertest.DummyHeader](ds) - require.NoError(t, err) - err = localStore.Start(ctx) - require.NoError(t, err) - - syncer, err := NewSyncer[*headertest.DummyHeader]( - remoteStore, - localStore, - headertest.NewDummySubscriber(), - WithBlockTime(time.Second*6), - ) - require.NoError(t, err) - - tail, err := syncer.Tail(ctx) - require.NoError(t, err) - assert.NotNil(t, tail) - - time.Sleep(time.Millisecond * 10) - - err = syncer.Start(ctx) - require.NoError(t, err) - - time.Sleep(time.Millisecond * 10) - - storeTail, err := localStore.Tail(ctx) - require.NoError(t, err) - assert.EqualValues(t, tail.Height(), storeTail.Height()) - - storeHead, err := localStore.Head(ctx) - require.NoError(t, err) - assert.EqualValues(t, remoteStore.Height(), storeHead.Height()) -} - -func TestSyncer_TailInitFromHash(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) - - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - localStore, err := store.NewStore[*headertest.DummyHeader](ds) - require.NoError(t, err) - err = localStore.Start(ctx) - require.NoError(t, err) - expectedTail, err := remoteStore.GetByHeight(ctx, 69) require.NoError(t, err) - syncer, err := NewSyncer[*headertest.DummyHeader]( - remoteStore, - localStore, - headertest.NewDummySubscriber(), - WithRecencyThreshold(time.Nanosecond), // force recent requests - WithBlockTime(time.Second*6), - WithSyncFromHash(expectedTail.Hash()), - ) - require.NoError(t, err) - - tail, err := syncer.Tail(ctx) - require.NoError(t, err) - assert.NotNil(t, tail) - assert.EqualValues(t, tail.Height(), expectedTail.Height()) - - time.Sleep(time.Millisecond * 10) - - err = syncer.Start(ctx) - require.NoError(t, err) - - time.Sleep(time.Millisecond * 10) - - storeTail, err := localStore.Tail(ctx) - require.NoError(t, err) - assert.EqualValues(t, tail.Height(), storeTail.Height()) - - storeHead, err := localStore.Head(ctx) - require.NoError(t, err) - assert.EqualValues(t, remoteStore.Height(), storeHead.Height()) -} - -func TestSyncer_TailInitFromHeight(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) - - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - localStore, err := store.NewStore[*headertest.DummyHeader](ds) - require.NoError(t, err) - err = localStore.Start(ctx) - require.NoError(t, err) - - expectedTail, err := remoteStore.GetByHeight(ctx, 69) - require.NoError(t, err) + tests := []struct { + name string + option Option + expected func() *headertest.DummyHeader + }{ + { + "Estimate", + func(p *Parameters) {}, // noop to trigger estimation, + func() *headertest.DummyHeader { + remoteTail, err := remoteStore.Tail(ctx) + require.NoError(t, err) + return remoteTail + }, + }, + { + "SyncFromHash", + WithSyncFromHash(expectedTail.Hash()), + func() *headertest.DummyHeader { + return expectedTail + }, + }, + { + "SyncFromHeight", + WithSyncFromHeight(expectedTail.Height()), + func() *headertest.DummyHeader { + return expectedTail + }, + }, + } - syncer, err := NewSyncer[*headertest.DummyHeader]( - remoteStore, - localStore, - headertest.NewDummySubscriber(), - WithRecencyThreshold(time.Nanosecond), // force recent requests - WithBlockTime(time.Second*6), - WithSyncFromHeight(expectedTail.Height()), - ) - require.NoError(t, err) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + localStore, err := store.NewStore[*headertest.DummyHeader](ds) + require.NoError(t, err) + err = localStore.Start(ctx) + require.NoError(t, err) - tail, err := syncer.Tail(ctx) - require.NoError(t, err) - assert.NotNil(t, tail) - assert.EqualValues(t, tail.Height(), expectedTail.Height()) + syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Second*6), + test.option, + ) + require.NoError(t, err) - time.Sleep(time.Millisecond * 10) + err = syncer.Start(ctx) + require.NoError(t, err) - err = syncer.Start(ctx) - require.NoError(t, err) + time.Sleep(time.Millisecond * 100) - time.Sleep(time.Millisecond * 10) + expectedTail := test.expected() - storeTail, err := localStore.Tail(ctx) - require.NoError(t, err) - assert.EqualValues(t, tail.Height(), storeTail.Height()) + storeTail, err := localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, expectedTail.Height(), storeTail.Height()) - storeHead, err := localStore.Head(ctx) - require.NoError(t, err) - assert.EqualValues(t, remoteStore.Height(), storeHead.Height()) + storeHead, err := localStore.Head(ctx) + require.NoError(t, err) + assert.EqualValues(t, remoteStore.Height(), storeHead.Height()) + }) + } } func TestSyncer_HeadConcurrencyError(t *testing.T) { @@ -537,7 +479,7 @@ type errorGetter struct{} func (e errorGetter) Head( context.Context, -...header.HeadOption[*headertest.DummyHeader], + ...header.HeadOption[*headertest.DummyHeader], ) (*headertest.DummyHeader, error) { time.Sleep(time.Millisecond * 1) return nil, fmt.Errorf("error") From 96b450b79848d7622715cbe50740106508a5aea0 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Thu, 8 May 2025 01:33:26 +0200 Subject: [PATCH 06/23] refine logs/metrics --- sync/sync_head.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/sync/sync_head.go b/sync/sync_head.go index e44c0667..9830e2c6 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -70,13 +70,14 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { case errors.Is(err, header.ErrEmptyStore): // Store is empty, likely the first start - initialize. log.Info("empty store, initializing...") - // TODO(@Wondertan): Requesting the head directly from the network instead of - // calling general Head path. This is a temporary solution needed to ensure Tail is written to the store first - // before Head. To be reworked by bsync. + // TODO(@Wondertan): Copying the initialization logic here instead of calling the general Head path. + // This is a temporary solution needed to ensure Tail is written to the store first before Head. + // To be reworked by bsync. head, err := s.head.Head(ctx) if err != nil { return head, fmt.Errorf("requesting network head: %w", err) } + s.metrics.subjectiveInitialization(s.ctx) switch { case s.Params.SyncFromHash != nil: @@ -111,9 +112,13 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { return tail, fmt.Errorf("applying head from trusted peers: %w", err) } - log.Infof("initialized with Tail %d and Head %d", tail.Height(), head.Height()) - - // TODO: Make sure all the metrics for this alternative subjective init path are added + log.Infow( + "subjective initialization finished", + "tail_height", + tail.Height(), + "head_height", + head.Height(), + ) case !tail.IsZero() && !s.isTailActual(tail): // Configured Tail has changed - get a new one and resolve the diff @@ -152,7 +157,7 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { switch { case currentTail.Height() > newTail.Height(): - log.Infow( + log.Infof( "tail header changed from %d to %d, syncing the diff...", currentTail, newTail, @@ -165,8 +170,8 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { return tail, fmt.Errorf("syncing the diff between old and new Tail: %w", err) } case currentTail.Height() < newTail.Height(): - log.Infow( - "Tail header changed from %d to %d, pruning the diff...", + log.Infof( + "tail header changed from %d to %d, pruning the diff...", currentTail, newTail, ) From d37c42be43f7523b2d16c15f64cadcc3aa9e72ae Mon Sep 17 00:00:00 2001 From: Wondertan Date: Thu, 8 May 2025 13:47:59 +0200 Subject: [PATCH 07/23] tests for reinit --- sync/sync_head_test.go | 42 +++++++++++++++++++++++++++++++++++------- sync/sync_store.go | 21 ++++++++++++++++++--- 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index bc62f7ba..95a8d849 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -29,9 +29,10 @@ func TestSyncer_TailInit(t *testing.T) { require.NoError(t, err) tests := []struct { - name string - option Option - expected func() *headertest.DummyHeader + name string + option Option + expected func() *headertest.DummyHeader + expectedAfterRestart func() *headertest.DummyHeader }{ { "Estimate", @@ -41,6 +42,11 @@ func TestSyncer_TailInit(t *testing.T) { require.NoError(t, err) return remoteTail }, + func() *headertest.DummyHeader { + remoteTail, err := remoteStore.Tail(ctx) + require.NoError(t, err) + return remoteTail + }, }, { "SyncFromHash", @@ -48,6 +54,11 @@ func TestSyncer_TailInit(t *testing.T) { func() *headertest.DummyHeader { return expectedTail }, + func() *headertest.DummyHeader { + expectedTail, err := remoteStore.GetByHeight(ctx, expectedTail.Height()+10) + require.NoError(t, err) + return expectedTail + }, }, { "SyncFromHeight", @@ -55,13 +66,18 @@ func TestSyncer_TailInit(t *testing.T) { func() *headertest.DummyHeader { return expectedTail }, + func() *headertest.DummyHeader { + expectedTail, err := remoteStore.GetByHeight(ctx, expectedTail.Height()-10) + require.NoError(t, err) + return expectedTail + }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) - localStore, err := store.NewStore[*headertest.DummyHeader](ds) + localStore, err := store.NewStore[*headertest.DummyHeader](ds, store.WithWriteBatchSize(1)) require.NoError(t, err) err = localStore.Start(ctx) require.NoError(t, err) @@ -77,18 +93,30 @@ func TestSyncer_TailInit(t *testing.T) { err = syncer.Start(ctx) require.NoError(t, err) - time.Sleep(time.Millisecond * 100) + // check that the syncer has the expected tail and head expectedTail := test.expected() - storeTail, err := localStore.Tail(ctx) require.NoError(t, err) assert.EqualValues(t, expectedTail.Height(), storeTail.Height()) - storeHead, err := localStore.Head(ctx) require.NoError(t, err) assert.EqualValues(t, remoteStore.Height(), storeHead.Height()) + + // restart the Syncer and set a new tail + err = syncer.Stop(ctx) + require.NoError(t, err) + expectedTail = test.expectedAfterRestart() + syncer.Params.SyncFromHeight = expectedTail.Height() + syncer.Params.SyncFromHash = expectedTail.Hash() + err = syncer.Start(ctx) + require.NoError(t, err) + + // ensure that the Syncer moved to the new tail after restart + storeTail, err = localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, expectedTail.Height(), storeTail.Height()) }) } } diff --git a/sync/sync_store.go b/sync/sync_store.go index fd838abc..62838dfc 100644 --- a/sync/sync_store.go +++ b/sync/sync_store.go @@ -48,11 +48,24 @@ func (s *syncStore[H]) Append(ctx context.Context, headers ...H) error { } head, err := s.Head(ctx) - if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, header.ErrEmptyStore) { + if errors.Is(err, header.ErrEmptyStore) { + // short-circuit for an initialization path + if err := s.Store.Append(ctx, headers...); err != nil { + return err + } + + s.head.Store(&headers[len(headers)-1]) + return nil + } + if err != nil { return err } - if !errors.Is(err, header.ErrEmptyStore) { + // TODO(@Wondertan): As store evolved, certain invariants it had were removed. + // However, Syncer has yet to be refactored to not assume those invariants and until then + // this method is a shim that allows using store with old assumptions. + // To be reworked by bsync. + if headers[0].Height() >= head.Height() { for _, h := range headers { if h.Height() != head.Height()+1 { return &errNonAdjacent{ @@ -60,14 +73,16 @@ func (s *syncStore[H]) Append(ctx context.Context, headers ...H) error { Attempted: h.Height(), } } + head = h } + + s.head.Store(&head) } if err := s.Store.Append(ctx, headers...); err != nil { return err } - s.head.Store(&headers[len(headers)-1]) return nil } From 70f65dea032db43d3a499e9d5a995b076906240f Mon Sep 17 00:00:00 2001 From: Wondertan Date: Thu, 8 May 2025 15:27:38 +0200 Subject: [PATCH 08/23] append missing tail manually --- sync/sync_head.go | 22 +++++++++++++++++++--- sync/sync_head_test.go | 7 ++++++- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/sync/sync_head.go b/sync/sync_head.go index 9830e2c6..3899db11 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -124,8 +124,8 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { // Configured Tail has changed - get a new one and resolve the diff currentTail, newTail := tail, tail - - if s.Params.SyncFromHash != nil { + switch { + case s.Params.SyncFromHash != nil: // check first locally if the new Tail exists newTail, err = s.store.Get(ctx, s.Params.SyncFromHash) if err != nil { @@ -138,8 +138,16 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { err, ) } + err = s.store.Append(ctx, newTail) + if err != nil { + return tail, fmt.Errorf( + "appending the new tail header(%d): %w", + newTail.Height(), + err, + ) + } } - } else if s.Params.SyncFromHeight != 0 { + case s.Params.SyncFromHeight != 0: // check first locally if the new Tail exists newTail, err = s.store.GetByHeight(ctx, s.Params.SyncFromHeight) if err != nil { @@ -152,6 +160,14 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { err, ) } + err = s.store.Append(ctx, newTail) + if err != nil { + return tail, fmt.Errorf( + "appending the new tail header(%d): %w", + newTail.Height(), + err, + ) + } } } diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 95a8d849..0785dcf5 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -77,7 +77,10 @@ func TestSyncer_TailInit(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) - localStore, err := store.NewStore[*headertest.DummyHeader](ds, store.WithWriteBatchSize(1)) + localStore, err := store.NewStore[*headertest.DummyHeader]( + ds, + store.WithWriteBatchSize(1), + ) require.NoError(t, err) err = localStore.Start(ctx) require.NoError(t, err) @@ -113,6 +116,8 @@ func TestSyncer_TailInit(t *testing.T) { err = syncer.Start(ctx) require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + // ensure that the Syncer moved to the new tail after restart storeTail, err = localStore.Tail(ctx) require.NoError(t, err) From df446ed67f03964906a468d64701a3041b0f0548 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Fri, 9 May 2025 19:25:19 +0200 Subject: [PATCH 09/23] major simplificussy --- sync/sync.go | 6 +- sync/sync_head.go | 238 ++++++----------------------------------- sync/sync_head_test.go | 110 ------------------- sync/sync_store.go | 4 + sync/sync_tail.go | 156 +++++++++++++++++++++++++++ sync/sync_tail_test.go | 162 ++++++++++++++++++++++++++++ sync/sync_test.go | 4 +- 7 files changed, 360 insertions(+), 320 deletions(-) create mode 100644 sync/sync_tail.go create mode 100644 sync/sync_tail_test.go diff --git a/sync/sync.go b/sync/sync.go index ee151bb5..2e28efe7 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -98,11 +98,6 @@ func (s *Syncer[H]) Start(ctx context.Context) error { if err != nil { return err } - // gets the latest tail and then head, kicking off syncing if necessary - _, err = s.Tail(ctx) - if err != nil { - return fmt.Errorf("error getting tail during Start: %w", err) - } _, err = s.Head(ctx) if err != nil { return fmt.Errorf("error getting latest head during Start: %w", err) @@ -115,6 +110,7 @@ func (s *Syncer[H]) Start(ctx context.Context) error { // Stop stops Syncer. func (s *Syncer[H]) Stop(context.Context) error { s.cancel() + s.store.Reset() return s.metrics.Close() } diff --git a/sync/sync_head.go b/sync/sync_head.go index 3899db11..995a4356 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -1,7 +1,6 @@ package sync import ( - "bytes" "context" "errors" "fmt" @@ -57,175 +56,6 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err return s.subjectiveHead(ctx) } -// Tail returns the current Tail header. -// -// If the underlying header store is not initialized/empty, it lazily performs subjective initialization -// based on either estimated or preconfigured Tail. -// -// If the preconfigured Tail(SyncFromHash/Header) has changed upon Syncer restarts, it lazily sets the new Tail -// and resolves the difference. -func (s *Syncer[H]) Tail(ctx context.Context) (H, error) { - tail, err := s.store.Tail(ctx) - switch { - case errors.Is(err, header.ErrEmptyStore): - // Store is empty, likely the first start - initialize. - log.Info("empty store, initializing...") - // TODO(@Wondertan): Copying the initialization logic here instead of calling the general Head path. - // This is a temporary solution needed to ensure Tail is written to the store first before Head. - // To be reworked by bsync. - head, err := s.head.Head(ctx) - if err != nil { - return head, fmt.Errorf("requesting network head: %w", err) - } - s.metrics.subjectiveInitialization(s.ctx) - - switch { - case s.Params.SyncFromHash != nil: - tail, err = s.getter.Get(ctx, s.Params.SyncFromHash) - if err != nil { - return tail, fmt.Errorf( - "getting tail header by hash(%s): %w", - s.Params.SyncFromHash, - err, - ) - } - case s.Params.SyncFromHeight != 0: - tail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight) - if err != nil { - return tail, fmt.Errorf("getting tail header(%d): %w", s.Params.SyncFromHeight, err) - } - default: - tailHeight := estimateTail(head, s.Params.blockTime, s.Params.TrustingPeriod) - tail, err = s.getter.GetByHeight(ctx, tailHeight) - if err != nil { - return tail, fmt.Errorf("getting estimated tail header(%d): %w", tailHeight, err) - } - } - - err = s.store.Append(ctx, tail) - if err != nil { - return tail, fmt.Errorf("appending tail header: %w", err) - } - - err = s.incomingNetworkHead(ctx, head) - if err != nil { - return tail, fmt.Errorf("applying head from trusted peers: %w", err) - } - - log.Infow( - "subjective initialization finished", - "tail_height", - tail.Height(), - "head_height", - head.Height(), - ) - - case !tail.IsZero() && !s.isTailActual(tail): - // Configured Tail has changed - get a new one and resolve the diff - - currentTail, newTail := tail, tail - switch { - case s.Params.SyncFromHash != nil: - // check first locally if the new Tail exists - newTail, err = s.store.Get(ctx, s.Params.SyncFromHash) - if err != nil { - // if for whatever reason Tail is not available locally, request the new one from the network. - newTail, err = s.getter.Get(ctx, s.Params.SyncFromHash) - if err != nil { - return tail, fmt.Errorf( - "getting tail header by hash(%s): %w", - s.Params.SyncFromHash, - err, - ) - } - err = s.store.Append(ctx, newTail) - if err != nil { - return tail, fmt.Errorf( - "appending the new tail header(%d): %w", - newTail.Height(), - err, - ) - } - } - case s.Params.SyncFromHeight != 0: - // check first locally if the new Tail exists - newTail, err = s.store.GetByHeight(ctx, s.Params.SyncFromHeight) - if err != nil { - // if for whatever reason Tail is not available locally, request the new one from the network. - newTail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight) - if err != nil { - return tail, fmt.Errorf( - "getting tail header by hash(%s): %w", - s.Params.SyncFromHash, - err, - ) - } - err = s.store.Append(ctx, newTail) - if err != nil { - return tail, fmt.Errorf( - "appending the new tail header(%d): %w", - newTail.Height(), - err, - ) - } - } - } - - switch { - case currentTail.Height() > newTail.Height(): - log.Infof( - "tail header changed from %d to %d, syncing the diff...", - currentTail, - newTail, - ) - // TODO(@Wondertan): This works but it assumes this code is only run before syncing routine starts. - // If run after, it may race with other in prog syncs. - // To be reworked by bsync. - err := s.doSync(ctx, newTail, currentTail) - if err != nil { - return tail, fmt.Errorf("syncing the diff between old and new Tail: %w", err) - } - case currentTail.Height() < newTail.Height(): - log.Infof( - "tail header changed from %d to %d, pruning the diff...", - currentTail, - newTail, - ) - err := s.store.DeleteTo(ctx, newTail.Height()) - if err != nil { - return tail, fmt.Errorf( - "deleting headers up to newly configured Tail(%d): %w", - newTail.Height(), - err, - ) - } - default: - // equals case, must not happen - panic("currentTail == newTail") - } - - case err != nil: - return tail, err - } - - return tail, nil -} - -// isTailActual checks if the given tail is actual based on the sync parameters. -func (s *Syncer[H]) isTailActual(tail H) bool { - switch { - case s.Params.SyncFromHash == nil && s.Params.SyncFromHeight == 0: - // if both overrides are zero value, then we good with whatever tail there is - return true - case s.Params.SyncFromHash != nil && bytes.Equal(s.Params.SyncFromHash, tail.Hash()): - return true - case s.Params.SyncFromHeight != 0 && s.Params.SyncFromHeight == tail.Height(): - return true - default: - return false - } -} - // subjectiveHead returns the latest known local header that is not expired(within trusting period). // If the header is expired, it is retrieved from a trusted peer without validation; // in other words, an automatic subjective initialization is performed. @@ -242,34 +72,41 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { storeHead, err := s.store.Head(ctx) switch { case errors.Is(err, header.ErrEmptyStore): - log.Info("no stored head, initializing...") + log.Info("empty store, initializing...") + s.metrics.subjectiveInitialization(s.ctx) case !storeHead.IsZero() && isExpired(storeHead, s.Params.TrustingPeriod): log.Infow("stored head header expired", "height", storeHead.Height()) default: return storeHead, err } - - trustHead, err := s.head.Head(ctx) + // fetch a new head from trusted peers if not available locally + newHead, err := s.head.Head(ctx) if err != nil { - return trustHead, err + return newHead, err } - s.metrics.subjectiveInitialization(s.ctx) - // and set it as the new subjective head without validation, - // or, in other words, do 'automatic subjective initialization' - // NOTE: we avoid validation as the head expired to prevent possibility of the Long-Range Attack - s.setSubjectiveHead(ctx, trustHead) switch { - default: - log.Infow("subjective initialization finished", "height", trustHead.Height()) - return trustHead, nil - case isExpired(trustHead, s.Params.TrustingPeriod): - log.Warnw("subjective initialization with an expired header", "height", trustHead.Height()) - case !isRecent(trustHead, s.Params.blockTime, s.Params.recencyThreshold): - log.Warnw("subjective initialization with an old header", "height", trustHead.Height()) + case isExpired(newHead, s.Params.TrustingPeriod): + // forbid initializing off an expired header + err := fmt.Errorf("subjective initialization with an expired header(%d)", newHead.Height()) + log.Error(err, "\n trusted peers are out of sync") + s.metrics.trustedPeersOutOufSync(s.ctx) + return newHead, err + case !isRecent(newHead, s.Params.blockTime, s.Params.recencyThreshold): + log.Warnw("subjective initialization with not recent header", "height", newHead.Height()) + s.metrics.trustedPeersOutOufSync(s.ctx) } - log.Warn("trusted peer is out of sync") - s.metrics.trustedPeersOutOufSync(s.ctx) - return trustHead, nil + + // and set the fetched head as the new subjective head validating it against the tail + // or, in other words, do 'automatic subjective initialization' + err = s.incomingNetworkHead(ctx, newHead) + if err != nil { + err = fmt.Errorf("subjective initialization failed for head(%d): %w", newHead.Height(), err) + log.Error(err) + return newHead, err + } + + log.Infow("subjective initialization finished", "head", newHead.Height()) + return newHead, nil } // setSubjectiveHead takes already validated head and sets it as the new sync target. @@ -307,7 +144,15 @@ func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error { s.incomingMu.Lock() defer s.incomingMu.Unlock() - err := s.verify(ctx, head) + // TODO(@Wondertan): We need to ensure Tail is available before verification for subj init case and this is fine here + // however, check if that's ok to trigger Tail moves on network header that hasn't been verified yet + // To be reworked by bsync. + _, err := s.subjectiveTail(ctx, head) + if err != nil { + return err + } + + err = s.verify(ctx, head) if err != nil { return err } @@ -426,16 +271,3 @@ func isRecent[H header.Header[H]](header H, blockTime, recencyThreshold time.Dur } return time.Since(header.Time()) <= recencyThreshold } - -func estimateTail[H header.Header[H]]( - head H, - blockTime, trustingPeriod time.Duration, -) (height uint64) { - headersToRetain := uint64(trustingPeriod / blockTime) //nolint:gosec - - if headersToRetain >= head.Height() { - return 1 - } - tail := head.Height() - headersToRetain - return tail -} diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 0785dcf5..8d0dcf1c 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -8,124 +8,14 @@ import ( "testing" "time" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/celestiaorg/go-header" "github.com/celestiaorg/go-header/headertest" "github.com/celestiaorg/go-header/local" - "github.com/celestiaorg/go-header/store" ) -func TestSyncer_TailInit(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) - - expectedTail, err := remoteStore.GetByHeight(ctx, 69) - require.NoError(t, err) - - tests := []struct { - name string - option Option - expected func() *headertest.DummyHeader - expectedAfterRestart func() *headertest.DummyHeader - }{ - { - "Estimate", - func(p *Parameters) {}, // noop to trigger estimation, - func() *headertest.DummyHeader { - remoteTail, err := remoteStore.Tail(ctx) - require.NoError(t, err) - return remoteTail - }, - func() *headertest.DummyHeader { - remoteTail, err := remoteStore.Tail(ctx) - require.NoError(t, err) - return remoteTail - }, - }, - { - "SyncFromHash", - WithSyncFromHash(expectedTail.Hash()), - func() *headertest.DummyHeader { - return expectedTail - }, - func() *headertest.DummyHeader { - expectedTail, err := remoteStore.GetByHeight(ctx, expectedTail.Height()+10) - require.NoError(t, err) - return expectedTail - }, - }, - { - "SyncFromHeight", - WithSyncFromHeight(expectedTail.Height()), - func() *headertest.DummyHeader { - return expectedTail - }, - func() *headertest.DummyHeader { - expectedTail, err := remoteStore.GetByHeight(ctx, expectedTail.Height()-10) - require.NoError(t, err) - return expectedTail - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - localStore, err := store.NewStore[*headertest.DummyHeader]( - ds, - store.WithWriteBatchSize(1), - ) - require.NoError(t, err) - err = localStore.Start(ctx) - require.NoError(t, err) - - syncer, err := NewSyncer[*headertest.DummyHeader]( - remoteStore, - localStore, - headertest.NewDummySubscriber(), - WithBlockTime(time.Second*6), - test.option, - ) - require.NoError(t, err) - - err = syncer.Start(ctx) - require.NoError(t, err) - time.Sleep(time.Millisecond * 100) - - // check that the syncer has the expected tail and head - expectedTail := test.expected() - storeTail, err := localStore.Tail(ctx) - require.NoError(t, err) - assert.EqualValues(t, expectedTail.Height(), storeTail.Height()) - storeHead, err := localStore.Head(ctx) - require.NoError(t, err) - assert.EqualValues(t, remoteStore.Height(), storeHead.Height()) - - // restart the Syncer and set a new tail - err = syncer.Stop(ctx) - require.NoError(t, err) - expectedTail = test.expectedAfterRestart() - syncer.Params.SyncFromHeight = expectedTail.Height() - syncer.Params.SyncFromHash = expectedTail.Hash() - err = syncer.Start(ctx) - require.NoError(t, err) - - time.Sleep(time.Millisecond * 10) - - // ensure that the Syncer moved to the new tail after restart - storeTail, err = localStore.Tail(ctx) - require.NoError(t, err) - assert.EqualValues(t, expectedTail.Height(), storeTail.Height()) - }) - } -} - func TestSyncer_HeadConcurrencyError(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) diff --git a/sync/sync_store.go b/sync/sync_store.go index 62838dfc..dcac5d6a 100644 --- a/sync/sync_store.go +++ b/sync/sync_store.go @@ -28,6 +28,10 @@ type syncStore[H header.Header[H]] struct { head atomic.Pointer[H] } +func (s *syncStore[H]) Reset() { + s.head.Store(nil) +} + func (s *syncStore[H]) Head(ctx context.Context) (H, error) { if headPtr := s.head.Load(); headPtr != nil { return *headPtr, nil diff --git a/sync/sync_tail.go b/sync/sync_tail.go new file mode 100644 index 00000000..fd3b755a --- /dev/null +++ b/sync/sync_tail.go @@ -0,0 +1,156 @@ +package sync + +import ( + "bytes" + "context" + "errors" + "fmt" + "time" + + "github.com/celestiaorg/go-header" +) + +// TODO: +// * Flush + +// subjectiveTail returns the current Tail header. +// It ensures the Tail is actual and valid according to Parameters. +func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { + tail, err := s.store.Tail(ctx) + if err != nil && !errors.Is(err, header.ErrEmptyStore) { + return tail, err + } + + var fetched bool + if tailHash, outdated := s.isTailHashOutdated(tail); outdated { + tail, err = s.store.Get(ctx, tailHash) + if err != nil { + tail, err = s.getter.Get(ctx, tailHash) + if err != nil { + return tail, fmt.Errorf("getting SyncFromHash tail(%x): %w", tailHash, err) + } + fetched = true + } + } else if tailHeight, outdated := s.isTailHeightOutdated(tail); outdated { + // hack for the case with necessary tail in the future avoiding heightSub + storeCtx, cancel := context.WithTimeout(ctx, time.Second) + tail, err = s.store.GetByHeight(storeCtx, tailHeight) + cancel() + if err != nil { + tail, err = s.getter.GetByHeight(ctx, tailHeight) + if err != nil { + return tail, fmt.Errorf("getting SyncFromHeight tail(%d): %w", tailHeight, err) + } + fetched = true + } + } else if tailHash == nil && tailHeight == 0 { + if tail.IsZero() { + // no previously known Tail available - estimate solely on Head + estimatedHeight := estimateTail(head, s.Params.blockTime, s.Params.TrustingPeriod) + tail, err = s.getter.GetByHeight(ctx, estimatedHeight) + if err != nil { + return tail, fmt.Errorf("getting estimated tail(%d): %w", tailHeight, err) + } + fetched = true + } else { + // have a known Tail - estimate basing on it. + cutoffTime := head.Time().UTC().Add(-s.Params.TrustingPeriod) + diff := cutoffTime.Sub(tail.Time().UTC()) + if diff <= 0 { + // current tail is relevant as is + return tail, nil + } + + toDeleteEstimate := uint64(diff / s.Params.blockTime) + estimatedNewTail := tail.Height() + toDeleteEstimate + + for { + tail, err = s.store.GetByHeight(ctx, estimatedNewTail) + if err != nil { + log.Errorw("getting estimated tail from store ", "error", err) + return tail, err + } + if tail.Time().UTC().Before(cutoffTime) { + break + } + + estimatedNewTail++ + } + } + } + + if fetched { + if err := s.store.Append(ctx, tail); err != nil { + return tail, fmt.Errorf("appending tail header: %w", err) + } + + time.Sleep(time.Millisecond * 1000) + // TODO: Flush + } + + if err := s.moveTail(ctx, tail); err != nil { + return tail, fmt.Errorf("moving tail: %w", err) + } + + return tail, nil +} + +func (s *Syncer[H]) moveTail(ctx context.Context, new H) error { + old, err := s.store.Tail(ctx) + if errors.Is(err, header.ErrEmptyStore) { + return nil + } + if err != nil { + return err + } + + switch { + case old.Height() < new.Height(): + log.Infof("move tail up from %d to %d, pruning the diff...", old, new) + err := s.store.DeleteTo(ctx, new.Height()) + if err != nil { + return fmt.Errorf( + "deleting headers up to newly configured Tail(%d): %w", + new.Height(), + err, + ) + } + case old.Height() > new.Height(): + log.Infof("move tail down from %d to %d, syncing the diff...", old, new) + + // TODO(@Wondertan): This works but it assumes this code is only run before syncing routine starts. + // If run after, it may race with other in prog syncs. + // To be reworked by bsync. + err := s.doSync(ctx, new, old) + if err != nil { + return fmt.Errorf("syncing the diff between old and new Tail: %w", err) + } + } + + return nil +} + +func estimateTail[H header.Header[H]]( + head H, + blockTime, trustingPeriod time.Duration, +) (height uint64) { + headersToRetain := uint64(trustingPeriod / blockTime) //nolint:gosec + + if headersToRetain >= head.Height() { + return 1 + } + tail := head.Height() - headersToRetain + return tail +} + +func (s *Syncer[H]) isTailHashOutdated(h H) (header.Hash, bool) { + wantHash := s.Params.SyncFromHash + outdated := wantHash != nil && (h.IsZero() || !bytes.Equal(wantHash, h.Hash())) + return wantHash, outdated +} + +func (s *Syncer[H]) isTailHeightOutdated(h H) (uint64, bool) { + wantHeight := s.Params.SyncFromHeight + outdated := wantHeight > 0 && (h.IsZero() || h.Height() != wantHeight) + return wantHeight, outdated +} diff --git a/sync/sync_tail_test.go b/sync/sync_tail_test.go new file mode 100644 index 00000000..0b912787 --- /dev/null +++ b/sync/sync_tail_test.go @@ -0,0 +1,162 @@ +package sync + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/go-header/headertest" + "github.com/celestiaorg/go-header/store" +) + +func TestSyncer_TailReconfiguration(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + localStore, err := store.NewStore[*headertest.DummyHeader]( + ds, + store.WithWriteBatchSize(1), + ) + require.NoError(t, err) + err = localStore.Start(ctx) + require.NoError(t, err) + + syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Second*6), + ) + require.NoError(t, err) + + err = syncer.Start(ctx) + require.NoError(t, err) + err = syncer.Stop(ctx) + require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + + syncer.Params.SyncFromHeight = 69 + + err = syncer.Start(ctx) + require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + + storeTail, err := localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, syncer.Params.SyncFromHeight, storeTail.Height()) +} + +func TestSyncer_TailInitialization(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) + + expectedTail, err := remoteStore.GetByHeight(ctx, 69) + require.NoError(t, err) + + tests := []struct { + name string + option Option + expected func() *headertest.DummyHeader + expectedAfterRestart func() *headertest.DummyHeader + }{ + { + "Estimate", + func(p *Parameters) {}, // noop to trigger estimation, + func() *headertest.DummyHeader { + remoteTail, err := remoteStore.Tail(ctx) + require.NoError(t, err) + return remoteTail + }, + func() *headertest.DummyHeader { + remoteTail, err := remoteStore.Tail(ctx) + require.NoError(t, err) + return remoteTail + }, + }, + { + "SyncFromHash", + WithSyncFromHash(expectedTail.Hash()), + func() *headertest.DummyHeader { + return expectedTail + }, + func() *headertest.DummyHeader { + expectedTail, err := remoteStore.GetByHeight(ctx, expectedTail.Height()+10) + require.NoError(t, err) + return expectedTail + }, + }, + { + "SyncFromHeight", + WithSyncFromHeight(expectedTail.Height()), + func() *headertest.DummyHeader { + return expectedTail + }, + func() *headertest.DummyHeader { + expectedTail, err := remoteStore.GetByHeight(ctx, expectedTail.Height()-10) + require.NoError(t, err) + return expectedTail + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + localStore, err := store.NewStore[*headertest.DummyHeader]( + ds, + store.WithWriteBatchSize(1), + ) + require.NoError(t, err) + err = localStore.Start(ctx) + require.NoError(t, err) + + syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Second*6), + test.option, + ) + require.NoError(t, err) + + err = syncer.Start(ctx) + require.NoError(t, err) + time.Sleep(time.Millisecond * 100) + + // check that the syncer has the expected tail and head + expectedTail := test.expected() + storeTail, err := localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, expectedTail.Height(), storeTail.Height()) + storeHead, err := localStore.Head(ctx) + require.NoError(t, err) + assert.EqualValues(t, remoteStore.Height(), storeHead.Height()) + + // restart the Syncer and set a new tail + err = syncer.Stop(ctx) + require.NoError(t, err) + expectedTail = test.expectedAfterRestart() + syncer.Params.SyncFromHeight = expectedTail.Height() + syncer.Params.SyncFromHash = expectedTail.Hash() + err = syncer.Start(ctx) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 10) + + // ensure that the Syncer moved to the new tail after restart + storeTail, err = localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, expectedTail.Height(), storeTail.Height()) + }) + } +} diff --git a/sync/sync_test.go b/sync/sync_test.go index 2945d978..06ac7eff 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -36,8 +36,8 @@ func TestSyncSimpleRequestingHead(t *testing.T) { localStore, headertest.NewDummySubscriber(), WithBlockTime(time.Second*30), - WithRecencyThreshold(time.Second*35), // add 5 second buffer - WithTrustingPeriod(time.Microsecond), + WithRecencyThreshold(time.Microsecond), + WithTrustingPeriod(time.Minute*1), ) require.NoError(t, err) err = syncer.Start(ctx) From 5d59427e2fcac941b3292c06fe5e9eee1f796dc9 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sun, 11 May 2025 03:20:52 +0200 Subject: [PATCH 10/23] comment improvements --- sync/sync_tail.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sync/sync_tail.go b/sync/sync_tail.go index fd3b755a..b1a29e96 100644 --- a/sync/sync_tail.go +++ b/sync/sync_tail.go @@ -14,7 +14,8 @@ import ( // * Flush // subjectiveTail returns the current Tail header. -// It ensures the Tail is actual and valid according to Parameters. +// Lazily fetching it if it doesn't exist locally or moving it to a different height. +// Moving is done if either parameters are changed or tail moved outside a pruning window. func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { tail, err := s.store.Tail(ctx) if err != nil && !errors.Is(err, header.ErrEmptyStore) { @@ -32,7 +33,7 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { fetched = true } } else if tailHeight, outdated := s.isTailHeightOutdated(tail); outdated { - // hack for the case with necessary tail in the future avoiding heightSub + // hack for the case with tailHeight > store.Height avoiding heightSub storeCtx, cancel := context.WithTimeout(ctx, time.Second) tail, err = s.store.GetByHeight(storeCtx, tailHeight) cancel() @@ -95,6 +96,9 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { return tail, nil } +// moveTail moves the Tail to be the given header. +// It will prune the store if the new Tail is higher than the old one or +// sync up if the new Tail is lower than the old one. func (s *Syncer[H]) moveTail(ctx context.Context, new H) error { old, err := s.store.Tail(ctx) if errors.Is(err, header.ErrEmptyStore) { From d83e15d5a1ee0f6725199b7b56ab4e0977bd2c68 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Mon, 12 May 2025 02:25:04 +0200 Subject: [PATCH 11/23] minor changes --- sync/sync_tail.go | 10 ++-------- sync/sync_tail_test.go | 2 ++ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/sync/sync_tail.go b/sync/sync_tail.go index b1a29e96..9cd591bb 100644 --- a/sync/sync_tail.go +++ b/sync/sync_tail.go @@ -10,9 +10,6 @@ import ( "github.com/celestiaorg/go-header" ) -// TODO: -// * Flush - // subjectiveTail returns the current Tail header. // Lazily fetching it if it doesn't exist locally or moving it to a different height. // Moving is done if either parameters are changed or tail moved outside a pruning window. @@ -84,9 +81,6 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { if err := s.store.Append(ctx, tail); err != nil { return tail, fmt.Errorf("appending tail header: %w", err) } - - time.Sleep(time.Millisecond * 1000) - // TODO: Flush } if err := s.moveTail(ctx, tail); err != nil { @@ -110,7 +104,7 @@ func (s *Syncer[H]) moveTail(ctx context.Context, new H) error { switch { case old.Height() < new.Height(): - log.Infof("move tail up from %d to %d, pruning the diff...", old, new) + log.Infof("move tail up from %d to %d, pruning the diff...", old.Height(), new.Height()) err := s.store.DeleteTo(ctx, new.Height()) if err != nil { return fmt.Errorf( @@ -120,7 +114,7 @@ func (s *Syncer[H]) moveTail(ctx context.Context, new H) error { ) } case old.Height() > new.Height(): - log.Infof("move tail down from %d to %d, syncing the diff...", old, new) + log.Infof("move tail down from %d to %d, syncing the diff...", old.Height(), new.Height()) // TODO(@Wondertan): This works but it assumes this code is only run before syncing routine starts. // If run after, it may race with other in prog syncs. diff --git a/sync/sync_tail_test.go b/sync/sync_tail_test.go index 0b912787..be093cd9 100644 --- a/sync/sync_tail_test.go +++ b/sync/sync_tail_test.go @@ -34,6 +34,7 @@ func TestSyncer_TailReconfiguration(t *testing.T) { localStore, headertest.NewDummySubscriber(), WithBlockTime(time.Second*6), + WithRecencyThreshold(time.Nanosecond), ) require.NoError(t, err) @@ -125,6 +126,7 @@ func TestSyncer_TailInitialization(t *testing.T) { localStore, headertest.NewDummySubscriber(), WithBlockTime(time.Second*6), + WithRecencyThreshold(time.Nanosecond), test.option, ) require.NoError(t, err) From cb71de039e44827e7a01189c1fd019383cc7dc6e Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 13 May 2025 23:40:56 +0200 Subject: [PATCH 12/23] add pruningwindow --- sync/options.go | 3 +++ sync/sync.go | 1 - sync/sync_head.go | 8 +++++++- sync/sync_store.go | 4 ---- sync/sync_tail.go | 16 ++++++++++------ sync/sync_tail_test.go | 4 +++- 6 files changed, 23 insertions(+), 13 deletions(-) diff --git a/sync/options.go b/sync/options.go index 0282bf90..26398f49 100644 --- a/sync/options.go +++ b/sync/options.go @@ -38,6 +38,8 @@ type Parameters struct { // // SyncFromHeight has lower priority than SyncFromHash. SyncFromHeight uint64 + // PruningWindow defines the duration within which headers will be retained before being pruned. + PruningWindow time.Duration // blockTime provides a reference point for the Syncer to determine // whether its subjective head is outdated. // Keeping it private to disable serialization for it. @@ -54,6 +56,7 @@ type Parameters struct { func DefaultParameters() Parameters { return Parameters{ TrustingPeriod: 336 * time.Hour, // tendermint's default trusting period + PruningWindow: 337 * time.Hour, } } diff --git a/sync/sync.go b/sync/sync.go index 2e28efe7..4c4ee500 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -110,7 +110,6 @@ func (s *Syncer[H]) Start(ctx context.Context) error { // Stop stops Syncer. func (s *Syncer[H]) Stop(context.Context) error { s.cancel() - s.store.Reset() return s.metrics.Close() } diff --git a/sync/sync_head.go b/sync/sync_head.go index 995a4356..1f4e02d4 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -50,7 +50,13 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err // NOTE: We could trust the netHead like we do during 'automatic subjective initialization' // but in this case our subjective head is not expired, so we should verify netHead // and only if it is valid, set it as new head - _ = s.incomingNetworkHead(ctx, netHead) + err = s.incomingNetworkHead(ctx, netHead) + if err != nil { + log.Errorw("incoming network head failed", + "height", netHead.Height(), + "hash", netHead.Hash().String(), + "err", err) + } // netHead was either accepted or rejected as the new subjective // anyway return most current known subjective head return s.subjectiveHead(ctx) diff --git a/sync/sync_store.go b/sync/sync_store.go index dcac5d6a..62838dfc 100644 --- a/sync/sync_store.go +++ b/sync/sync_store.go @@ -28,10 +28,6 @@ type syncStore[H header.Header[H]] struct { head atomic.Pointer[H] } -func (s *syncStore[H]) Reset() { - s.head.Store(nil) -} - func (s *syncStore[H]) Head(ctx context.Context) (H, error) { if headPtr := s.head.Load(); headPtr != nil { return *headPtr, nil diff --git a/sync/sync_tail.go b/sync/sync_tail.go index 9cd591bb..01c19bf1 100644 --- a/sync/sync_tail.go +++ b/sync/sync_tail.go @@ -10,6 +10,11 @@ import ( "github.com/celestiaorg/go-header" ) +// TODO: +// * Refactor tests +// * Write tests for estimation +// * Ensure sync always happen on start + // subjectiveTail returns the current Tail header. // Lazily fetching it if it doesn't exist locally or moving it to a different height. // Moving is done if either parameters are changed or tail moved outside a pruning window. @@ -30,11 +35,10 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { fetched = true } } else if tailHeight, outdated := s.isTailHeightOutdated(tail); outdated { - // hack for the case with tailHeight > store.Height avoiding heightSub - storeCtx, cancel := context.WithTimeout(ctx, time.Second) - tail, err = s.store.GetByHeight(storeCtx, tailHeight) - cancel() - if err != nil { + if tailHeight <= s.store.Height() { + tail, err = s.store.GetByHeight(ctx, tailHeight) + } + if err != nil || tailHeight != tail.Height() { tail, err = s.getter.GetByHeight(ctx, tailHeight) if err != nil { return tail, fmt.Errorf("getting SyncFromHeight tail(%d): %w", tailHeight, err) @@ -52,7 +56,7 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { fetched = true } else { // have a known Tail - estimate basing on it. - cutoffTime := head.Time().UTC().Add(-s.Params.TrustingPeriod) + cutoffTime := head.Time().UTC().Add(-s.Params.PruningWindow) diff := cutoffTime.Sub(tail.Time().UTC()) if diff <= 0 { // current tail is relevant as is diff --git a/sync/sync_tail_test.go b/sync/sync_tail_test.go index be093cd9..db90bc3d 100644 --- a/sync/sync_tail_test.go +++ b/sync/sync_tail_test.go @@ -40,6 +40,9 @@ func TestSyncer_TailReconfiguration(t *testing.T) { err = syncer.Start(ctx) require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + syncer.SyncWait(ctx) + err = syncer.Stop(ctx) require.NoError(t, err) time.Sleep(time.Millisecond * 10) @@ -48,7 +51,6 @@ func TestSyncer_TailReconfiguration(t *testing.T) { err = syncer.Start(ctx) require.NoError(t, err) - time.Sleep(time.Millisecond * 10) storeTail, err := localStore.Tail(ctx) require.NoError(t, err) From eb96e986755325bb1f12e0c62e6844fb2a6839eb Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 13 May 2025 23:48:51 +0200 Subject: [PATCH 13/23] lint --- sync/sync_tail.go | 24 ++++++++++++------------ sync/sync_tail_test.go | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/sync/sync_tail.go b/sync/sync_tail.go index 01c19bf1..0d29df0f 100644 --- a/sync/sync_tail.go +++ b/sync/sync_tail.go @@ -63,7 +63,7 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { return tail, nil } - toDeleteEstimate := uint64(diff / s.Params.blockTime) + toDeleteEstimate := uint64(diff / s.Params.blockTime) //nolint:gosec estimatedNewTail := tail.Height() + toDeleteEstimate for { @@ -97,8 +97,8 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { // moveTail moves the Tail to be the given header. // It will prune the store if the new Tail is higher than the old one or // sync up if the new Tail is lower than the old one. -func (s *Syncer[H]) moveTail(ctx context.Context, new H) error { - old, err := s.store.Tail(ctx) +func (s *Syncer[H]) moveTail(ctx context.Context, to H) error { + from, err := s.store.Tail(ctx) if errors.Is(err, header.ErrEmptyStore) { return nil } @@ -107,25 +107,25 @@ func (s *Syncer[H]) moveTail(ctx context.Context, new H) error { } switch { - case old.Height() < new.Height(): - log.Infof("move tail up from %d to %d, pruning the diff...", old.Height(), new.Height()) - err := s.store.DeleteTo(ctx, new.Height()) + case from.Height() < to.Height(): + log.Infof("move tail up from %d to %d, pruning the diff...", from.Height(), to.Height()) + err := s.store.DeleteTo(ctx, to.Height()) if err != nil { return fmt.Errorf( - "deleting headers up to newly configured Tail(%d): %w", - new.Height(), + "deleting headers up to newly configured tail(%d): %w", + to.Height(), err, ) } - case old.Height() > new.Height(): - log.Infof("move tail down from %d to %d, syncing the diff...", old.Height(), new.Height()) + case from.Height() > to.Height(): + log.Infof("move tail down from %d to %d, syncing the diff...", from.Height(), to.Height()) // TODO(@Wondertan): This works but it assumes this code is only run before syncing routine starts. // If run after, it may race with other in prog syncs. // To be reworked by bsync. - err := s.doSync(ctx, new, old) + err := s.doSync(ctx, to, from) if err != nil { - return fmt.Errorf("syncing the diff between old and new Tail: %w", err) + return fmt.Errorf("syncing the diff between from and new tail: %w", err) } } diff --git a/sync/sync_tail_test.go b/sync/sync_tail_test.go index db90bc3d..b4546c44 100644 --- a/sync/sync_tail_test.go +++ b/sync/sync_tail_test.go @@ -41,8 +41,8 @@ func TestSyncer_TailReconfiguration(t *testing.T) { err = syncer.Start(ctx) require.NoError(t, err) time.Sleep(time.Millisecond * 10) - syncer.SyncWait(ctx) - + err = syncer.SyncWait(ctx) + require.NoError(t, err) err = syncer.Stop(ctx) require.NoError(t, err) time.Sleep(time.Millisecond * 10) From 88e6f695588766afb81d4820438e676c718db18f Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 13 May 2025 23:50:13 +0200 Subject: [PATCH 14/23] add option --- sync/options.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sync/options.go b/sync/options.go index 26398f49..0a36c055 100644 --- a/sync/options.go +++ b/sync/options.go @@ -120,3 +120,10 @@ func WithSyncFromHeight(height uint64) Option { p.SyncFromHeight = height } } + +// WithPruningWindow sets the duration within which headers will be retained before being pruned. +func WithPruningWindow(window time.Duration) Option { + return func(p *Parameters) { + p.PruningWindow = window + } +} From a99d7ac98b1078b63119ecc2da97a84139a88ea2 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 14 May 2025 17:35:41 +0200 Subject: [PATCH 15/23] estimation test --- headertest/dummy_suite.go | 4 +++- sync/sync_head.go | 13 ++++++----- sync/sync_tail.go | 13 ++++++++--- sync/sync_tail_test.go | 45 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 64 insertions(+), 11 deletions(-) diff --git a/headertest/dummy_suite.go b/headertest/dummy_suite.go index 27a78c5e..1f63ce17 100644 --- a/headertest/dummy_suite.go +++ b/headertest/dummy_suite.go @@ -5,6 +5,8 @@ import ( "time" ) +const HeaderTime = time.Nanosecond + // DummySuite provides everything you need to test chain of DummyHeaders. // If not, please don't hesitate to extend it for your case. type DummySuite struct { @@ -42,7 +44,7 @@ func (s *DummySuite) NextHeader() *DummyHeader { } dh := RandDummyHeader(s.t) - dh.Timestamp = s.head.Time().Add(time.Nanosecond) + dh.Timestamp = s.head.Time().Add(HeaderTime) dh.HeightI = s.head.Height() + 1 dh.PreviousHash = s.head.Hash() dh.Chainid = s.head.ChainID() diff --git a/sync/sync_head.go b/sync/sync_head.go index 1f4e02d4..db8513a7 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -50,13 +50,7 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err // NOTE: We could trust the netHead like we do during 'automatic subjective initialization' // but in this case our subjective head is not expired, so we should verify netHead // and only if it is valid, set it as new head - err = s.incomingNetworkHead(ctx, netHead) - if err != nil { - log.Errorw("incoming network head failed", - "height", netHead.Height(), - "hash", netHead.Hash().String(), - "err", err) - } + _ = s.incomingNetworkHead(ctx, netHead) // netHead was either accepted or rejected as the new subjective // anyway return most current known subjective head return s.subjectiveHead(ctx) @@ -155,6 +149,11 @@ func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error { // To be reworked by bsync. _, err := s.subjectiveTail(ctx, head) if err != nil { + log.Errorw("subjective tail failed", + "new_head", head.Height(), + "hash", head.Hash().String(), + "err", err, + ) return err } diff --git a/sync/sync_tail.go b/sync/sync_tail.go index 0d29df0f..74bf3c69 100644 --- a/sync/sync_tail.go +++ b/sync/sync_tail.go @@ -12,7 +12,6 @@ import ( // TODO: // * Refactor tests -// * Write tests for estimation // * Ensure sync always happen on start // subjectiveTail returns the current Tail header. @@ -26,8 +25,10 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { var fetched bool if tailHash, outdated := s.isTailHashOutdated(tail); outdated { + log.Debugw("tail hash updated", "hash", tailHash) tail, err = s.store.Get(ctx, tailHash) if err != nil { + log.Debugw("tail hash not available locally, fetching...", "hash", tailHash) tail, err = s.getter.Get(ctx, tailHash) if err != nil { return tail, fmt.Errorf("getting SyncFromHash tail(%x): %w", tailHash, err) @@ -35,10 +36,12 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { fetched = true } } else if tailHeight, outdated := s.isTailHeightOutdated(tail); outdated { + log.Debugw("tail height updated", "height", tailHeight) if tailHeight <= s.store.Height() { tail, err = s.store.GetByHeight(ctx, tailHeight) } if err != nil || tailHeight != tail.Height() { + log.Debugw("tail height not available locally, fetching...", "height", tailHeight) tail, err = s.getter.GetByHeight(ctx, tailHeight) if err != nil { return tail, fmt.Errorf("getting SyncFromHeight tail(%d): %w", tailHeight, err) @@ -62,6 +65,7 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { // current tail is relevant as is return tail, nil } + log.Debugw("current tail is beyond pruning window", "current_height", tail.Height(), "diff", diff.String()) toDeleteEstimate := uint64(diff / s.Params.blockTime) //nolint:gosec estimatedNewTail := tail.Height() + toDeleteEstimate @@ -69,15 +73,18 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { for { tail, err = s.store.GetByHeight(ctx, estimatedNewTail) if err != nil { - log.Errorw("getting estimated tail from store ", "error", err) + log.Errorw("getting estimated tail from store", "height", estimatedNewTail, "error", err) return tail, err } - if tail.Time().UTC().Before(cutoffTime) { + if tail.Time().UTC().Compare(cutoffTime) <= 0 { + // tail before or equal to cutoffTime break } estimatedNewTail++ } + + log.Debugw("estimated new tail", "new_height", tail.Height()) } } diff --git a/sync/sync_tail_test.go b/sync/sync_tail_test.go index b4546c44..b4b00e57 100644 --- a/sync/sync_tail_test.go +++ b/sync/sync_tail_test.go @@ -14,6 +14,51 @@ import ( "github.com/celestiaorg/go-header/store" ) +func TestSyncer_TailEstimation(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + localStore, err := store.NewStore[*headertest.DummyHeader]( + ds, + store.WithWriteBatchSize(1), + ) + require.NoError(t, err) + err = localStore.Start(ctx) + require.NoError(t, err) + + syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(headertest.HeaderTime), + WithPruningWindow(time.Nanosecond*50), + ) + require.NoError(t, err) + + err = syncer.Start(ctx) + require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + err = syncer.SyncWait(ctx) + require.NoError(t, err) + require.EqualValues(t, 100, syncer.State().Height) + + tail, err := localStore.Tail(ctx) + require.NoError(t, err) + require.EqualValues(t, tail.Height(), 1) + + // simulate new header arrival by triggering recency check + head, err := syncer.Head(ctx) + require.NoError(t, err) + require.Equal(t, head.Height(), remoteStore.Height()) + + tail, err = localStore.Tail(ctx) + require.NoError(t, err) + require.EqualValues(t, 50, tail.Height()) +} + func TestSyncer_TailReconfiguration(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) From 630ffa3670bc0640250bdd4bfcb28e61118d45b8 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 14 May 2025 18:40:46 +0200 Subject: [PATCH 16/23] resolve potential rare but possible case that will annoy the user --- sync/sync_head.go | 11 +++++++++++ sync/sync_tail.go | 1 - 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/sync/sync_head.go b/sync/sync_head.go index db8513a7..7be1a525 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -27,6 +27,17 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err } // if subjective header is recent enough (relative to the network's block time) - just use it if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) { + // TODO(@Wondertan): This is temporary. Subjective tail is always triggered when Syncer is started + // unless we have a very recent head. Generally this is fine and the way it should work, however due to + // the way diff syncing works in moveTail method, we need to ensure that diff syncing happens on Start always + // which is triggered by this call. + // To be removed by bsync. + _, err = s.subjectiveTail(ctx, sbjHead) + if err != nil { + log.Errorw("getting subjective tail", "err", err) + return sbjHead, err + } + return sbjHead, nil } diff --git a/sync/sync_tail.go b/sync/sync_tail.go index 74bf3c69..b63b2270 100644 --- a/sync/sync_tail.go +++ b/sync/sync_tail.go @@ -12,7 +12,6 @@ import ( // TODO: // * Refactor tests -// * Ensure sync always happen on start // subjectiveTail returns the current Tail header. // Lazily fetching it if it doesn't exist locally or moving it to a different height. From 31d7e13ef6e84eeb95740742ccf24e5fd947c26d Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 14 May 2025 23:00:46 +0200 Subject: [PATCH 17/23] last refactor beautifcation --- sync/sync_tail.go | 238 +++++++++++++++++++++++++---------------- sync/sync_tail_test.go | 2 +- 2 files changed, 146 insertions(+), 94 deletions(-) diff --git a/sync/sync_tail.go b/sync/sync_tail.go index b63b2270..47bccdce 100644 --- a/sync/sync_tail.go +++ b/sync/sync_tail.go @@ -5,112 +5,100 @@ import ( "context" "errors" "fmt" - "time" "github.com/celestiaorg/go-header" ) -// TODO: -// * Refactor tests - -// subjectiveTail returns the current Tail header. +// subjectiveTail returns the current actual Tail header. // Lazily fetching it if it doesn't exist locally or moving it to a different height. // Moving is done if either parameters are changed or tail moved outside a pruning window. func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { - tail, err := s.store.Tail(ctx) + oldTail, err := s.store.Tail(ctx) if err != nil && !errors.Is(err, header.ErrEmptyStore) { - return tail, err + return oldTail, err + } + + newTail, err := s.updateTail(ctx, oldTail, head) + if err != nil { + return oldTail, fmt.Errorf("updating tail: %w", err) } - var fetched bool - if tailHash, outdated := s.isTailHashOutdated(tail); outdated { - log.Debugw("tail hash updated", "hash", tailHash) - tail, err = s.store.Get(ctx, tailHash) + if err := s.moveTail(ctx, oldTail, newTail); err != nil { + return oldTail, fmt.Errorf( + "moving tail from %d to %d: %w", + oldTail.Height(), + newTail.Height(), + err, + ) + } + + return newTail, nil +} + +// updateTail updates the tail header based on the Syncer parameters. +func (s *Syncer[H]) updateTail(ctx context.Context, oldTail, head H) (newTail H, err error) { + switch tailHash := s.tailHash(oldTail); tailHash { + case nil: + tailHeight, err := s.tailHeight(ctx, oldTail, head) if err != nil { - log.Debugw("tail hash not available locally, fetching...", "hash", tailHash) - tail, err = s.getter.Get(ctx, tailHash) - if err != nil { - return tail, fmt.Errorf("getting SyncFromHash tail(%x): %w", tailHash, err) - } - fetched = true + return oldTail, err } - } else if tailHeight, outdated := s.isTailHeightOutdated(tail); outdated { - log.Debugw("tail height updated", "height", tailHeight) + if tailHeight <= s.store.Height() { - tail, err = s.store.GetByHeight(ctx, tailHeight) - } - if err != nil || tailHeight != tail.Height() { - log.Debugw("tail height not available locally, fetching...", "height", tailHeight) - tail, err = s.getter.GetByHeight(ctx, tailHeight) - if err != nil { - return tail, fmt.Errorf("getting SyncFromHeight tail(%d): %w", tailHeight, err) - } - fetched = true - } - } else if tailHash == nil && tailHeight == 0 { - if tail.IsZero() { - // no previously known Tail available - estimate solely on Head - estimatedHeight := estimateTail(head, s.Params.blockTime, s.Params.TrustingPeriod) - tail, err = s.getter.GetByHeight(ctx, estimatedHeight) - if err != nil { - return tail, fmt.Errorf("getting estimated tail(%d): %w", tailHeight, err) + // check if the new tail is below the current head to avoid heightSub blocking + newTail, err = s.store.GetByHeight(ctx, tailHeight) + if err == nil { + return newTail, nil } - fetched = true - } else { - // have a known Tail - estimate basing on it. - cutoffTime := head.Time().UTC().Add(-s.Params.PruningWindow) - diff := cutoffTime.Sub(tail.Time().UTC()) - if diff <= 0 { - // current tail is relevant as is - return tail, nil - } - log.Debugw("current tail is beyond pruning window", "current_height", tail.Height(), "diff", diff.String()) - - toDeleteEstimate := uint64(diff / s.Params.blockTime) //nolint:gosec - estimatedNewTail := tail.Height() + toDeleteEstimate - - for { - tail, err = s.store.GetByHeight(ctx, estimatedNewTail) - if err != nil { - log.Errorw("getting estimated tail from store", "height", estimatedNewTail, "error", err) - return tail, err - } - if tail.Time().UTC().Compare(cutoffTime) <= 0 { - // tail before or equal to cutoffTime - break - } - - estimatedNewTail++ + if !errors.Is(err, header.ErrNotFound) { + return newTail, fmt.Errorf( + "loading SyncFromHeight tail from store(%d): %w", + tailHeight, + err, + ) } + } - log.Debugw("estimated new tail", "new_height", tail.Height()) + log.Debugw("tail height not available locally, fetching...", "height", tailHeight) + newTail, err = s.getter.GetByHeight(ctx, tailHeight) + if err != nil { + return newTail, fmt.Errorf("fetching SyncFromHeight tail(%d): %w", tailHeight, err) + } + default: + newTail, err = s.store.Get(ctx, tailHash) + if err == nil { + return newTail, nil + } + if !errors.Is(err, header.ErrNotFound) { + return newTail, fmt.Errorf( + "loading SyncFromHash tail from store(%x): %w", + tailHash, + err, + ) } - } - if fetched { - if err := s.store.Append(ctx, tail); err != nil { - return tail, fmt.Errorf("appending tail header: %w", err) + log.Debugw("tail hash not available locally, fetching...", "hash", tailHash) + newTail, err = s.getter.Get(ctx, tailHash) + if err != nil { + return newTail, fmt.Errorf("fetching SyncFromHash tail(%x): %w", tailHash, err) } } - if err := s.moveTail(ctx, tail); err != nil { - return tail, fmt.Errorf("moving tail: %w", err) + if err := s.store.Append(ctx, newTail); err != nil { + return newTail, fmt.Errorf("appending tail header: %w", err) } - return tail, nil + return newTail, nil } // moveTail moves the Tail to be the given header. // It will prune the store if the new Tail is higher than the old one or -// sync up if the new Tail is lower than the old one. -func (s *Syncer[H]) moveTail(ctx context.Context, to H) error { - from, err := s.store.Tail(ctx) - if errors.Is(err, header.ErrEmptyStore) { +// sync up the difference if the new Tail is lower than the old one. +func (s *Syncer[H]) moveTail(ctx context.Context, from, to H) error { + if from.IsZero() { + // no need to move the tail if it was not set previously return nil } - if err != nil { - return err - } switch { case from.Height() < to.Height(): @@ -138,27 +126,91 @@ func (s *Syncer[H]) moveTail(ctx context.Context, to H) error { return nil } -func estimateTail[H header.Header[H]]( - head H, - blockTime, trustingPeriod time.Duration, -) (height uint64) { - headersToRetain := uint64(trustingPeriod / blockTime) //nolint:gosec +// tailHash returns the expected tail hash. +// Does not return if the hash hasn't changed from the current tail hash. +func (s *Syncer[H]) tailHash(oldTail H) header.Hash { + hash := s.Params.SyncFromHash + if hash == nil { + return nil + } + updated := oldTail.IsZero() || !bytes.Equal(hash, oldTail.Hash()) + if !updated { + return nil + } + + log.Debugw("tail hash updated", "hash", hash) + return hash +} + +// tailHeight figures the actual tail height based on the Syncer parameters. +func (s *Syncer[H]) tailHeight(ctx context.Context, oldTail, head H) (uint64, error) { + height := s.Params.SyncFromHeight + if height > 0 { + return height, nil + } + + if oldTail.IsZero() { + return s.estimateTailHeader(head), nil + } + + height, err := s.findTailHeight(ctx, oldTail, head) + if err != nil { + return 0, fmt.Errorf("estimating oldTail height: %w", err) + } + + return height, nil +} + +// estimateTailHeader estimates the tail header based on the current head. +// It respects the trusting period, ensuring Syncer never initializes off an expired header. +func (s *Syncer[H]) estimateTailHeader(head H) uint64 { + headersToRetain := uint64(s.Params.TrustingPeriod / s.Params.blockTime) //nolint:gosec if headersToRetain >= head.Height() { + // means chain is very young so we can keep all headers starting from genesis return 1 } - tail := head.Height() - headersToRetain - return tail -} -func (s *Syncer[H]) isTailHashOutdated(h H) (header.Hash, bool) { - wantHash := s.Params.SyncFromHash - outdated := wantHash != nil && (h.IsZero() || !bytes.Equal(wantHash, h.Hash())) - return wantHash, outdated + return head.Height() - headersToRetain } -func (s *Syncer[H]) isTailHeightOutdated(h H) (uint64, bool) { - wantHeight := s.Params.SyncFromHeight - outdated := wantHeight > 0 && (h.IsZero() || h.Height() != wantHeight) - return wantHeight, outdated +// findTailHeight find the tail height based on the current head and tail. +// It respects the pruning window, ensuring Syncer maintains the tail within the window. +func (s *Syncer[H]) findTailHeight(ctx context.Context, oldTail, head H) (uint64, error) { + expectedTailTime := head.Time().UTC().Add(-s.Params.PruningWindow) + currentTailTime := oldTail.Time().UTC() + + timeDiff := expectedTailTime.Sub(currentTailTime) + if timeDiff <= 0 { + // current tail is relevant as is + return oldTail.Height(), nil + } + log.Debugw( + "current tail is beyond pruning window", + "tail_height", oldTail.Height(), + "time_diff", timeDiff.String(), + "window", s.Params.PruningWindow.String(), + ) + + heightDiff := uint64(timeDiff / s.Params.blockTime) //nolint:gosec + newTailHeight := oldTail.Height() + heightDiff + for { + newTail, err := s.store.GetByHeight(ctx, newTailHeight) + if err != nil { + return 0, fmt.Errorf( + "getting estimated new tail(%d) from store: %w", + newTailHeight, + err, + ) + } + if newTail.Time().UTC().Compare(expectedTailTime) <= 0 { + // oldTail before or equal to expectedTailTime + break + } + + newTailHeight++ + } + + log.Debugw("estimated new tail", "new_height", oldTail.Height()) + return newTailHeight, nil } diff --git a/sync/sync_tail_test.go b/sync/sync_tail_test.go index b4b00e57..73e31599 100644 --- a/sync/sync_tail_test.go +++ b/sync/sync_tail_test.go @@ -15,7 +15,7 @@ import ( ) func TestSyncer_TailEstimation(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*50) t.Cleanup(cancel) remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) From 661308178d5782aa6ab80a3307a4766b0b90b5a7 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 14 May 2025 23:17:21 +0200 Subject: [PATCH 18/23] minor findings --- sync/sync_tail.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sync/sync_tail.go b/sync/sync_tail.go index 47bccdce..66461e5d 100644 --- a/sync/sync_tail.go +++ b/sync/sync_tail.go @@ -18,7 +18,7 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { return oldTail, err } - newTail, err := s.updateTail(ctx, oldTail, head) + newTail, err := s.renewTail(ctx, oldTail, head) if err != nil { return oldTail, fmt.Errorf("updating tail: %w", err) } @@ -35,8 +35,8 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { return newTail, nil } -// updateTail updates the tail header based on the Syncer parameters. -func (s *Syncer[H]) updateTail(ctx context.Context, oldTail, head H) (newTail H, err error) { +// renewTail resolves the new actual tail header respecting Syncer parameters. +func (s *Syncer[H]) renewTail(ctx context.Context, oldTail, head H) (newTail H, err error) { switch tailHash := s.tailHash(oldTail); tailHash { case nil: tailHeight, err := s.tailHeight(ctx, oldTail, head) @@ -91,7 +91,7 @@ func (s *Syncer[H]) updateTail(ctx context.Context, oldTail, head H) (newTail H, return newTail, nil } -// moveTail moves the Tail to be the given header. +// moveTail moves the Tail to be the 'to' header. // It will prune the store if the new Tail is higher than the old one or // sync up the difference if the new Tail is lower than the old one. func (s *Syncer[H]) moveTail(ctx context.Context, from, to H) error { @@ -204,13 +204,13 @@ func (s *Syncer[H]) findTailHeight(ctx context.Context, oldTail, head H) (uint64 ) } if newTail.Time().UTC().Compare(expectedTailTime) <= 0 { - // oldTail before or equal to expectedTailTime + // new tail time is before or equal to expectedTailTime break } newTailHeight++ } - log.Debugw("estimated new tail", "new_height", oldTail.Height()) + log.Debugw("found new tail height", "height", newTailHeight) return newTailHeight, nil } From 8dec4516e9ca3add2d7dcb3952876bd89e8c7d2b Mon Sep 17 00:00:00 2001 From: Wondertan Date: Fri, 16 May 2025 18:13:59 +0200 Subject: [PATCH 19/23] minor fixes --- sync/sync.go | 1 + sync/sync_tail.go | 13 ++++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/sync/sync.go b/sync/sync.go index 4c4ee500..dd3bf451 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -98,6 +98,7 @@ func (s *Syncer[H]) Start(ctx context.Context) error { if err != nil { return err } + // gets the latest head and kicks off syncing if necessary _, err = s.Head(ctx) if err != nil { return fmt.Errorf("error getting latest head during Start: %w", err) diff --git a/sync/sync_tail.go b/sync/sync_tail.go index 66461e5d..08251e9c 100644 --- a/sync/sync_tail.go +++ b/sync/sync_tail.go @@ -37,6 +37,7 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { // renewTail resolves the new actual tail header respecting Syncer parameters. func (s *Syncer[H]) renewTail(ctx context.Context, oldTail, head H) (newTail H, err error) { + // prioritizing hash over heights switch tailHash := s.tailHash(oldTail); tailHash { case nil: tailHeight, err := s.tailHeight(ctx, oldTail, head) @@ -119,7 +120,7 @@ func (s *Syncer[H]) moveTail(ctx context.Context, from, to H) error { // To be reworked by bsync. err := s.doSync(ctx, to, from) if err != nil { - return fmt.Errorf("syncing the diff between from and new tail: %w", err) + return fmt.Errorf("syncing the diff between from(%d) and to tail(%d): %w", from.Height(), to.Height(), err) } } @@ -151,20 +152,20 @@ func (s *Syncer[H]) tailHeight(ctx context.Context, oldTail, head H) (uint64, er } if oldTail.IsZero() { - return s.estimateTailHeader(head), nil + return s.estimateTailHeight(head), nil } height, err := s.findTailHeight(ctx, oldTail, head) if err != nil { - return 0, fmt.Errorf("estimating oldTail height: %w", err) + return 0, fmt.Errorf("finding tail height: %w", err) } return height, nil } -// estimateTailHeader estimates the tail header based on the current head. +// estimateTailHeight estimates the tail header based on the current head. // It respects the trusting period, ensuring Syncer never initializes off an expired header. -func (s *Syncer[H]) estimateTailHeader(head H) uint64 { +func (s *Syncer[H]) estimateTailHeight(head H) uint64 { headersToRetain := uint64(s.Params.TrustingPeriod / s.Params.blockTime) //nolint:gosec if headersToRetain >= head.Height() { // means chain is very young so we can keep all headers starting from genesis @@ -195,6 +196,8 @@ func (s *Syncer[H]) findTailHeight(ctx context.Context, oldTail, head H) (uint64 heightDiff := uint64(timeDiff / s.Params.blockTime) //nolint:gosec newTailHeight := oldTail.Height() + heightDiff for { + // store keeps all the headers up to the current head + // to iterate over the headers and find the most accurate tail newTail, err := s.store.GetByHeight(ctx, newTailHeight) if err != nil { return 0, fmt.Errorf( From 29315539ecfc86035447bdf963223924bcafd258 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 20 May 2025 14:44:43 +0200 Subject: [PATCH 20/23] add test and clarify docs --- sync/options.go | 14 +++++------- sync/sync_tail_test.go | 51 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/sync/options.go b/sync/options.go index 0a36c055..51483329 100644 --- a/sync/options.go +++ b/sync/options.go @@ -22,24 +22,22 @@ type Parameters struct { // needed to report and punish misbehavior should be less than the unbonding // period. TrustingPeriod time.Duration + // PruningWindow defines the duration within which headers are retained before being pruned. + PruningWindow time.Duration // SyncFromHash is the hash of the header from which the syncer should start syncing. // - // By default, the syncer will start syncing from Tail, height of which is identified by the - // network head time minus TrustingPeriod. SyncFromHash overrides this default, allowing - // user to specify a custom starting point. + // By default, Syncer maintains PruningWindow number of headers. SyncFromHash overrides this default, + // allowing any user to specify a custom starting point. // // SyncFromHash has higher priority than SyncFromHeight. SyncFromHash header.Hash // SyncFromHeight is the height of the header from which the syncer should start syncing. // - // By default, the syncer will start syncing from Tail, height of which is identified by the - // network head time minus TrustingPeriod. SyncFromHeight overrides this default, allowing - // user to specify a custom starting point. + // By default, Syncer maintains PruningWindow number of headers. SyncFromHeight overrides this default, + // allowing any user to specify a custom starting point. // // SyncFromHeight has lower priority than SyncFromHash. SyncFromHeight uint64 - // PruningWindow defines the duration within which headers will be retained before being pruned. - PruningWindow time.Duration // blockTime provides a reference point for the Syncer to determine // whether its subjective head is outdated. // Keeping it private to disable serialization for it. diff --git a/sync/sync_tail_test.go b/sync/sync_tail_test.go index 73e31599..d50d3b26 100644 --- a/sync/sync_tail_test.go +++ b/sync/sync_tail_test.go @@ -14,6 +14,57 @@ import ( "github.com/celestiaorg/go-header/store" ) +func TestSyncer_TailHashOverHeight(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + localStore, err := store.NewStore[*headertest.DummyHeader]( + ds, + store.WithWriteBatchSize(1), + ) + require.NoError(t, err) + err = localStore.Start(ctx) + require.NoError(t, err) + + startFrom, err := remoteStore.GetByHeight(ctx, 50) + require.NoError(t, err) + + syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(headertest.HeaderTime), + WithSyncFromHash(startFrom.Hash()), + ) + require.NoError(t, err) + + err = syncer.Start(ctx) + require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + err = syncer.SyncWait(ctx) + require.NoError(t, err) + + tail, err := localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, 50, tail.Height()) + + err = syncer.Stop(ctx) + require.NoError(t, err) + + syncer.Params.SyncFromHeight = 99 + + err = syncer.Start(ctx) + require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + + tail, err = localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, 50, tail.Height()) +} + func TestSyncer_TailEstimation(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*50) t.Cleanup(cancel) From 0b91ae8c3b93e35789c61230c4e02df0d39803be Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 20 May 2025 14:47:31 +0200 Subject: [PATCH 21/23] fix tail hash logic --- sync/sync_tail.go | 66 +++++++++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/sync/sync_tail.go b/sync/sync_tail.go index 08251e9c..89c9f942 100644 --- a/sync/sync_tail.go +++ b/sync/sync_tail.go @@ -37,9 +37,32 @@ func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { // renewTail resolves the new actual tail header respecting Syncer parameters. func (s *Syncer[H]) renewTail(ctx context.Context, oldTail, head H) (newTail H, err error) { - // prioritizing hash over heights - switch tailHash := s.tailHash(oldTail); tailHash { - case nil: + useHash, tailHash := s.tailHash(oldTail) + switch { + case useHash: + if tailHash == nil { + // nothing to renew, stick to the existing old tail hash + return oldTail, nil + } + + newTail, err = s.store.Get(ctx, tailHash) + if err == nil { + return newTail, nil + } + if !errors.Is(err, header.ErrNotFound) { + return newTail, fmt.Errorf( + "loading SyncFromHash tail from store(%x): %w", + tailHash, + err, + ) + } + + log.Debugw("tail hash not available locally, fetching...", "hash", tailHash) + newTail, err = s.getter.Get(ctx, tailHash) + if err != nil { + return newTail, fmt.Errorf("fetching SyncFromHash tail(%x): %w", tailHash, err) + } + case !useHash: tailHeight, err := s.tailHeight(ctx, oldTail, head) if err != nil { return oldTail, err @@ -65,24 +88,6 @@ func (s *Syncer[H]) renewTail(ctx context.Context, oldTail, head H) (newTail H, if err != nil { return newTail, fmt.Errorf("fetching SyncFromHeight tail(%d): %w", tailHeight, err) } - default: - newTail, err = s.store.Get(ctx, tailHash) - if err == nil { - return newTail, nil - } - if !errors.Is(err, header.ErrNotFound) { - return newTail, fmt.Errorf( - "loading SyncFromHash tail from store(%x): %w", - tailHash, - err, - ) - } - - log.Debugw("tail hash not available locally, fetching...", "hash", tailHash) - newTail, err = s.getter.Get(ctx, tailHash) - if err != nil { - return newTail, fmt.Errorf("fetching SyncFromHash tail(%x): %w", tailHash, err) - } } if err := s.store.Append(ctx, newTail); err != nil { @@ -120,28 +125,33 @@ func (s *Syncer[H]) moveTail(ctx context.Context, from, to H) error { // To be reworked by bsync. err := s.doSync(ctx, to, from) if err != nil { - return fmt.Errorf("syncing the diff between from(%d) and to tail(%d): %w", from.Height(), to.Height(), err) + return fmt.Errorf( + "syncing the diff between from(%d) and to tail(%d): %w", + from.Height(), + to.Height(), + err, + ) } } return nil } -// tailHash returns the expected tail hash. -// Does not return if the hash hasn't changed from the current tail hash. -func (s *Syncer[H]) tailHash(oldTail H) header.Hash { +// tailHash reports whether tail hash should be used and returns it. +// Returns empty hash if it hasn't changed from the old tail hash. +func (s *Syncer[H]) tailHash(oldTail H) (bool, header.Hash) { hash := s.Params.SyncFromHash if hash == nil { - return nil + return false, nil } updated := oldTail.IsZero() || !bytes.Equal(hash, oldTail.Hash()) if !updated { - return nil + return true, nil } log.Debugw("tail hash updated", "hash", hash) - return hash + return true, hash } // tailHeight figures the actual tail height based on the Syncer parameters. From b413137d696d6cff512f2841b0d590cf7566f696 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 20 May 2025 19:20:35 +0200 Subject: [PATCH 22/23] better file name --- sync/{sync_tail.go => syncer_tail.go} | 0 sync/{sync_tail_test.go => syncer_tail_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename sync/{sync_tail.go => syncer_tail.go} (100%) rename sync/{sync_tail_test.go => syncer_tail_test.go} (100%) diff --git a/sync/sync_tail.go b/sync/syncer_tail.go similarity index 100% rename from sync/sync_tail.go rename to sync/syncer_tail.go diff --git a/sync/sync_tail_test.go b/sync/syncer_tail_test.go similarity index 100% rename from sync/sync_tail_test.go rename to sync/syncer_tail_test.go From 86cd2fbff533052841f565e3eacb9ca546ab1d87 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 21 May 2025 06:19:22 +0200 Subject: [PATCH 23/23] fix security concern by never calling subjective tail before verifying gossiped headers --- sync/sync.go | 12 ++++++++++- sync/sync_head.go | 51 ++++++++++++++++++++--------------------------- 2 files changed, 33 insertions(+), 30 deletions(-) diff --git a/sync/sync.go b/sync/sync.go index dd3bf451..6e6de8f8 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -94,7 +94,17 @@ func (s *Syncer[H]) Start(ctx context.Context) error { s.ctx, s.cancel = context.WithCancel(context.Background()) // register validator for header subscriptions // syncer does not subscribe itself and syncs headers together with validation - err := s.sub.SetVerifier(s.incomingNetworkHead) + err := s.sub.SetVerifier(func(ctx context.Context, h H) error { + if err := s.incomingNetworkHead(ctx, h); err != nil { + return err + } + // lazily trigger pruning by getting subjective tail + if _, err := s.subjectiveTail(ctx, h); err != nil { + log.Errorw("subjective tail", "head", h.Height(), "err", err) + } + + return nil + }) if err != nil { return err } diff --git a/sync/sync_head.go b/sync/sync_head.go index 7be1a525..594e9ee6 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -13,7 +13,8 @@ import ( // the exchange to request the head of the chain from the network. var headRequestTimeout = time.Second * 2 -// Head returns the Network Head. +// Head returns the Network Head or an error. It will try to get the most recent header until it fails entirely. +// It may return an error with a header which caused it. // // Known subjective head is considered network head if it is recent enough(now-timestamp<=blocktime) // Otherwise, we attempt to request recent network head from a trusted peer and @@ -25,19 +26,15 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err if err != nil { return sbjHead, err } - // if subjective header is recent enough (relative to the network's block time) - just use it - if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) { - // TODO(@Wondertan): This is temporary. Subjective tail is always triggered when Syncer is started - // unless we have a very recent head. Generally this is fine and the way it should work, however due to - // the way diff syncing works in moveTail method, we need to ensure that diff syncing happens on Start always - // which is triggered by this call. - // To be removed by bsync. + defer func() { + // always ensure tail is up to date _, err = s.subjectiveTail(ctx, sbjHead) if err != nil { - log.Errorw("getting subjective tail", "err", err) - return sbjHead, err + log.Errorw("subjective tail", "head_height", sbjHead.Height(), "err", err) } - + }() + // if subjective header is recent enough (relative to the network's block time) - just use it + if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) { return sbjHead, nil } @@ -70,7 +67,7 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err // subjectiveHead returns the latest known local header that is not expired(within trusting period). // If the header is expired, it is retrieved from a trusted peer without validation; // in other words, an automatic subjective initialization is performed. -func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { +func (s *Syncer[H]) subjectiveHead(ctx context.Context) (sbjHead H, err error) { // pending head is the latest known subjective head and sync target, so try to get it // NOTES: // * Empty when no sync is in progress @@ -103,10 +100,20 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { s.metrics.trustedPeersOutOufSync(s.ctx) return newHead, err case !isRecent(newHead, s.Params.blockTime, s.Params.recencyThreshold): + // it's not the most recent, buts its good enough - allow initialization log.Warnw("subjective initialization with not recent header", "height", newHead.Height()) s.metrics.trustedPeersOutOufSync(s.ctx) } + _, err = s.subjectiveTail(ctx, newHead) + if err != nil { + return newHead, fmt.Errorf( + "subjective tail during subjective initialization for head %d: %w", + newHead.Height(), + err, + ) + } + // and set the fetched head as the new subjective head validating it against the tail // or, in other words, do 'automatic subjective initialization' err = s.incomingNetworkHead(ctx, newHead) @@ -155,33 +162,19 @@ func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error { s.incomingMu.Lock() defer s.incomingMu.Unlock() - // TODO(@Wondertan): We need to ensure Tail is available before verification for subj init case and this is fine here - // however, check if that's ok to trigger Tail moves on network header that hasn't been verified yet - // To be reworked by bsync. - _, err := s.subjectiveTail(ctx, head) - if err != nil { - log.Errorw("subjective tail failed", - "new_head", head.Height(), - "hash", head.Hash().String(), - "err", err, - ) - return err - } - - err = s.verify(ctx, head) - if err != nil { + if err := s.verify(ctx, head); err != nil { return err } s.setSubjectiveHead(ctx, head) - return err + return nil } // verify verifies given network head candidate. func (s *Syncer[H]) verify(ctx context.Context, newHead H) error { sbjHead, err := s.subjectiveHead(ctx) if err != nil { - log.Errorw("getting subjective head during validation", "err", err) + log.Errorw("getting subjective head during new network head verification", "err", err) return err }