Skip to content

Commit 5473b09

Browse files
committed
feat(sync): subjectiveTail
1 parent 72b0cce commit 5473b09

File tree

3 files changed

+142
-7
lines changed

3 files changed

+142
-7
lines changed

sync/options.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package sync
33
import (
44
"fmt"
55
"time"
6+
7+
"github.com/celestiaorg/go-header"
68
)
79

810
// Option is the functional option that is applied to the Syner instance
@@ -20,6 +22,22 @@ type Parameters struct {
2022
// needed to report and punish misbehavior should be less than the unbonding
2123
// period.
2224
TrustingPeriod time.Duration
25+
// SyncFromHash is the hash of the header from which the syncer should start syncing.
26+
//
27+
// By default, the syncer will start syncing from Tail, height of which is identified by the
28+
// network head time minus TrustingPeriod. SyncFromHash overrides this default, allowing
29+
// user to specify a custom starting point.
30+
//
31+
// SyncFromHash has higher priority than SyncFromHeight.
32+
SyncFromHash header.Hash
33+
// SyncFromHeight is the height of the header from which the syncer should start syncing.
34+
//
35+
// By default, the syncer will start syncing from Tail, height of which is identified by the
36+
// network head time minus TrustingPeriod. SyncFromHeight overrides this default, allowing
37+
// user to specify a custom starting point.
38+
//
39+
// SyncFromHeight has lower priority than SyncFromHash.
40+
SyncFromHeight uint64
2341
// blockTime provides a reference point for the Syncer to determine
2442
// whether its subjective head is outdated.
2543
// Keeping it private to disable serialization for it.

sync/sync_head.go

Lines changed: 91 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package sync
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"fmt"
@@ -56,6 +57,80 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err
5657
return s.subjectiveHead(ctx)
5758
}
5859

