Skip to content

Commit 817429e

Browse files
authored
feat(sync): bifurcation for syncTarget (#219)
Fixes #217
1 parent 816cf0e commit 817429e

File tree

5 files changed

+327
-4
lines changed

5 files changed

+327
-4
lines changed

header.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ type Header[H any] interface {
1212
// New creates new instance of a header.
1313
// It exists to overcome limitation of Go's type system.
1414
// See:
15-
// https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#pointer-method-example
15+
//
16+
//https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#pointer-method-example
1617
New() H
1718
// IsZero reports whether Header is a zero value of it's concrete type.
1819
IsZero() bool

headertest/dummy_header.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ type DummyHeader struct {
3030
// SoftFailure allows for testing scenarios where a header would fail
3131
// verification with SoftFailure set to true
3232
SoftFailure bool
33+
34+
// VerifyFn can be used to change header.Verify behavior per header.
35+
VerifyFn func(hdr *DummyHeader) error `json:"-"`
3336
}
3437

3538
func RandDummyHeader(t *testing.T) *DummyHeader {
@@ -100,6 +103,9 @@ func (d *DummyHeader) IsExpired(period time.Duration) bool {
100103
}
101104

102105
func (d *DummyHeader) Verify(hdr *DummyHeader) error {
106+
if d.VerifyFn != nil {
107+
return d.VerifyFn(hdr)
108+
}
103109
if hdr.VerifyFailure {
104110
return &header.VerifyError{Reason: ErrDummyVerify, SoftFailure: hdr.SoftFailure}
105111
}

sync/metrics.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type metrics struct {
2626
trustedPeersOutOfSync metric.Int64Counter
2727
outdatedHeader metric.Int64Counter
2828
subjectiveInit metric.Int64Counter
29+
failedBifurcations metric.Int64Counter
2930

3031
subjectiveHead atomic.Uint64
3132

@@ -75,6 +76,16 @@ func newMetrics() (*metrics, error) {
7576
return nil, err
7677
}
7778

79+
failedBifurcations, err := meter.Int64Counter(
80+
"hdr_failed_bifurcations_total",
81+
metric.WithDescription(
82+
"tracks how many times bifurcation failed against subjective head",
83+
),
84+
)
85+
if err != nil {
86+
return nil, err
87+
}
88+
7889
subjectiveHead, err := meter.Int64ObservableGauge(
7990
"hdr_sync_subjective_head_gauge",
8091
metric.WithDescription("subjective head height"),
@@ -116,6 +127,7 @@ func newMetrics() (*metrics, error) {
116127
trustedPeersOutOfSync: trustedPeersOutOfSync,
117128
outdatedHeader: outdatedHeader,
118129
subjectiveInit: subjectiveInit,
130+
failedBifurcations: failedBifurcations,
119131
syncLoopDurationHist: syncLoopDurationHist,
120132
syncLoopRunningInst: syncLoopRunningInst,
121133
requestRangeTimeHist: requestRangeTimeHist,
@@ -198,6 +210,17 @@ func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64, timestam
198210
})
199211
}
200212

213+
func (m *metrics) failedBifurcation(ctx context.Context, height uint64, hash string) {
214+
m.observe(ctx, func(ctx context.Context) {
215+
m.failedBifurcations.Add(ctx, 1,
216+
metric.WithAttributes(
217+
attribute.Int64("height", int64(height)), //nolint:gosec
218+
attribute.String("hash", hash),
219+
),
220+
)
221+
})
222+
}
223+
201224
func (m *metrics) rangeRequestStart() {
202225
if m == nil {
203226
return

sync/sync_head.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sync
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"time"
78

89
"github.com/celestiaorg/go-header"
@@ -164,6 +165,7 @@ func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error {
164165
}
165166

166167
// verify verifies given network head candidate.
168+
// bool reports whether the returned error is a soft error.
167169
func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) {
168170
sbjHead, err := s.subjectiveHead(ctx)
169171
if err != nil {
@@ -178,7 +180,12 @@ func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) {
178180
}
179181

180182
var verErr *header.VerifyError
181-
if errors.As(err, &verErr) && !verErr.SoftFailure {
183+
if errors.As(err, &verErr) {
184+
if verErr.SoftFailure {
185+
err := s.verifyBifurcating(ctx, sbjHead, newHead)
186+
return err != nil, err
187+
}
188+
182189
logF := log.Warnw
183190
if errors.Is(err, header.ErrKnownHeader) {
184191
logF = log.Debugw
@@ -194,6 +201,69 @@ func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) {
194201
return verErr.SoftFailure, err
195202
}
196203

204+
// verifyBifurcating verifies networkHead against subjHead via the interim headers when direct
205+
// verification is impossible.
206+
// It tries to find a header (or several headers if necessary) between the networkHead and
207+
// the subjectiveHead such that non-adjacent (or in the worst case adjacent) verification
208+
// passes and the networkHead can be verified as a valid sync target against the syncer's
209+
// subjectiveHead.
210+
// A non-nil error is returned when networkHead can't be verified.
211+
func (s *Syncer[H]) verifyBifurcating(ctx context.Context, subjHead, networkHead H) error {
212+
log.Warnw("header bifurcation started",
213+
"height", networkHead.Height(),
214+
"hash", networkHead.Hash().String(),
215+
)
216+
217+
subjHeight := subjHead.Height()
218+
219+
diff := networkHead.Height() - subjHeight
220+
221+
for diff > 1 {
222+
candidateHeight := subjHeight + diff/2
223+
224+
candidateHeader, err := s.getter.GetByHeight(ctx, candidateHeight)
225+
if err != nil {
226+
return err
227+
}
228+
229+
if err := header.Verify(subjHead, candidateHeader); err != nil {
230+
var verErr *header.VerifyError
231+
if errors.As(err, &verErr) && !verErr.SoftFailure {
232+
return err
233+
}
234+
235+
// candidate failed, go deeper in 1st half.
236+
diff /= 2
237+
continue
238+
}
239+
240+
// candidate was validated properly, update subjHead.
241+
subjHead = candidateHeader
242+
243+
if err := header.Verify(subjHead, networkHead); err == nil {
244+
// network head validate properly, return success.
245+
return nil
246+
}
247+
248+
// new subjHead failed, go deeper in 2nd half.
249+
subjHeight = subjHead.Height()
250+
diff = networkHead.Height() - subjHeight
251+
}
252+
253+
s.metrics.failedBifurcation(ctx, networkHead.Height(), networkHead.Hash().String())
254+
log.Errorw("header bifurcation failed",
255+
"height", networkHead.Height(),
256+
"hash", networkHead.Hash().String(),
257+
)
258+
259+
return &header.VerifyError{
260+
Reason: fmt.Errorf("sync: header validation against subjHead height:%d hash:%s",
261+
networkHead.Height(), networkHead.Hash().String(),
262+
),
263+
SoftFailure: false,
264+
}
265+
}
266+
197267
// isExpired checks if header is expired against trusting period.
198268
func isExpired[H header.Header[H]](header H, period time.Duration) bool {
199269
expirationTime := header.Time().Add(period)

0 commit comments

Comments
 (0)