diff --git a/headertest/dummy_suite.go b/headertest/dummy_suite.go index 27a78c5e..1f63ce17 100644 --- a/headertest/dummy_suite.go +++ b/headertest/dummy_suite.go @@ -5,6 +5,8 @@ import ( "time" ) +const HeaderTime = time.Nanosecond + // DummySuite provides everything you need to test chain of DummyHeaders. // If not, please don't hesitate to extend it for your case. type DummySuite struct { @@ -42,7 +44,7 @@ func (s *DummySuite) NextHeader() *DummyHeader { } dh := RandDummyHeader(s.t) - dh.Timestamp = s.head.Time().Add(time.Nanosecond) + dh.Timestamp = s.head.Time().Add(HeaderTime) dh.HeightI = s.head.Height() + 1 dh.PreviousHash = s.head.Hash() dh.Chainid = s.head.ChainID() diff --git a/headertest/store.go b/headertest/store.go index 10444278..dbadba60 100644 --- a/headertest/store.go +++ b/headertest/store.go @@ -28,7 +28,8 @@ func NewDummyStore(t *testing.T) *Store[*DummyHeader] { func NewStore[H header.Header[H]](_ *testing.T, gen Generator[H], numHeaders int) *Store[H] { store := &Store[H]{ Headers: make(map[uint64]H), - HeadHeight: 0, + HeadHeight: 1, + TailHeight: 1, } for i := 0; i < numHeaders; i++ { diff --git a/sync/options.go b/sync/options.go index 62d06250..51483329 100644 --- a/sync/options.go +++ b/sync/options.go @@ -3,6 +3,8 @@ package sync import ( "fmt" "time" + + "github.com/celestiaorg/go-header" ) // Option is the functional option that is applied to the Syner instance @@ -20,6 +22,22 @@ type Parameters struct { // needed to report and punish misbehavior should be less than the unbonding // period. TrustingPeriod time.Duration + // PruningWindow defines the duration within which headers are retained before being pruned. + PruningWindow time.Duration + // SyncFromHash is the hash of the header from which the syncer should start syncing. + // + // By default, Syncer maintains PruningWindow number of headers. SyncFromHash overrides this default, + // allowing any user to specify a custom starting point. + // + // SyncFromHash has higher priority than SyncFromHeight. + SyncFromHash header.Hash + // SyncFromHeight is the height of the header from which the syncer should start syncing. + // + // By default, Syncer maintains PruningWindow number of headers. SyncFromHeight overrides this default, + // allowing any user to specify a custom starting point. + // + // SyncFromHeight has lower priority than SyncFromHash. + SyncFromHeight uint64 // blockTime provides a reference point for the Syncer to determine // whether its subjective head is outdated. // Keeping it private to disable serialization for it. @@ -36,6 +54,7 @@ type Parameters struct { func DefaultParameters() Parameters { return Parameters{ TrustingPeriod: 336 * time.Hour, // tendermint's default trusting period + PruningWindow: 337 * time.Hour, } } @@ -83,3 +102,26 @@ func WithParams(params Parameters) Option { *old = params } } + +// WithSyncFromHash sets given header hash a starting point for syncing. +// See [Parameters.SyncFromHash] for details. +func WithSyncFromHash(hash header.Hash) Option { + return func(p *Parameters) { + p.SyncFromHash = hash + } +} + +// WithSyncFromHeight sets given height a starting point for syncing. +// See [Parameters.SyncFromHeight] for details. +func WithSyncFromHeight(height uint64) Option { + return func(p *Parameters) { + p.SyncFromHeight = height + } +} + +// WithPruningWindow sets the duration within which headers will be retained before being pruned. +func WithPruningWindow(window time.Duration) Option { + return func(p *Parameters) { + p.PruningWindow = window + } +} diff --git a/sync/sync.go b/sync/sync.go index dd3bf451..6e6de8f8 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -94,7 +94,17 @@ func (s *Syncer[H]) Start(ctx context.Context) error { s.ctx, s.cancel = context.WithCancel(context.Background()) // register validator for header subscriptions // syncer does not subscribe itself and syncs headers together with validation - err := s.sub.SetVerifier(s.incomingNetworkHead) + err := s.sub.SetVerifier(func(ctx context.Context, h H) error { + if err := s.incomingNetworkHead(ctx, h); err != nil { + return err + } + // lazily trigger pruning by getting subjective tail + if _, err := s.subjectiveTail(ctx, h); err != nil { + log.Errorw("subjective tail", "head", h.Height(), "err", err) + } + + return nil + }) if err != nil { return err } diff --git a/sync/sync_head.go b/sync/sync_head.go index 3126d0d0..594e9ee6 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -13,7 +13,8 @@ import ( // the exchange to request the head of the chain from the network. var headRequestTimeout = time.Second * 2 -// Head returns the Network Head. +// Head returns the Network Head or an error. It will try to get the most recent header until it fails entirely. +// It may return an error with a header which caused it. // // Known subjective head is considered network head if it is recent enough(now-timestamp<=blocktime) // Otherwise, we attempt to request recent network head from a trusted peer and @@ -25,6 +26,13 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err if err != nil { return sbjHead, err } + defer func() { + // always ensure tail is up to date + _, err = s.subjectiveTail(ctx, sbjHead) + if err != nil { + log.Errorw("subjective tail", "head_height", sbjHead.Height(), "err", err) + } + }() // if subjective header is recent enough (relative to the network's block time) - just use it if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) { return sbjHead, nil @@ -59,7 +67,7 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err // subjectiveHead returns the latest known local header that is not expired(within trusting period). // If the header is expired, it is retrieved from a trusted peer without validation; // in other words, an automatic subjective initialization is performed. -func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { +func (s *Syncer[H]) subjectiveHead(ctx context.Context) (sbjHead H, err error) { // pending head is the latest known subjective head and sync target, so try to get it // NOTES: // * Empty when no sync is in progress @@ -70,37 +78,53 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { } // if pending is empty - get the latest stored/synced head storeHead, err := s.store.Head(ctx) - if err != nil { + switch { + case errors.Is(err, header.ErrEmptyStore): + log.Info("empty store, initializing...") + s.metrics.subjectiveInitialization(s.ctx) + case !storeHead.IsZero() && isExpired(storeHead, s.Params.TrustingPeriod): + log.Infow("stored head header expired", "height", storeHead.Height()) + default: return storeHead, err } - // check if the stored header is not expired and use it - if !isExpired(storeHead, s.Params.TrustingPeriod) { - return storeHead, nil + // fetch a new head from trusted peers if not available locally + newHead, err := s.head.Head(ctx) + if err != nil { + return newHead, err + } + switch { + case isExpired(newHead, s.Params.TrustingPeriod): + // forbid initializing off an expired header + err := fmt.Errorf("subjective initialization with an expired header(%d)", newHead.Height()) + log.Error(err, "\n trusted peers are out of sync") + s.metrics.trustedPeersOutOufSync(s.ctx) + return newHead, err + case !isRecent(newHead, s.Params.blockTime, s.Params.recencyThreshold): + // it's not the most recent, buts its good enough - allow initialization + log.Warnw("subjective initialization with not recent header", "height", newHead.Height()) + s.metrics.trustedPeersOutOufSync(s.ctx) } - // otherwise, request head from a trusted peer - log.Infow("stored head header expired", "height", storeHead.Height()) - trustHead, err := s.head.Head(ctx) + _, err = s.subjectiveTail(ctx, newHead) if err != nil { - return trustHead, err + return newHead, fmt.Errorf( + "subjective tail during subjective initialization for head %d: %w", + newHead.Height(), + err, + ) } - s.metrics.subjectiveInitialization(s.ctx) - // and set it as the new subjective head without validation, + + // and set the fetched head as the new subjective head validating it against the tail // or, in other words, do 'automatic subjective initialization' - // NOTE: we avoid validation as the head expired to prevent possibility of the Long-Range Attack - s.setSubjectiveHead(ctx, trustHead) - switch { - default: - log.Infow("subjective initialization finished", "height", trustHead.Height()) - return trustHead, nil - case isExpired(trustHead, s.Params.TrustingPeriod): - log.Warnw("subjective initialization with an expired header", "height", trustHead.Height()) - case !isRecent(trustHead, s.Params.blockTime, s.Params.recencyThreshold): - log.Warnw("subjective initialization with an old header", "height", trustHead.Height()) + err = s.incomingNetworkHead(ctx, newHead) + if err != nil { + err = fmt.Errorf("subjective initialization failed for head(%d): %w", newHead.Height(), err) + log.Error(err) + return newHead, err } - log.Warn("trusted peer is out of sync") - s.metrics.trustedPeersOutOufSync(s.ctx) - return trustHead, nil + + log.Infow("subjective initialization finished", "head", newHead.Height()) + return newHead, nil } // setSubjectiveHead takes already validated head and sets it as the new sync target. @@ -138,20 +162,19 @@ func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error { s.incomingMu.Lock() defer s.incomingMu.Unlock() - err := s.verify(ctx, head) - if err != nil { + if err := s.verify(ctx, head); err != nil { return err } s.setSubjectiveHead(ctx, head) - return err + return nil } // verify verifies given network head candidate. func (s *Syncer[H]) verify(ctx context.Context, newHead H) error { sbjHead, err := s.subjectiveHead(ctx) if err != nil { - log.Errorw("getting subjective head during validation", "err", err) + log.Errorw("getting subjective head during new network head verification", "err", err) return err } diff --git a/sync/sync_store.go b/sync/sync_store.go index 8254a6cb..62838dfc 100644 --- a/sync/sync_store.go +++ b/sync/sync_store.go @@ -48,24 +48,41 @@ func (s *syncStore[H]) Append(ctx context.Context, headers ...H) error { } head, err := s.Head(ctx) - if err != nil && !errors.Is(err, context.Canceled) { - panic(err) + if errors.Is(err, header.ErrEmptyStore) { + // short-circuit for an initialization path + if err := s.Store.Append(ctx, headers...); err != nil { + return err + } + + s.head.Store(&headers[len(headers)-1]) + return nil + } + if err != nil { + return err } - for _, h := range headers { - if h.Height() != head.Height()+1 { - return &errNonAdjacent{ - Head: head.Height(), - Attempted: h.Height(), + // TODO(@Wondertan): As store evolved, certain invariants it had were removed. + // However, Syncer has yet to be refactored to not assume those invariants and until then + // this method is a shim that allows using store with old assumptions. + // To be reworked by bsync. + if headers[0].Height() >= head.Height() { + for _, h := range headers { + if h.Height() != head.Height()+1 { + return &errNonAdjacent{ + Head: head.Height(), + Attempted: h.Height(), + } } + + head = h } - head = h + + s.head.Store(&head) } if err := s.Store.Append(ctx, headers...); err != nil { return err } - s.head.Store(&headers[len(headers)-1]) return nil } diff --git a/sync/sync_test.go b/sync/sync_test.go index 2945d978..06ac7eff 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -36,8 +36,8 @@ func TestSyncSimpleRequestingHead(t *testing.T) { localStore, headertest.NewDummySubscriber(), WithBlockTime(time.Second*30), - WithRecencyThreshold(time.Second*35), // add 5 second buffer - WithTrustingPeriod(time.Microsecond), + WithRecencyThreshold(time.Microsecond), + WithTrustingPeriod(time.Minute*1), ) require.NoError(t, err) err = syncer.Start(ctx) diff --git a/sync/syncer_tail.go b/sync/syncer_tail.go new file mode 100644 index 00000000..89c9f942 --- /dev/null +++ b/sync/syncer_tail.go @@ -0,0 +1,229 @@ +package sync + +import ( + "bytes" + "context" + "errors" + "fmt" + + "github.com/celestiaorg/go-header" +) + +// subjectiveTail returns the current actual Tail header. +// Lazily fetching it if it doesn't exist locally or moving it to a different height. +// Moving is done if either parameters are changed or tail moved outside a pruning window. +func (s *Syncer[H]) subjectiveTail(ctx context.Context, head H) (H, error) { + oldTail, err := s.store.Tail(ctx) + if err != nil && !errors.Is(err, header.ErrEmptyStore) { + return oldTail, err + } + + newTail, err := s.renewTail(ctx, oldTail, head) + if err != nil { + return oldTail, fmt.Errorf("updating tail: %w", err) + } + + if err := s.moveTail(ctx, oldTail, newTail); err != nil { + return oldTail, fmt.Errorf( + "moving tail from %d to %d: %w", + oldTail.Height(), + newTail.Height(), + err, + ) + } + + return newTail, nil +} + +// renewTail resolves the new actual tail header respecting Syncer parameters. +func (s *Syncer[H]) renewTail(ctx context.Context, oldTail, head H) (newTail H, err error) { + useHash, tailHash := s.tailHash(oldTail) + switch { + case useHash: + if tailHash == nil { + // nothing to renew, stick to the existing old tail hash + return oldTail, nil + } + + newTail, err = s.store.Get(ctx, tailHash) + if err == nil { + return newTail, nil + } + if !errors.Is(err, header.ErrNotFound) { + return newTail, fmt.Errorf( + "loading SyncFromHash tail from store(%x): %w", + tailHash, + err, + ) + } + + log.Debugw("tail hash not available locally, fetching...", "hash", tailHash) + newTail, err = s.getter.Get(ctx, tailHash) + if err != nil { + return newTail, fmt.Errorf("fetching SyncFromHash tail(%x): %w", tailHash, err) + } + case !useHash: + tailHeight, err := s.tailHeight(ctx, oldTail, head) + if err != nil { + return oldTail, err + } + + if tailHeight <= s.store.Height() { + // check if the new tail is below the current head to avoid heightSub blocking + newTail, err = s.store.GetByHeight(ctx, tailHeight) + if err == nil { + return newTail, nil + } + if !errors.Is(err, header.ErrNotFound) { + return newTail, fmt.Errorf( + "loading SyncFromHeight tail from store(%d): %w", + tailHeight, + err, + ) + } + } + + log.Debugw("tail height not available locally, fetching...", "height", tailHeight) + newTail, err = s.getter.GetByHeight(ctx, tailHeight) + if err != nil { + return newTail, fmt.Errorf("fetching SyncFromHeight tail(%d): %w", tailHeight, err) + } + } + + if err := s.store.Append(ctx, newTail); err != nil { + return newTail, fmt.Errorf("appending tail header: %w", err) + } + + return newTail, nil +} + +// moveTail moves the Tail to be the 'to' header. +// It will prune the store if the new Tail is higher than the old one or +// sync up the difference if the new Tail is lower than the old one. +func (s *Syncer[H]) moveTail(ctx context.Context, from, to H) error { + if from.IsZero() { + // no need to move the tail if it was not set previously + return nil + } + + switch { + case from.Height() < to.Height(): + log.Infof("move tail up from %d to %d, pruning the diff...", from.Height(), to.Height()) + err := s.store.DeleteTo(ctx, to.Height()) + if err != nil { + return fmt.Errorf( + "deleting headers up to newly configured tail(%d): %w", + to.Height(), + err, + ) + } + case from.Height() > to.Height(): + log.Infof("move tail down from %d to %d, syncing the diff...", from.Height(), to.Height()) + + // TODO(@Wondertan): This works but it assumes this code is only run before syncing routine starts. + // If run after, it may race with other in prog syncs. + // To be reworked by bsync. + err := s.doSync(ctx, to, from) + if err != nil { + return fmt.Errorf( + "syncing the diff between from(%d) and to tail(%d): %w", + from.Height(), + to.Height(), + err, + ) + } + } + + return nil +} + +// tailHash reports whether tail hash should be used and returns it. +// Returns empty hash if it hasn't changed from the old tail hash. +func (s *Syncer[H]) tailHash(oldTail H) (bool, header.Hash) { + hash := s.Params.SyncFromHash + if hash == nil { + return false, nil + } + + updated := oldTail.IsZero() || !bytes.Equal(hash, oldTail.Hash()) + if !updated { + return true, nil + } + + log.Debugw("tail hash updated", "hash", hash) + return true, hash +} + +// tailHeight figures the actual tail height based on the Syncer parameters. +func (s *Syncer[H]) tailHeight(ctx context.Context, oldTail, head H) (uint64, error) { + height := s.Params.SyncFromHeight + if height > 0 { + return height, nil + } + + if oldTail.IsZero() { + return s.estimateTailHeight(head), nil + } + + height, err := s.findTailHeight(ctx, oldTail, head) + if err != nil { + return 0, fmt.Errorf("finding tail height: %w", err) + } + + return height, nil +} + +// estimateTailHeight estimates the tail header based on the current head. +// It respects the trusting period, ensuring Syncer never initializes off an expired header. +func (s *Syncer[H]) estimateTailHeight(head H) uint64 { + headersToRetain := uint64(s.Params.TrustingPeriod / s.Params.blockTime) //nolint:gosec + if headersToRetain >= head.Height() { + // means chain is very young so we can keep all headers starting from genesis + return 1 + } + + return head.Height() - headersToRetain +} + +// findTailHeight find the tail height based on the current head and tail. +// It respects the pruning window, ensuring Syncer maintains the tail within the window. +func (s *Syncer[H]) findTailHeight(ctx context.Context, oldTail, head H) (uint64, error) { + expectedTailTime := head.Time().UTC().Add(-s.Params.PruningWindow) + currentTailTime := oldTail.Time().UTC() + + timeDiff := expectedTailTime.Sub(currentTailTime) + if timeDiff <= 0 { + // current tail is relevant as is + return oldTail.Height(), nil + } + log.Debugw( + "current tail is beyond pruning window", + "tail_height", oldTail.Height(), + "time_diff", timeDiff.String(), + "window", s.Params.PruningWindow.String(), + ) + + heightDiff := uint64(timeDiff / s.Params.blockTime) //nolint:gosec + newTailHeight := oldTail.Height() + heightDiff + for { + // store keeps all the headers up to the current head + // to iterate over the headers and find the most accurate tail + newTail, err := s.store.GetByHeight(ctx, newTailHeight) + if err != nil { + return 0, fmt.Errorf( + "getting estimated new tail(%d) from store: %w", + newTailHeight, + err, + ) + } + if newTail.Time().UTC().Compare(expectedTailTime) <= 0 { + // new tail time is before or equal to expectedTailTime + break + } + + newTailHeight++ + } + + log.Debugw("found new tail height", "height", newTailHeight) + return newTailHeight, nil +} diff --git a/sync/syncer_tail_test.go b/sync/syncer_tail_test.go new file mode 100644 index 00000000..d50d3b26 --- /dev/null +++ b/sync/syncer_tail_test.go @@ -0,0 +1,262 @@ +package sync + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/celestiaorg/go-header/headertest" + "github.com/celestiaorg/go-header/store" +) + +func TestSyncer_TailHashOverHeight(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + localStore, err := store.NewStore[*headertest.DummyHeader]( + ds, + store.WithWriteBatchSize(1), + ) + require.NoError(t, err) + err = localStore.Start(ctx) + require.NoError(t, err) + + startFrom, err := remoteStore.GetByHeight(ctx, 50) + require.NoError(t, err) + + syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(headertest.HeaderTime), + WithSyncFromHash(startFrom.Hash()), + ) + require.NoError(t, err) + + err = syncer.Start(ctx) + require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + err = syncer.SyncWait(ctx) + require.NoError(t, err) + + tail, err := localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, 50, tail.Height()) + + err = syncer.Stop(ctx) + require.NoError(t, err) + + syncer.Params.SyncFromHeight = 99 + + err = syncer.Start(ctx) + require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + + tail, err = localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, 50, tail.Height()) +} + +func TestSyncer_TailEstimation(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*50) + t.Cleanup(cancel) + + remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + localStore, err := store.NewStore[*headertest.DummyHeader]( + ds, + store.WithWriteBatchSize(1), + ) + require.NoError(t, err) + err = localStore.Start(ctx) + require.NoError(t, err) + + syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(headertest.HeaderTime), + WithPruningWindow(time.Nanosecond*50), + ) + require.NoError(t, err) + + err = syncer.Start(ctx) + require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + err = syncer.SyncWait(ctx) + require.NoError(t, err) + require.EqualValues(t, 100, syncer.State().Height) + + tail, err := localStore.Tail(ctx) + require.NoError(t, err) + require.EqualValues(t, tail.Height(), 1) + + // simulate new header arrival by triggering recency check + head, err := syncer.Head(ctx) + require.NoError(t, err) + require.Equal(t, head.Height(), remoteStore.Height()) + + tail, err = localStore.Tail(ctx) + require.NoError(t, err) + require.EqualValues(t, 50, tail.Height()) +} + +func TestSyncer_TailReconfiguration(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + localStore, err := store.NewStore[*headertest.DummyHeader]( + ds, + store.WithWriteBatchSize(1), + ) + require.NoError(t, err) + err = localStore.Start(ctx) + require.NoError(t, err) + + syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Second*6), + WithRecencyThreshold(time.Nanosecond), + ) + require.NoError(t, err) + + err = syncer.Start(ctx) + require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + err = syncer.SyncWait(ctx) + require.NoError(t, err) + err = syncer.Stop(ctx) + require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + + syncer.Params.SyncFromHeight = 69 + + err = syncer.Start(ctx) + require.NoError(t, err) + + storeTail, err := localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, syncer.Params.SyncFromHeight, storeTail.Height()) +} + +func TestSyncer_TailInitialization(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100) + + expectedTail, err := remoteStore.GetByHeight(ctx, 69) + require.NoError(t, err) + + tests := []struct { + name string + option Option + expected func() *headertest.DummyHeader + expectedAfterRestart func() *headertest.DummyHeader + }{ + { + "Estimate", + func(p *Parameters) {}, // noop to trigger estimation, + func() *headertest.DummyHeader { + remoteTail, err := remoteStore.Tail(ctx) + require.NoError(t, err) + return remoteTail + }, + func() *headertest.DummyHeader { + remoteTail, err := remoteStore.Tail(ctx) + require.NoError(t, err) + return remoteTail + }, + }, + { + "SyncFromHash", + WithSyncFromHash(expectedTail.Hash()), + func() *headertest.DummyHeader { + return expectedTail + }, + func() *headertest.DummyHeader { + expectedTail, err := remoteStore.GetByHeight(ctx, expectedTail.Height()+10) + require.NoError(t, err) + return expectedTail + }, + }, + { + "SyncFromHeight", + WithSyncFromHeight(expectedTail.Height()), + func() *headertest.DummyHeader { + return expectedTail + }, + func() *headertest.DummyHeader { + expectedTail, err := remoteStore.GetByHeight(ctx, expectedTail.Height()-10) + require.NoError(t, err) + return expectedTail + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + localStore, err := store.NewStore[*headertest.DummyHeader]( + ds, + store.WithWriteBatchSize(1), + ) + require.NoError(t, err) + err = localStore.Start(ctx) + require.NoError(t, err) + + syncer, err := NewSyncer[*headertest.DummyHeader]( + remoteStore, + localStore, + headertest.NewDummySubscriber(), + WithBlockTime(time.Second*6), + WithRecencyThreshold(time.Nanosecond), + test.option, + ) + require.NoError(t, err) + + err = syncer.Start(ctx) + require.NoError(t, err) + time.Sleep(time.Millisecond * 100) + + // check that the syncer has the expected tail and head + expectedTail := test.expected() + storeTail, err := localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, expectedTail.Height(), storeTail.Height()) + storeHead, err := localStore.Head(ctx) + require.NoError(t, err) + assert.EqualValues(t, remoteStore.Height(), storeHead.Height()) + + // restart the Syncer and set a new tail + err = syncer.Stop(ctx) + require.NoError(t, err) + expectedTail = test.expectedAfterRestart() + syncer.Params.SyncFromHeight = expectedTail.Height() + syncer.Params.SyncFromHash = expectedTail.Hash() + err = syncer.Start(ctx) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 10) + + // ensure that the Syncer moved to the new tail after restart + storeTail, err = localStore.Tail(ctx) + require.NoError(t, err) + assert.EqualValues(t, expectedTail.Height(), storeTail.Height()) + }) + } +}