Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion headertest/dummy_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"
)

const HeaderTime = time.Nanosecond

// DummySuite provides everything you need to test chain of DummyHeaders.
// If not, please don't hesitate to extend it for your case.
type DummySuite struct {
Expand Down Expand Up @@ -42,7 +44,7 @@ func (s *DummySuite) NextHeader() *DummyHeader {
}

dh := RandDummyHeader(s.t)
dh.Timestamp = s.head.Time().Add(time.Nanosecond)
dh.Timestamp = s.head.Time().Add(HeaderTime)
dh.HeightI = s.head.Height() + 1
dh.PreviousHash = s.head.Hash()
dh.Chainid = s.head.ChainID()
Expand Down
3 changes: 2 additions & 1 deletion headertest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func NewDummyStore(t *testing.T) *Store[*DummyHeader] {
func NewStore[H header.Header[H]](_ *testing.T, gen Generator[H], numHeaders int) *Store[H] {
store := &Store[H]{
Headers: make(map[uint64]H),
HeadHeight: 0,
HeadHeight: 1,
TailHeight: 1,
}

for i := 0; i < numHeaders; i++ {
Expand Down
44 changes: 44 additions & 0 deletions sync/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sync
import (
"fmt"
"time"

"github.com/celestiaorg/go-header"
)

// Option is the functional option that is applied to the Syner instance
Expand All @@ -20,6 +22,24 @@ type Parameters struct {
// needed to report and punish misbehavior should be less than the unbonding
// period.
TrustingPeriod time.Duration
// SyncFromHash is the hash of the header from which the syncer should start syncing.
//
// By default, the syncer will start syncing from Tail, height of which is identified by the
// network head time minus TrustingPeriod. SyncFromHash overrides this default, allowing
// user to specify a custom starting point.
//
// SyncFromHash has higher priority than SyncFromHeight.
SyncFromHash header.Hash
// SyncFromHeight is the height of the header from which the syncer should start syncing.
//
// By default, the syncer will start syncing from Tail, height of which is identified by the
// network head time minus TrustingPeriod. SyncFromHeight overrides this default, allowing
// user to specify a custom starting point.
//
// SyncFromHeight has lower priority than SyncFromHash.
SyncFromHeight uint64
// PruningWindow defines the duration within which headers will be retained before being pruned.
PruningWindow time.Duration
// blockTime provides a reference point for the Syncer to determine
// whether its subjective head is outdated.
// Keeping it private to disable serialization for it.
Expand All @@ -36,6 +56,7 @@ type Parameters struct {
func DefaultParameters() Parameters {
return Parameters{
TrustingPeriod: 336 * time.Hour, // tendermint's default trusting period
PruningWindow: 337 * time.Hour,
}
}

Expand Down Expand Up @@ -83,3 +104,26 @@ func WithParams(params Parameters) Option {
*old = params
}
}

// WithSyncFromHash sets given header hash a starting point for syncing.
// See [Parameters.SyncFromHash] for details.
func WithSyncFromHash(hash header.Hash) Option {
return func(p *Parameters) {
p.SyncFromHash = hash
}
}

// WithSyncFromHeight sets given height a starting point for syncing.
// See [Parameters.SyncFromHeight] for details.
func WithSyncFromHeight(height uint64) Option {
return func(p *Parameters) {
p.SyncFromHeight = height
}
}

// WithPruningWindow sets the duration within which headers will be retained before being pruned.
func WithPruningWindow(window time.Duration) Option {
return func(p *Parameters) {
p.PruningWindow = window
}
}
1 change: 0 additions & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func (s *Syncer[H]) Start(ctx context.Context) error {
if err != nil {
return err
}
// gets the latest head and kicks off syncing if necessary
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relic, will get it back

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

returned

_, err = s.Head(ctx)
if err != nil {
return fmt.Errorf("error getting latest head during Start: %w", err)
Expand Down
82 changes: 56 additions & 26 deletions sync/sync_head.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err
}
// if subjective header is recent enough (relative to the network's block time) - just use it
if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) {
// TODO(@Wondertan): This is temporary. Subjective tail is always triggered when Syncer is started
// unless we have a very recent head. Generally this is fine and the way it should work, however due to
// the way diff syncing works in moveTail method, we need to ensure that diff syncing happens on Start always
// which is triggered by this call.
// To be removed by bsync.
_, err = s.subjectiveTail(ctx, sbjHead)
if err != nil {
log.Errorw("getting subjective tail", "err", err)
return sbjHead, err
}

return sbjHead, nil
}

Expand Down Expand Up @@ -70,37 +81,43 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
}
// if pending is empty - get the latest stored/synced head
storeHead, err := s.store.Head(ctx)
if err != nil {
switch {
case errors.Is(err, header.ErrEmptyStore):
log.Info("empty store, initializing...")
s.metrics.subjectiveInitialization(s.ctx)
Comment on lines +82 to +84
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay so this will make it so that we do not have to manually initialise the store via nodebuilder in celestia-node in order to kick off header syncing - syncer will do it for us here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's was the purpose behind doing #243

case !storeHead.IsZero() && isExpired(storeHead, s.Params.TrustingPeriod):
log.Infow("stored head header expired", "height", storeHead.Height())
default:
return storeHead, err
}
// check if the stored header is not expired and use it
if !isExpired(storeHead, s.Params.TrustingPeriod) {
return storeHead, nil
}
// otherwise, request head from a trusted peer
log.Infow("stored head header expired", "height", storeHead.Height())

trustHead, err := s.head.Head(ctx)
// fetch a new head from trusted peers if not available locally
newHead, err := s.head.Head(ctx)
if err != nil {
return trustHead, err
return newHead, err
}
s.metrics.subjectiveInitialization(s.ctx)
// and set it as the new subjective head without validation,
// or, in other words, do 'automatic subjective initialization'
// NOTE: we avoid validation as the head expired to prevent possibility of the Long-Range Attack
s.setSubjectiveHead(ctx, trustHead)
switch {
default:
log.Infow("subjective initialization finished", "height", trustHead.Height())
return trustHead, nil
case isExpired(trustHead, s.Params.TrustingPeriod):
log.Warnw("subjective initialization with an expired header", "height", trustHead.Height())
case !isRecent(trustHead, s.Params.blockTime, s.Params.recencyThreshold):
log.Warnw("subjective initialization with an old header", "height", trustHead.Height())
case isExpired(newHead, s.Params.TrustingPeriod):
// forbid initializing off an expired header
err := fmt.Errorf("subjective initialization with an expired header(%d)", newHead.Height())
log.Error(err, "\n trusted peers are out of sync")
s.metrics.trustedPeersOutOufSync(s.ctx)
return newHead, err
case !isRecent(newHead, s.Params.blockTime, s.Params.recencyThreshold):
log.Warnw("subjective initialization with not recent header", "height", newHead.Height())
s.metrics.trustedPeersOutOufSync(s.ctx)
}
log.Warn("trusted peer is out of sync")
s.metrics.trustedPeersOutOufSync(s.ctx)
return trustHead, nil

// and set the fetched head as the new subjective head validating it against the tail
// or, in other words, do 'automatic subjective initialization'
err = s.incomingNetworkHead(ctx, newHead)
if err != nil {
err = fmt.Errorf("subjective initialization failed for head(%d): %w", newHead.Height(), err)
log.Error(err)
return newHead, err
}

log.Infow("subjective initialization finished", "head", newHead.Height())
return newHead, nil
}

// setSubjectiveHead takes already validated head and sets it as the new sync target.
Expand Down Expand Up @@ -138,7 +155,20 @@ func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error {
s.incomingMu.Lock()
defer s.incomingMu.Unlock()

err := s.verify(ctx, head)
// TODO(@Wondertan): We need to ensure Tail is available before verification for subj init case and this is fine here
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we perform a sanity check here on the incoming head somehow to ensure we do not get exorbitant height that could trigger a cut of the tail too significantly? (meaning one that is shorter than the pruning window)

// however, check if that's ok to trigger Tail moves on network header that hasn't been verified yet
// To be reworked by bsync.
_, err := s.subjectiveTail(ctx, head)
if err != nil {
log.Errorw("subjective tail failed",
"new_head", head.Height(),
"hash", head.Hash().String(),
"err", err,
)
return err
}

err = s.verify(ctx, head)
if err != nil {
return err
}
Expand Down
35 changes: 26 additions & 9 deletions sync/sync_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,41 @@ func (s *syncStore[H]) Append(ctx context.Context, headers ...H) error {
}

head, err := s.Head(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
panic(err)
if errors.Is(err, header.ErrEmptyStore) {
// short-circuit for an initialization path
if err := s.Store.Append(ctx, headers...); err != nil {
return err
}

s.head.Store(&headers[len(headers)-1])
return nil
}
if err != nil {
return err
}

for _, h := range headers {
if h.Height() != head.Height()+1 {
return &errNonAdjacent{
Head: head.Height(),
Attempted: h.Height(),
// TODO(@Wondertan): As store evolved, certain invariants it had were removed.
// However, Syncer has yet to be refactored to not assume those invariants and until then
// this method is a shim that allows using store with old assumptions.
// To be reworked by bsync.
if headers[0].Height() >= head.Height() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we extract into separate func here ensureAppendAdjacency

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, we could

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't see much value from doing this tbh, so ignoring

for _, h := range headers {
if h.Height() != head.Height()+1 {
return &errNonAdjacent{
Head: head.Height(),
Attempted: h.Height(),
}
}

head = h
}
head = h

s.head.Store(&head)
}

if err := s.Store.Append(ctx, headers...); err != nil {
return err
}

s.head.Store(&headers[len(headers)-1])
return nil
}
Loading