Skip to content

Commit 34072ea

Browse files
committed
fix(sync): avoid Tail double requesting (#290)
To avoid requesting tail twice, I had to extract new `networkHead` method out of `Head` method. This refactoring lead to a few adjacent refactors: * avoids double verification of a recent head request. Previously, we verified it in exchange and then in the syncer's code. This double verification is actually a bad one that verifies all the signatures and not just `NextValidatorHash` * prevents duplicate request right after subjective init for a more recent head, if the subjective initialization yielded non recent header. * extract `localHead` out of `subjectiveHead` * improves logging
1 parent 1cb09d9 commit 34072ea

File tree

5 files changed

+143
-100
lines changed

5 files changed

+143
-100
lines changed

local/exchange.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,24 @@ func (l *Exchange[H]) Stop(context.Context) error {
2626
return nil
2727
}
2828

29-
func (l *Exchange[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) {
30-
return l.store.Head(ctx)
29+
func (l *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (H, error) {
30+
params := &header.HeadParams[H]{}
31+
for _, opt := range opts {
32+
opt(params)
33+
}
34+
35+
head, err := l.store.Head(ctx)
36+
if err != nil {
37+
return head, err
38+
}
39+
40+
if !params.TrustedHead.IsZero() {
41+
if err := header.Verify(params.TrustedHead, head); err != nil {
42+
return head, err
43+
}
44+
}
45+
46+
return head, nil
3147
}
3248

3349
func (l *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {

sync/syncer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func (s *Syncer[H]) syncLoop() {
203203

204204
// sync ensures we are synced from the Store's head up to the new subjective head.
205205
func (s *Syncer[H]) sync(ctx context.Context) {
206-
subjHead, err := s.subjectiveHead(ctx)
206+
subjHead, err := s.localHead(ctx)
207207
if err != nil {
208208
log.Errorw("getting subjective head", "err", err)
209209
return

sync/syncer_head.go

Lines changed: 114 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -9,126 +9,150 @@ import (
99
"github.com/celestiaorg/go-header"
1010
)
1111

12-
// headRequestTimeout is the amount of time the syncer is willing to wait for
12+
// NetworkHeadRequestTimeout is the amount of time the syncer is willing to wait for
1313
// the exchange to request the head of the chain from the network.
14-
var headRequestTimeout = time.Second * 2
14+
var NetworkHeadRequestTimeout = time.Second * 2
1515

16-
// Head returns the Network Head or an error. It will try to get the most recent header until it fails entirely.
17-
// It may return an error with a header which caused it.
18-
//
19-
// Known subjective head is considered network head if it is recent enough(now-timestamp<=blocktime)
20-
// Otherwise, we attempt to request recent network head from a trusted peer and
21-
// set as the new subjective head, assuming that trusted peer is always fully synced.
22-
//
23-
// The request is limited with 2 seconds and otherwise potentially unrecent header is returned.
16+
// Head returns the network head or an error. It will try to get the most recent network head or return the current
17+
// non-expired subjective head as a fallback.
18+
// If the head has changed, it will update the tail with the new head.
2419
func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) {
25-
sbjHead, err := s.subjectiveHead(ctx)
20+
netHead, err := s.networkHead(ctx)
21+
if err != nil {
22+
return netHead, err
23+
}
24+
25+
if _, err = s.subjectiveTail(ctx, netHead); err != nil {
26+
return netHead, fmt.Errorf(
27+
"subjective tail for head %d: %w",
28+
netHead.Height(),
29+
err,
30+
)
31+
}
32+
33+
// attempt to set the (potentially) new network head
34+
// it doesn't matter for the caller setting succeeds or not
35+
_ = s.incomingNetworkHead(ctx, netHead)
36+
// so return whatever is the current highest head
37+
return s.localHead(ctx)
38+
}
39+
40+
// networkHead returns subjective head ensuring its recency.
41+
// If the subjective head is not recent, it attempts to request the most recent network head from trusted peers
42+
// assuming that trusted peers are always fully synced.
43+
// The request is limited with [NetworkHeadRequestTimeout], otherwise the unrecent subjective header is returned.
44+
func (s *Syncer[H]) networkHead(ctx context.Context) (H, error) {
45+
sbjHead, initialized, err := s.subjectiveHead(ctx)
2646
if err != nil {
2747
return sbjHead, err
2848
}
29-
defer func() {
30-
// always ensure tail is up to date
31-
_, err = s.subjectiveTail(ctx, sbjHead)
32-
if err != nil {
33-
log.Errorw("subjective tail", "head_height", sbjHead.Height(), "err", err)
34-
}
35-
}()
36-
// if subjective header is recent enough (relative to the network's block time) - just use it
37-
if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) {
49+
if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) || initialized {
3850
return sbjHead, nil
3951
}
4052

41-
s.metrics.outdatedHead(s.ctx)
53+
s.metrics.outdatedHead(ctx)
54+
log.Warnw("outdated subjective head", "outdated_height", sbjHead.Height())
55+
log.Warnw("attempting to request the most recent network head...")
4256

43-
reqCtx, cancel := context.WithTimeout(ctx, headRequestTimeout)
57+
// cap the max blocking time for the request call
58+
ctx, cancel := context.WithTimeout(ctx, NetworkHeadRequestTimeout)
4459
defer cancel()
45-
netHead, err := s.head.Head(reqCtx, header.WithTrustedHead[H](sbjHead))
60+
61+
newHead, err := s.head.Head(ctx, header.WithTrustedHead[H](sbjHead))
62+
var verErr *header.VerifyError
63+
if errors.As(err, &verErr) && verErr.SoftFailure {
64+
// if we have a soft failure, try to bifurcate
65+
err = s.incomingNetworkHead(ctx, newHead)
66+
}
4667
if err != nil {
68+
// if we have a non-expired subjective head, but failed to get a more recent network head
69+
// still return the current subjective head
4770
log.Warnw(
48-
"failed to get recent head, returning current subjective",
49-
"sbjHead",
50-
sbjHead.Height(),
71+
"error requesting the most recent network head, using the current subjective",
5172
"err",
5273
err,
74+
"subjective_height",
75+
sbjHead.Height(),
5376
)
54-
return s.subjectiveHead(ctx)
77+
78+
return sbjHead, nil
79+
}
80+
// still check if even the newly requested head is outdated
81+
if !isRecent(newHead, s.Params.blockTime, s.Params.recencyThreshold) {
82+
log.Warnw("non recent head from trusted peers", "height", newHead.Height())
83+
log.Error("trusted peers are out of sync")
84+
s.metrics.trustedPeersOutOufSync(ctx)
5585
}
5686

57-
// process and validate netHead fetched from trusted peers
58-
// NOTE: We could trust the netHead like we do during 'automatic subjective initialization'
59-
// but in this case our subjective head is not expired, so we should verify netHead
60-
// and only if it is valid, set it as new head
61-
_ = s.incomingNetworkHead(ctx, netHead)
62-
// netHead was either accepted or rejected as the new subjective
63-
// anyway return most current known subjective head
64-
return s.subjectiveHead(ctx)
87+
if newHead.Height() <= sbjHead.Height() {
88+
// nothing new, just return what we have already
89+
return sbjHead, nil
90+
}
91+
// set the new head as subjective, skipping expensive verification
92+
// as it was already verified by the Exchange.
93+
s.setLocalHead(ctx, newHead)
94+
95+
log.Infow(
96+
"successfully requested a more recent network head",
97+
"height",
98+
newHead.Height(),
99+
)
100+
return newHead, nil
65101
}
66102

67-
// subjectiveHead returns the latest known local header that is not expired(within trusting period).
68-
// If the header is expired, it is retrieved from a trusted peer without validation;
69-
// in other words, an automatic subjective initialization is performed.
70-
func (s *Syncer[H]) subjectiveHead(ctx context.Context) (sbjHead H, err error) {
71-
// pending head is the latest known subjective head and sync target, so try to get it
72-
// NOTES:
73-
// * Empty when no sync is in progress
74-
// * Pending cannot be expired, guaranteed
75-
pendHead := s.pending.Head()
76-
if !pendHead.IsZero() {
77-
return pendHead, nil
78-
}
79-
// if pending is empty - get the latest stored/synced head
80-
storeHead, err := s.store.Head(ctx)
103+
// subjectiveHead returns the highest known non-expired subjective Head.
104+
// If the current subjective head is expired or does not exist,
105+
// it performs automatic subjective (re) initialization by requesting the most recent head from trusted peers.
106+
// Reports true if initialization was performed, false otherwise.
107+
func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, bool, error) {
108+
sbjHead, err := s.localHead(ctx)
81109
switch {
82110
case errors.Is(err, header.ErrEmptyStore):
83111
log.Info("empty store, initializing...")
84-
s.metrics.subjectiveInitialization(s.ctx)
85-
case !storeHead.IsZero() && isExpired(storeHead, s.Params.TrustingPeriod):
86-
log.Infow("stored head header expired", "height", storeHead.Height())
112+
case !sbjHead.IsZero() && isExpired(sbjHead, s.Params.TrustingPeriod):
113+
log.Infow(
114+
"subjective head expired, reinitializing...",
115+
"expired_height",
116+
sbjHead.Height(),
117+
)
87118
default:
88-
return storeHead, err
119+
// success or unknown error case
120+
return sbjHead, false, err
89121
}
90-
// fetch a new head from trusted peers if not available locally
122+
123+
s.metrics.subjectiveInitialization(ctx)
91124
newHead, err := s.head.Head(ctx)
92125
if err != nil {
93-
return newHead, err
126+
return newHead, false, err
94127
}
95-
switch {
96-
case isExpired(newHead, s.Params.TrustingPeriod):
128+
// still check if even the newly requested head is expired
129+
if isExpired(newHead, s.Params.TrustingPeriod) {
97130
// forbid initializing off an expired header
98131
err := fmt.Errorf("subjective initialization with an expired header(%d)", newHead.Height())
99-
log.Error(err, "\n trusted peers are out of sync")
100-
s.metrics.trustedPeersOutOufSync(s.ctx)
101-
return newHead, err
102-
case !isRecent(newHead, s.Params.blockTime, s.Params.recencyThreshold):
103-
// it's not the most recent, buts its good enough - allow initialization
104-
log.Warnw("subjective initialization with not recent header", "height", newHead.Height())
105-
s.metrics.trustedPeersOutOufSync(s.ctx)
132+
log.Error(err)
133+
log.Error("trusted peers are out of sync")
134+
s.metrics.trustedPeersOutOufSync(ctx)
135+
return newHead, false, err
106136
}
107137

108-
_, err = s.subjectiveTail(ctx, newHead)
109-
if err != nil {
110-
return newHead, fmt.Errorf(
111-
"subjective tail during subjective initialization for head %d: %w",
112-
newHead.Height(),
113-
err,
114-
)
115-
}
138+
log.Infow("subjective initialization finished", "height", newHead.Height())
139+
return newHead, false, nil
140+
}
116141

117-
// and set the fetched head as the new subjective head validating it against the tail
118-
// or, in other words, do 'automatic subjective initialization'
119-
err = s.incomingNetworkHead(ctx, newHead)
120-
if err != nil {
121-
err = fmt.Errorf("subjective initialization failed for head(%d): %w", newHead.Height(), err)
122-
log.Error(err)
123-
return newHead, err
142+
// localHead reports the current highest locally known head.
143+
func (s *Syncer[H]) localHead(ctx context.Context) (H, error) {
144+
// pending head is the latest known subjective head and a sync target
145+
// if it is empty, no sync is in progress
146+
pendHead := s.pending.Head()
147+
if !pendHead.IsZero() {
148+
return pendHead, nil
124149
}
125-
126-
log.Infow("subjective initialization finished", "head", newHead.Height())
127-
return newHead, nil
150+
// if pending is empty - get the latest stored/synced head
151+
return s.store.Head(ctx)
128152
}
129153

130-
// setSubjectiveHead takes already validated head and sets it as the new sync target.
131-
func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {
154+
// setLocalHead takes the already validated head and sets it as the new sync target.
155+
func (s *Syncer[H]) setLocalHead(ctx context.Context, netHead H) {
132156
// TODO(@Wondertan): Right now, we can only store adjacent headers, instead we should:
133157
// * Allow storing any valid header here in Store
134158
// * Remove ErrNonAdjacent
@@ -155,24 +179,25 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {
155179
log.Infow("new network head", "height", netHead.Height(), "hash", netHead.Hash())
156180
}
157181

158-
// incomingNetworkHead processes new potential network headers.
159-
// If the header valid, sets as new subjective header.
182+
// incomingNetworkHead processes new potential network heads.
183+
// If the header is valid, sets as the new subjective header.
160184
func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error {
161185
// ensure there is no racing between network head candidates
186+
// additionally ensures there is only one bifurcation attempt at a time
162187
s.incomingMu.Lock()
163188
defer s.incomingMu.Unlock()
164189

165190
if err := s.verify(ctx, head); err != nil {
166191
return err
167192
}
168193

169-
s.setSubjectiveHead(ctx, head)
194+
s.setLocalHead(ctx, head)
170195
return nil
171196
}
172197

173198
// verify verifies given network head candidate.
174199
func (s *Syncer[H]) verify(ctx context.Context, newHead H) error {
175-
sbjHead, err := s.subjectiveHead(ctx)
200+
sbjHead, _, err := s.subjectiveHead(ctx)
176201
if err != nil {
177202
log.Errorw("getting subjective head during new network head verification", "err", err)
178203
return err
@@ -250,7 +275,7 @@ func (s *Syncer[H]) verifyBifurcating(ctx context.Context, subjHead, newHead H)
250275
// candidate was validated properly, update subjHead.
251276
log.Infow("bifurcation: found new subjective head", "height", candidateHeight)
252277
subjHead = candidateHeader
253-
s.setSubjectiveHead(ctx, subjHead)
278+
s.setLocalHead(ctx, subjHead)
254279

255280
err = header.Verify(subjHead, newHead)
256281
if err == nil {

sync/syncer_head_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func TestSyncer_HeadWithTrustedHead(t *testing.T) {
125125
}
126126

127127
// Test will simulate a case with upto `iters` failures before we will get to
128-
// the header that can be verified against subjectiveHead.
128+
// the header that can be verified against localHead.
129129
func TestSyncer_verifyBifurcatingSuccess(t *testing.T) {
130130
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
131131
t.Cleanup(cancel)
@@ -196,15 +196,15 @@ func TestSyncer_verifyBifurcatingSuccess(t *testing.T) {
196196
headers[total-1].VerifyFailure = true
197197
headers[total-1].SoftFailure = true
198198

199-
subjHead, err := syncer.subjectiveHead(ctx)
199+
subjHead, err := syncer.localHead(ctx)
200200
require.NoError(t, err)
201201

202202
err = syncer.verifyBifurcating(ctx, subjHead, headers[total-1])
203203
require.NoError(t, err)
204204
}
205205

206206
// Test will simulate a case with upto `iters` failures before we will get to
207-
// the header that can be verified against subjectiveHead.
207+
// the header that can be verified against localHead.
208208
func TestSyncer_verifyBifurcatingSuccessWithBadCandidates(t *testing.T) {
209209
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
210210
t.Cleanup(cancel)
@@ -272,14 +272,14 @@ func TestSyncer_verifyBifurcatingSuccessWithBadCandidates(t *testing.T) {
272272
headers[total-1].VerifyFailure = true
273273
headers[total-1].SoftFailure = true
274274

275-
subjHead, err := syncer.subjectiveHead(ctx)
275+
subjHead, err := syncer.localHead(ctx)
276276
require.NoError(t, err)
277277

278278
err = syncer.verifyBifurcating(ctx, subjHead, headers[total-1])
279279
require.NoError(t, err)
280280
}
281281

282-
// Test will simulate a case when no headers can be verified against subjectiveHead.
282+
// Test will simulate a case when no headers can be verified against localHead.
283283
// As a result the [NewValidatorSetCantBeTrustedError] error will be returned.
284284
func TestSyncer_verifyBifurcatingCannotVerify(t *testing.T) {
285285
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
@@ -340,7 +340,7 @@ func TestSyncer_verifyBifurcatingCannotVerify(t *testing.T) {
340340
headers[total-1].VerifyFailure = true
341341
headers[total-1].SoftFailure = true
342342

343-
subjHead, err := syncer.subjectiveHead(ctx)
343+
subjHead, err := syncer.localHead(ctx)
344344
require.NoError(t, err)
345345

346346
err = syncer.verifyBifurcating(ctx, subjHead, headers[total-1])

sync/syncer_tail_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,10 @@ func TestSyncer_TailInitialization(t *testing.T) {
223223
remoteStore,
224224
localStore,
225225
headertest.NewDummySubscriber(),
226-
WithBlockTime(time.Second*6),
227-
WithRecencyThreshold(time.Nanosecond),
226+
// make sure the blocktime is set for proper tail estimation
227+
WithBlockTime(headertest.HeaderTime),
228+
// make sure that recency check is not triggered
229+
WithRecencyThreshold(time.Minute),
228230
test.option,
229231
)
230232
require.NoError(t, err)

0 commit comments

Comments
 (0)