Skip to content

Commit 8f84b25

Browse files
committed
feat(sync): maintain tail (#265)
Adds a `Tail` method to `Syncer` that can initialize a node with a height-estimated header or with the configured height (or hash) as the new `Tail`. Besides, it gracefully handles reconfiguration of the height/hash upon restarts. ~This change deliberately leaves some tech debt behind. The new Tail method has quite a lot of branchiness and could be torn apart/rewritten to reduce nesting and improve readability. Some more specific debts are marked with a TODO. All of this is not "clean" and simply achieves the goal. The future backward sync will require us to refactor this logic anyway, so there is minimal value in cleaning things up now.~ Never mind, I ended up refactoring it for the sake of reviewers' sanity and code maintainability. There are still TODOs left for things that are literally blocked on bsync or something else, but everything else was uniformly integrated into the existing code. Depends on #283 and #285. Closes #258 and #259.
1 parent 9a62cf4 commit 8f84b25

File tree

9 files changed

+629
-43
lines changed

9 files changed

+629
-43
lines changed

headertest/dummy_suite.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"time"
66
)
77

8+
const HeaderTime = time.Nanosecond
9+
810
// DummySuite provides everything you need to test chain of DummyHeaders.
911
// If not, please don't hesitate to extend it for your case.
1012
type DummySuite struct {
@@ -42,7 +44,7 @@ func (s *DummySuite) NextHeader() *DummyHeader {
4244
}
4345

4446
dh := RandDummyHeader(s.t)
45-
dh.Timestamp = s.head.Time().Add(time.Nanosecond)
47+
dh.Timestamp = s.head.Time().Add(HeaderTime)
4648
dh.HeightI = s.head.Height() + 1
4749
dh.PreviousHash = s.head.Hash()
4850
dh.Chainid = s.head.ChainID()

headertest/store.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ func NewDummyStore(t *testing.T) *Store[*DummyHeader] {
2828
func NewStore[H header.Header[H]](_ *testing.T, gen Generator[H], numHeaders int) *Store[H] {
2929
store := &Store[H]{
3030
Headers: make(map[uint64]H),
31-
HeadHeight: 0,
31+
HeadHeight: 1,
32+
TailHeight: 1,
3233
}
3334

3435
for i := 0; i < numHeaders; i++ {

sync/options.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package sync
33
import (
44
"fmt"
55
"time"
6+
7+
"github.com/celestiaorg/go-header"
68
)
79

810
// Option is the functional option that is applied to the Syncer instance
@@ -20,6 +22,22 @@ type Parameters struct {
2022
// needed to report and punish misbehavior should be less than the unbonding
2123
// period.
2224
TrustingPeriod time.Duration
25+
// PruningWindow defines the duration within which headers are retained before being pruned.
26+
PruningWindow time.Duration
27+
// SyncFromHash is the hash of the header from which the syncer should start syncing.
28+
//
29+
// By default, Syncer maintains PruningWindow number of headers. SyncFromHash overrides this default,
30+
// allowing any user to specify a custom starting point.
31+
//
32+
// SyncFromHash has higher priority than SyncFromHeight.
33+
SyncFromHash header.Hash
34+
// SyncFromHeight is the height of the header from which the syncer should start syncing.
35+
//
36+
// By default, Syncer maintains PruningWindow number of headers. SyncFromHeight overrides this default,
37+
// allowing any user to specify a custom starting point.
38+
//
39+
// SyncFromHeight has lower priority than SyncFromHash.
40+
SyncFromHeight uint64
2341
// blockTime provides a reference point for the Syncer to determine
2442
// whether its subjective head is outdated.
2543
// Keeping it private to disable serialization for it.
@@ -36,6 +54,7 @@ type Parameters struct {
3654
func DefaultParameters() Parameters {
3755
return Parameters{
3856
TrustingPeriod: 336 * time.Hour, // tendermint's default trusting period
57+
PruningWindow: 337 * time.Hour,
3958
}
4059
}
4160

@@ -83,3 +102,26 @@ func WithParams(params Parameters) Option {
83102
*old = params
84103
}
85104
}
105+
106+
// WithSyncFromHash sets the given header hash as the starting point for syncing.
107+
// See [Parameters.SyncFromHash] for details.
108+
func WithSyncFromHash(hash header.Hash) Option {
109+
return func(p *Parameters) {
110+
p.SyncFromHash = hash
111+
}
112+
}
113+
114+
// WithSyncFromHeight sets the given height as the starting point for syncing.
115+
// See [Parameters.SyncFromHeight] for details.
116+
func WithSyncFromHeight(height uint64) Option {
117+
return func(p *Parameters) {
118+
p.SyncFromHeight = height
119+
}
120+
}
121+
122+
// WithPruningWindow sets the duration within which headers will be retained before being pruned.
123+
func WithPruningWindow(window time.Duration) Option {
124+
return func(p *Parameters) {
125+
p.PruningWindow = window
126+
}
127+
}

sync/sync.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,17 @@ func (s *Syncer[H]) Start(ctx context.Context) error {
9494
s.ctx, s.cancel = context.WithCancel(context.Background())
9595
// register validator for header subscriptions
9696
// syncer does not subscribe itself and syncs headers together with validation
97-
err := s.sub.SetVerifier(s.incomingNetworkHead)
97+
err := s.sub.SetVerifier(func(ctx context.Context, h H) error {
98+
if err := s.incomingNetworkHead(ctx, h); err != nil {
99+
return err
100+
}
101+
// lazily trigger pruning by getting subjective tail
102+
if _, err := s.subjectiveTail(ctx, h); err != nil {
103+
log.Errorw("subjective tail", "head", h.Height(), "err", err)
104+
}
105+
106+
return nil
107+
})
98108
if err != nil {
99109
return err
100110
}

sync/sync_head.go

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import (
1313
// the exchange to request the head of the chain from the network.
1414
var headRequestTimeout = time.Second * 2
1515

16-
// Head returns the Network Head.
16+
// Head returns the Network Head or an error. It will try to get the most recent header until it fails entirely.
17+
// It may return an error with a header which caused it.
1718
//
1819
// Known subjective head is considered network head if it is recent enough(now-timestamp<=blocktime)
1920
// Otherwise, we attempt to request recent network head from a trusted peer and
@@ -25,6 +26,13 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err
2526
if err != nil {
2627
return sbjHead, err
2728
}
29+
defer func() {
30+
// always ensure tail is up to date
31+
_, err = s.subjectiveTail(ctx, sbjHead)
32+
if err != nil {
33+
log.Errorw("subjective tail", "head_height", sbjHead.Height(), "err", err)
34+
}
35+
}()
2836
// if subjective header is recent enough (relative to the network's block time) - just use it
2937
if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) {
3038
return sbjHead, nil
@@ -59,7 +67,7 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err
5967
// subjectiveHead returns the latest known local header that is not expired(within trusting period).
6068
// If the header is expired, it is retrieved from a trusted peer without validation;
6169
// in other words, an automatic subjective initialization is performed.
62-
func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
70+
func (s *Syncer[H]) subjectiveHead(ctx context.Context) (sbjHead H, err error) {
6371
// pending head is the latest known subjective head and sync target, so try to get it
6472
// NOTES:
6573
// * Empty when no sync is in progress
@@ -70,37 +78,53 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
7078
}
7179
// if pending is empty - get the latest stored/synced head
7280
storeHead, err := s.store.Head(ctx)
73-
if err != nil {
81+
switch {
82+
case errors.Is(err, header.ErrEmptyStore):
83+
log.Info("empty store, initializing...")
84+
s.metrics.subjectiveInitialization(s.ctx)
85+
case !storeHead.IsZero() && isExpired(storeHead, s.Params.TrustingPeriod):
86+
log.Infow("stored head header expired", "height", storeHead.Height())
87+
default:
7488
return storeHead, err
7589
}
76-
// check if the stored header is not expired and use it
77-
if !isExpired(storeHead, s.Params.TrustingPeriod) {
78-
return storeHead, nil
90+
// fetch a new head from trusted peers if not available locally
91+
newHead, err := s.head.Head(ctx)
92+
if err != nil {
93+
return newHead, err
94+
}
95+
switch {
96+
case isExpired(newHead, s.Params.TrustingPeriod):
97+
// forbid initializing off an expired header
98+
err := fmt.Errorf("subjective initialization with an expired header(%d)", newHead.Height())
99+
log.Error(err, "\n trusted peers are out of sync")
100+
s.metrics.trustedPeersOutOufSync(s.ctx)
101+
return newHead, err
102+
case !isRecent(newHead, s.Params.blockTime, s.Params.recencyThreshold):
103+
// it's not the most recent, but it's good enough - allow initialization
104+
log.Warnw("subjective initialization with not recent header", "height", newHead.Height())
105+
s.metrics.trustedPeersOutOufSync(s.ctx)
79106
}
80-
// otherwise, request head from a trusted peer
81-
log.Infow("stored head header expired", "height", storeHead.Height())
82107

83-
trustHead, err := s.head.Head(ctx)
108+
_, err = s.subjectiveTail(ctx, newHead)
84109
if err != nil {
85-
return trustHead, err
110+
return newHead, fmt.Errorf(
111+
"subjective tail during subjective initialization for head %d: %w",
112+
newHead.Height(),
113+
err,
114+
)
86115
}
87-
s.metrics.subjectiveInitialization(s.ctx)
88-
// and set it as the new subjective head without validation,
116+
117+
// and set the fetched head as the new subjective head validating it against the tail
89118
// or, in other words, do 'automatic subjective initialization'
90-
// NOTE: we avoid validation as the head expired to prevent possibility of the Long-Range Attack
91-
s.setSubjectiveHead(ctx, trustHead)
92-
switch {
93-
default:
94-
log.Infow("subjective initialization finished", "height", trustHead.Height())
95-
return trustHead, nil
96-
case isExpired(trustHead, s.Params.TrustingPeriod):
97-
log.Warnw("subjective initialization with an expired header", "height", trustHead.Height())
98-
case !isRecent(trustHead, s.Params.blockTime, s.Params.recencyThreshold):
99-
log.Warnw("subjective initialization with an old header", "height", trustHead.Height())
119+
err = s.incomingNetworkHead(ctx, newHead)
120+
if err != nil {
121+
err = fmt.Errorf("subjective initialization failed for head(%d): %w", newHead.Height(), err)
122+
log.Error(err)
123+
return newHead, err
100124
}
101-
log.Warn("trusted peer is out of sync")
102-
s.metrics.trustedPeersOutOufSync(s.ctx)
103-
return trustHead, nil
125+
126+
log.Infow("subjective initialization finished", "head", newHead.Height())
127+
return newHead, nil
104128
}
105129

106130
// setSubjectiveHead takes already validated head and sets it as the new sync target.
@@ -138,20 +162,19 @@ func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error {
138162
s.incomingMu.Lock()
139163
defer s.incomingMu.Unlock()
140164

141-
err := s.verify(ctx, head)
142-
if err != nil {
165+
if err := s.verify(ctx, head); err != nil {
143166
return err
144167
}
145168

146169
s.setSubjectiveHead(ctx, head)
147-
return err
170+
return nil
148171
}
149172

150173
// verify verifies given network head candidate.
151174
func (s *Syncer[H]) verify(ctx context.Context, newHead H) error {
152175
sbjHead, err := s.subjectiveHead(ctx)
153176
if err != nil {
154-
log.Errorw("getting subjective head during validation", "err", err)
177+
log.Errorw("getting subjective head during new network head verification", "err", err)
155178
return err
156179
}
157180

sync/sync_store.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,24 +48,41 @@ func (s *syncStore[H]) Append(ctx context.Context, headers ...H) error {
4848
}
4949

5050
head, err := s.Head(ctx)
51-
if err != nil && !errors.Is(err, context.Canceled) {
52-
panic(err)
51+
if errors.Is(err, header.ErrEmptyStore) {
52+
// short-circuit for an initialization path
53+
if err := s.Store.Append(ctx, headers...); err != nil {
54+
return err
55+
}
56+
57+
s.head.Store(&headers[len(headers)-1])
58+
return nil
59+
}
60+
if err != nil {
61+
return err
5362
}
5463

55-
for _, h := range headers {
56-
if h.Height() != head.Height()+1 {
57-
return &errNonAdjacent{
58-
Head: head.Height(),
59-
Attempted: h.Height(),
64+
// TODO(@Wondertan): As store evolved, certain invariants it had were removed.
65+
// However, Syncer has yet to be refactored to not assume those invariants and until then
66+
// this method is a shim that allows using store with old assumptions.
67+
// To be reworked by bsync.
68+
if headers[0].Height() >= head.Height() {
69+
for _, h := range headers {
70+
if h.Height() != head.Height()+1 {
71+
return &errNonAdjacent{
72+
Head: head.Height(),
73+
Attempted: h.Height(),
74+
}
6075
}
76+
77+
head = h
6178
}
62-
head = h
79+
80+
s.head.Store(&head)
6381
}
6482

6583
if err := s.Store.Append(ctx, headers...); err != nil {
6684
return err
6785
}
6886

69-
s.head.Store(&headers[len(headers)-1])
7087
return nil
7188
}

sync/sync_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ func TestSyncSimpleRequestingHead(t *testing.T) {
3636
localStore,
3737
headertest.NewDummySubscriber(),
3838
WithBlockTime(time.Second*30),
39-
WithRecencyThreshold(time.Second*35), // add 5 second buffer
40-
WithTrustingPeriod(time.Microsecond),
39+
WithRecencyThreshold(time.Microsecond),
40+
WithTrustingPeriod(time.Minute*1),
4141
)
4242
require.NoError(t, err)
4343
err = syncer.Start(ctx)

0 commit comments

Comments
 (0)