60+
func (s *Syncer[H]) Tail(ctx context.Context) (H, error) {
61+
tail, err := s.store.Tail(ctx)
62+
switch {
63+
case errors.Is(err, header.ErrEmptyStore):
64+
switch {
65+
case s.Params.SyncFromHash != nil:
66+
tail, err = s.getter.Get(ctx, s.Params.SyncFromHash)
67+
if err != nil {
68+
return tail, fmt.Errorf("getting tail header by hash(%s): %w", s.Params.SyncFromHash, err)
69+
}
70+
case s.Params.SyncFromHeight != 0:
71+
tail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight)
72+
if err != nil {
73+
return tail, fmt.Errorf("getting tail header(%d): %w", s.Params.SyncFromHeight, err)
74+
}
75+
default:
76+
head, err := s.Head(ctx)
77+
if err != nil {
78+
return head, err
79+
}
80+
81+
tailHeight := estimateTail(head, s.Params.blockTime, s.Params.TrustingPeriod)
82+
tail, err = s.getter.GetByHeight(ctx, tailHeight)
83+
if err != nil {
84+
return tail, fmt.Errorf("getting estimated tail header(%d): %w", tailHeight, err)
85+
}
86+
}
87+
88+
err = s.store.Store.Append(ctx, tail)
89+
if err != nil {
90+
return tail, fmt.Errorf("appending tail header: %w", err)
91+
}
92+
93+
case !s.isTailActual(tail):
94+
if s.Params.SyncFromHash != nil {
95+
tail, err = s.getter.Get(ctx, s.Params.SyncFromHash)
96+
if err != nil {
97+
return tail, fmt.Errorf("getting tail header by hash(%s): %w", s.Params.SyncFromHash, err)
98+
}
99+
} else if s.Params.SyncFromHeight != 0 {
100+
tail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight)
101+
if err != nil {
102+
return tail, fmt.Errorf("getting tail header(%d): %w", s.Params.SyncFromHeight, err)
103+
}
104+
}
105+
106+
// TODO: Delete or sync up the diff
107+
108+
case err != nil:
109+
return tail, err
110+
}
111+
112+
return tail, nil
113+
}
114+
115+
// isTailActual checks if the given tail is actual based on the sync parameters.
116+
func (s *Syncer[H]) isTailActual(tail H) bool {
117+
if tail.IsZero() {
118+
return false
119+
}
120+
121+
switch {
122+
case s.Params.SyncFromHash == nil && s.Params.SyncFromHeight == 0:
123+
// if both overrides are zero value, then we good with whatever tail there is
124+
return true
125+
case s.Params.SyncFromHash != nil && bytes.Equal(s.Params.SyncFromHash, tail.Hash()):
126+
return true
127+
case s.Params.SyncFromHeight != 0 && s.Params.SyncFromHeight == tail.Height():
128+
return true
129+
default:
130+
return false
131+
}
132+
}
133+
59134
// subjectiveHead returns the latest known local header that is not expired(within trusting period).
60135
// If the header is expired, it is retrieved from a trusted peer without validation;
61136
// in other words, an automatic subjective initialization is performed.
@@ -70,15 +145,14 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
70145
}
71146
// if pending is empty - get the latest stored/synced head
72147
storeHead, err := s.store.Head(ctx)
73-
if err != nil {
148+
switch {
149+
case errors.Is(err, header.ErrEmptyStore):
150+
log.Infow("no stored head, initializing...", "height")
151+
case !storeHead.IsZero() && isExpired(storeHead, s.Params.TrustingPeriod):
152+
log.Infow("stored head header expired", "height", storeHead.Height())
153+
default:
74154
return storeHead, err
75155
}
76-
// check if the stored header is not expired and use it
77-
if !isExpired(storeHead, s.Params.TrustingPeriod) {
78-
return storeHead, nil
79-
}
80-
// otherwise, request head from a trusted peer
81-
log.Infow("stored head header expired", "height", storeHead.Height())
82156

83157
trustHead, err := s.head.Head(ctx)
84158
if err != nil {
@@ -257,3 +331,13 @@ func isRecent[H header.Header[H]](header H, blockTime, recencyThreshold time.Dur
257331
}
258332
return time.Since(header.Time()) <= recencyThreshold
259333
}
334+
335+
func estimateTail[H header.Header[H]](head H, blockTime, trustingPeriod time.Duration) (height uint64) {
336+
headersToRetain := uint64(trustingPeriod / blockTime)
337+
338+
if headersToRetain >= head.Height() {
339+
return 1
340+
}
341+
tail := head.Height() - headersToRetain
342+
return tail
343+
}

sync/sync_head_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,47 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/ipfs/go-datastore"
12+
dssync "github.com/ipfs/go-datastore/sync"
1113
"github.com/stretchr/testify/assert"
1214
"github.com/stretchr/testify/require"
1315

1416
"github.com/celestiaorg/go-header"
1517
"github.com/celestiaorg/go-header/headertest"
1618
"github.com/celestiaorg/go-header/local"
19+
"github.com/celestiaorg/go-header/store"
1720
)
1821

22+
func TestSyncer_Tail(t *testing.T) {
23+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
24+
t.Cleanup(cancel)
25+
26+
remoteStore := headertest.NewDummyStore(t)
27+
28+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
29+
localStore, err := store.NewStore[*headertest.DummyHeader](ds)
30+
require.NoError(t, err)
31+
err = localStore.Start(ctx)
32+
require.NoError(t, err)
33+
34+
syncer, err := NewSyncer[*headertest.DummyHeader](
35+
remoteStore,
36+
localStore,
37+
headertest.NewDummySubscriber(),
38+
WithRecencyThreshold(time.Nanosecond), // force recent requests
39+
WithBlockTime(time.Second*6),
40+
)
41+
require.NoError(t, err)
42+
43+
tail, err := syncer.Tail(ctx)
44+
require.NoError(t, err)
45+
assert.NotNil(t, tail)
46+
47+
storeTail, err := localStore.Tail(ctx)
48+
require.NoError(t, err)
49+
assert.EqualValues(t, tail.Height(), storeTail.Height())
50+
}
51+
1952
func TestSyncer_HeadConcurrencyError(t *testing.T) {
2053
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
2154
t.Cleanup(cancel)

0 commit comments

Comments
 (0)