-
Notifications
You must be signed in to change notification settings - Fork 26
feat(sync): maintain tail #265
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
115af45
c698d74
f87e8b4
f97fe87
9faaccd
96b450b
d37c42b
70f65de
df446ed
5d59427
d83e15d
cb71de0
eb96e98
88e6f69
a99d7ac
630ffa3
31d7e13
6613081
8dec451
2931553
0b91ae8
b413137
86cd2fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,17 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err | |
} | ||
// if subjective header is recent enough (relative to the network's block time) - just use it | ||
if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) { | ||
// TODO(@Wondertan): This is temporary. Subjective tail is always triggered when Syncer is started | ||
// unless we have a very recent head. Generally this is fine and the way it should work, however due to | ||
// the way diff syncing works in moveTail method, we need to ensure that diff syncing happens on Start always | ||
// which is triggered by this call. | ||
// To be removed by bsync. | ||
_, err = s.subjectiveTail(ctx, sbjHead) | ||
if err != nil { | ||
log.Errorw("getting subjective tail", "err", err) | ||
return sbjHead, err | ||
} | ||
|
||
return sbjHead, nil | ||
} | ||
|
||
|
@@ -70,37 +81,43 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { | |
} | ||
// if pending is empty - get the latest stored/synced head | ||
storeHead, err := s.store.Head(ctx) | ||
if err != nil { | ||
switch { | ||
case errors.Is(err, header.ErrEmptyStore): | ||
log.Info("empty store, initializing...") | ||
s.metrics.subjectiveInitialization(s.ctx) | ||
Comment on lines
+82
to
+84
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay so this will make it so that we do not have to manually initialise the store via nodebuilder in celestia-node in order to kick off header syncing - syncer will do it for us here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that's was the purpose behind doing #243 |
||
case !storeHead.IsZero() && isExpired(storeHead, s.Params.TrustingPeriod): | ||
log.Infow("stored head header expired", "height", storeHead.Height()) | ||
default: | ||
return storeHead, err | ||
} | ||
// check if the stored header is not expired and use it | ||
if !isExpired(storeHead, s.Params.TrustingPeriod) { | ||
return storeHead, nil | ||
} | ||
// otherwise, request head from a trusted peer | ||
log.Infow("stored head header expired", "height", storeHead.Height()) | ||
|
||
trustHead, err := s.head.Head(ctx) | ||
// fetch a new head from trusted peers if not available locally | ||
newHead, err := s.head.Head(ctx) | ||
if err != nil { | ||
return trustHead, err | ||
return newHead, err | ||
} | ||
s.metrics.subjectiveInitialization(s.ctx) | ||
// and set it as the new subjective head without validation, | ||
// or, in other words, do 'automatic subjective initialization' | ||
// NOTE: we avoid validation as the head expired to prevent possibility of the Long-Range Attack | ||
s.setSubjectiveHead(ctx, trustHead) | ||
switch { | ||
default: | ||
log.Infow("subjective initialization finished", "height", trustHead.Height()) | ||
return trustHead, nil | ||
case isExpired(trustHead, s.Params.TrustingPeriod): | ||
log.Warnw("subjective initialization with an expired header", "height", trustHead.Height()) | ||
case !isRecent(trustHead, s.Params.blockTime, s.Params.recencyThreshold): | ||
log.Warnw("subjective initialization with an old header", "height", trustHead.Height()) | ||
case isExpired(newHead, s.Params.TrustingPeriod): | ||
// forbid initializing off an expired header | ||
err := fmt.Errorf("subjective initialization with an expired header(%d)", newHead.Height()) | ||
log.Error(err, "\n trusted peers are out of sync") | ||
s.metrics.trustedPeersOutOufSync(s.ctx) | ||
return newHead, err | ||
case !isRecent(newHead, s.Params.blockTime, s.Params.recencyThreshold): | ||
log.Warnw("subjective initialization with not recent header", "height", newHead.Height()) | ||
s.metrics.trustedPeersOutOufSync(s.ctx) | ||
} | ||
log.Warn("trusted peer is out of sync") | ||
s.metrics.trustedPeersOutOufSync(s.ctx) | ||
return trustHead, nil | ||
|
||
// and set the fetched head as the new subjective head validating it against the tail | ||
// or, in other words, do 'automatic subjective initialization' | ||
err = s.incomingNetworkHead(ctx, newHead) | ||
if err != nil { | ||
err = fmt.Errorf("subjective initialization failed for head(%d): %w", newHead.Height(), err) | ||
log.Error(err) | ||
return newHead, err | ||
} | ||
|
||
log.Infow("subjective initialization finished", "head", newHead.Height()) | ||
return newHead, nil | ||
} | ||
|
||
// setSubjectiveHead takes already validated head and sets it as the new sync target. | ||
|
@@ -138,7 +155,20 @@ func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error { | |
s.incomingMu.Lock() | ||
defer s.incomingMu.Unlock() | ||
|
||
err := s.verify(ctx, head) | ||
// TODO(@Wondertan): We need to ensure Tail is available before verification for subj init case and this is fine here | ||
|
||
// however, check if that's ok to trigger Tail moves on network header that hasn't been verified yet | ||
// To be reworked by bsync. | ||
_, err := s.subjectiveTail(ctx, head) | ||
if err != nil { | ||
log.Errorw("subjective tail failed", | ||
"new_head", head.Height(), | ||
"hash", head.Hash().String(), | ||
"err", err, | ||
) | ||
return err | ||
} | ||
|
||
err = s.verify(ctx, head) | ||
if err != nil { | ||
return err | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,24 +48,41 @@ func (s *syncStore[H]) Append(ctx context.Context, headers ...H) error { | |
} | ||
|
||
head, err := s.Head(ctx) | ||
if err != nil && !errors.Is(err, context.Canceled) { | ||
panic(err) | ||
if errors.Is(err, header.ErrEmptyStore) { | ||
// short-circuit for an initialization path | ||
if err := s.Store.Append(ctx, headers...); err != nil { | ||
return err | ||
} | ||
|
||
s.head.Store(&headers[len(headers)-1]) | ||
return nil | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, h := range headers { | ||
if h.Height() != head.Height()+1 { | ||
return &errNonAdjacent{ | ||
Head: head.Height(), | ||
Attempted: h.Height(), | ||
// TODO(@Wondertan): As store evolved, certain invariants it had were removed. | ||
// However, Syncer has yet to be refactored to not assume those invariants and until then | ||
// this method is a shim that allows using store with old assumptions. | ||
// To be reworked by bsync. | ||
if headers[0].Height() >= head.Height() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we extract into separate func here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, we could There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't see much value from doing this tbh, so ignoring |
||
for _, h := range headers { | ||
if h.Height() != head.Height()+1 { | ||
return &errNonAdjacent{ | ||
Head: head.Height(), | ||
Attempted: h.Height(), | ||
} | ||
} | ||
|
||
head = h | ||
} | ||
head = h | ||
|
||
s.head.Store(&head) | ||
} | ||
|
||
if err := s.Store.Append(ctx, headers...); err != nil { | ||
return err | ||
} | ||
|
||
s.head.Store(&headers[len(headers)-1]) | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why remove
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
relic, will get it back
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
returned