Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion headertest/dummy_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"
)

const HeaderTime = time.Nanosecond

// DummySuite provides everything you need to test chain of DummyHeaders.
// If not, please don't hesitate to extend it for your case.
type DummySuite struct {
Expand Down Expand Up @@ -42,7 +44,7 @@ func (s *DummySuite) NextHeader() *DummyHeader {
}

dh := RandDummyHeader(s.t)
dh.Timestamp = s.head.Time().Add(time.Nanosecond)
dh.Timestamp = s.head.Time().Add(HeaderTime)
dh.HeightI = s.head.Height() + 1
dh.PreviousHash = s.head.Hash()
dh.Chainid = s.head.ChainID()
Expand Down
3 changes: 2 additions & 1 deletion headertest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func NewDummyStore(t *testing.T) *Store[*DummyHeader] {
func NewStore[H header.Header[H]](_ *testing.T, gen Generator[H], numHeaders int) *Store[H] {
store := &Store[H]{
Headers: make(map[uint64]H),
HeadHeight: 0,
HeadHeight: 1,
TailHeight: 1,
}

for i := 0; i < numHeaders; i++ {
Expand Down
42 changes: 42 additions & 0 deletions sync/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sync
import (
"fmt"
"time"

"github.com/celestiaorg/go-header"
)

// Option is the functional option that is applied to the Syner instance
Expand All @@ -20,6 +22,22 @@ type Parameters struct {
// needed to report and punish misbehavior should be less than the unbonding
// period.
TrustingPeriod time.Duration
// PruningWindow defines the duration within which headers are retained before being pruned.
PruningWindow time.Duration
// SyncFromHash is the hash of the header from which the syncer should start syncing.
//
// By default, Syncer maintains PruningWindow number of headers. SyncFromHash overrides this default,
// allowing any user to specify a custom starting point.
//
// SyncFromHash has higher priority than SyncFromHeight.
SyncFromHash header.Hash
// SyncFromHeight is the height of the header from which the syncer should start syncing.
//
// By default, Syncer maintains PruningWindow number of headers. SyncFromHeight overrides this default,
// allowing any user to specify a custom starting point.
//
// SyncFromHeight has lower priority than SyncFromHash.
SyncFromHeight uint64
// blockTime provides a reference point for the Syncer to determine
// whether its subjective head is outdated.
// Keeping it private to disable serialization for it.
Expand All @@ -36,6 +54,7 @@ type Parameters struct {
func DefaultParameters() Parameters {
return Parameters{
TrustingPeriod: 336 * time.Hour, // tendermint's default trusting period
PruningWindow: 337 * time.Hour,
}
}

Expand Down Expand Up @@ -83,3 +102,26 @@ func WithParams(params Parameters) Option {
*old = params
}
}

// WithSyncFromHash sets given header hash a starting point for syncing.
// See [Parameters.SyncFromHash] for details.
func WithSyncFromHash(hash header.Hash) Option {
return func(p *Parameters) {
p.SyncFromHash = hash
}
}

// WithSyncFromHeight sets given height a starting point for syncing.
// See [Parameters.SyncFromHeight] for details.
func WithSyncFromHeight(height uint64) Option {
return func(p *Parameters) {
p.SyncFromHeight = height
}
}

// WithPruningWindow sets the duration within which headers will be retained before being pruned.
func WithPruningWindow(window time.Duration) Option {
return func(p *Parameters) {
p.PruningWindow = window
}
}
12 changes: 11 additions & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,17 @@ func (s *Syncer[H]) Start(ctx context.Context) error {
s.ctx, s.cancel = context.WithCancel(context.Background())
// register validator for header subscriptions
// syncer does not subscribe itself and syncs headers together with validation
err := s.sub.SetVerifier(s.incomingNetworkHead)
err := s.sub.SetVerifier(func(ctx context.Context, h H) error {
if err := s.incomingNetworkHead(ctx, h); err != nil {
return err
}
// lazily trigger pruning by getting subjective tail
if _, err := s.subjectiveTail(ctx, h); err != nil {
log.Errorw("subjective tail", "head", h.Height(), "err", err)
}

return nil
})
if err != nil {
return err
}
Expand Down
81 changes: 52 additions & 29 deletions sync/sync_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
// the exchange to request the head of the chain from the network.
var headRequestTimeout = time.Second * 2

// Head returns the Network Head.
// Head returns the Network Head or an error. It will try to get the most recent header until it fails entirely.
// It may return an error with a header which caused it.
//
// Known subjective head is considered network head if it is recent enough(now-timestamp<=blocktime)
// Otherwise, we attempt to request recent network head from a trusted peer and
Expand All @@ -25,6 +26,13 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err
if err != nil {
return sbjHead, err
}
defer func() {
// always ensure tail is up to date
_, err = s.subjectiveTail(ctx, sbjHead)
if err != nil {
log.Errorw("subjective tail", "head_height", sbjHead.Height(), "err", err)
}
}()
// if subjective header is recent enough (relative to the network's block time) - just use it
if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) {
return sbjHead, nil
Expand Down Expand Up @@ -59,7 +67,7 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err
// subjectiveHead returns the latest known local header that is not expired(within trusting period).
// If the header is expired, it is retrieved from a trusted peer without validation;
// in other words, an automatic subjective initialization is performed.
func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
func (s *Syncer[H]) subjectiveHead(ctx context.Context) (sbjHead H, err error) {
// pending head is the latest known subjective head and sync target, so try to get it
// NOTES:
// * Empty when no sync is in progress
Expand All @@ -70,37 +78,53 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
}
// if pending is empty - get the latest stored/synced head
storeHead, err := s.store.Head(ctx)
if err != nil {
switch {
case errors.Is(err, header.ErrEmptyStore):
log.Info("empty store, initializing...")
s.metrics.subjectiveInitialization(s.ctx)
Comment on lines +82 to +84
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay so this will make it so that we do not have to manually initialise the store via nodebuilder in celestia-node in order to kick off header syncing - syncer will do it for us here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's was the purpose behind doing #243

case !storeHead.IsZero() && isExpired(storeHead, s.Params.TrustingPeriod):
log.Infow("stored head header expired", "height", storeHead.Height())
default:
return storeHead, err
}
// check if the stored header is not expired and use it
if !isExpired(storeHead, s.Params.TrustingPeriod) {
return storeHead, nil
// fetch a new head from trusted peers if not available locally
newHead, err := s.head.Head(ctx)
if err != nil {
return newHead, err
}
switch {
case isExpired(newHead, s.Params.TrustingPeriod):
// forbid initializing off an expired header
err := fmt.Errorf("subjective initialization with an expired header(%d)", newHead.Height())
log.Error(err, "\n trusted peers are out of sync")
s.metrics.trustedPeersOutOufSync(s.ctx)
return newHead, err
case !isRecent(newHead, s.Params.blockTime, s.Params.recencyThreshold):
// it's not the most recent, buts its good enough - allow initialization
log.Warnw("subjective initialization with not recent header", "height", newHead.Height())
s.metrics.trustedPeersOutOufSync(s.ctx)
}
// otherwise, request head from a trusted peer
log.Infow("stored head header expired", "height", storeHead.Height())

trustHead, err := s.head.Head(ctx)
_, err = s.subjectiveTail(ctx, newHead)
if err != nil {
return trustHead, err
return newHead, fmt.Errorf(
"subjective tail during subjective initialization for head %d: %w",
newHead.Height(),
err,
)
}
s.metrics.subjectiveInitialization(s.ctx)
// and set it as the new subjective head without validation,

// and set the fetched head as the new subjective head validating it against the tail
// or, in other words, do 'automatic subjective initialization'
// NOTE: we avoid validation as the head expired to prevent possibility of the Long-Range Attack
s.setSubjectiveHead(ctx, trustHead)
switch {
default:
log.Infow("subjective initialization finished", "height", trustHead.Height())
return trustHead, nil
case isExpired(trustHead, s.Params.TrustingPeriod):
log.Warnw("subjective initialization with an expired header", "height", trustHead.Height())
case !isRecent(trustHead, s.Params.blockTime, s.Params.recencyThreshold):
log.Warnw("subjective initialization with an old header", "height", trustHead.Height())
err = s.incomingNetworkHead(ctx, newHead)
if err != nil {
err = fmt.Errorf("subjective initialization failed for head(%d): %w", newHead.Height(), err)
log.Error(err)
return newHead, err
}
log.Warn("trusted peer is out of sync")
s.metrics.trustedPeersOutOufSync(s.ctx)
return trustHead, nil

log.Infow("subjective initialization finished", "head", newHead.Height())
return newHead, nil
}

// setSubjectiveHead takes already validated head and sets it as the new sync target.
Expand Down Expand Up @@ -138,20 +162,19 @@ func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error {
s.incomingMu.Lock()
defer s.incomingMu.Unlock()

err := s.verify(ctx, head)
if err != nil {
if err := s.verify(ctx, head); err != nil {
return err
}

s.setSubjectiveHead(ctx, head)
return err
return nil
}

// verify verifies given network head candidate.
func (s *Syncer[H]) verify(ctx context.Context, newHead H) error {
sbjHead, err := s.subjectiveHead(ctx)
if err != nil {
log.Errorw("getting subjective head during validation", "err", err)
log.Errorw("getting subjective head during new network head verification", "err", err)
return err
}

Expand Down
35 changes: 26 additions & 9 deletions sync/sync_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,41 @@ func (s *syncStore[H]) Append(ctx context.Context, headers ...H) error {
}

head, err := s.Head(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
panic(err)
if errors.Is(err, header.ErrEmptyStore) {
// short-circuit for an initialization path
if err := s.Store.Append(ctx, headers...); err != nil {
return err
}

s.head.Store(&headers[len(headers)-1])
return nil
}
if err != nil {
return err
}

for _, h := range headers {
if h.Height() != head.Height()+1 {
return &errNonAdjacent{
Head: head.Height(),
Attempted: h.Height(),
// TODO(@Wondertan): As store evolved, certain invariants it had were removed.
// However, Syncer has yet to be refactored to not assume those invariants and until then
// this method is a shim that allows using store with old assumptions.
// To be reworked by bsync.
if headers[0].Height() >= head.Height() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we extract into separate func here ensureAppendAdjacency

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, we could

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't see much value from doing this tbh, so ignoring

for _, h := range headers {
if h.Height() != head.Height()+1 {
return &errNonAdjacent{
Head: head.Height(),
Attempted: h.Height(),
}
}

head = h
}
head = h

s.head.Store(&head)
}

if err := s.Store.Append(ctx, headers...); err != nil {
return err
}

s.head.Store(&headers[len(headers)-1])
return nil
}
4 changes: 2 additions & 2 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func TestSyncSimpleRequestingHead(t *testing.T) {
localStore,
headertest.NewDummySubscriber(),
WithBlockTime(time.Second*30),
WithRecencyThreshold(time.Second*35), // add 5 second buffer
WithTrustingPeriod(time.Microsecond),
WithRecencyThreshold(time.Microsecond),
WithTrustingPeriod(time.Minute*1),
)
require.NoError(t, err)
err = syncer.Start(ctx)
Expand Down
Loading