Skip to content

Commit 00e2e9b

Browse files
committed
add diff handling and minor other improvements
1 parent 0a9dcef commit 00e2e9b

File tree

4 files changed

+180
-20
lines changed

4 files changed

+180
-20
lines changed

sync/options.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,19 @@ func WithParams(params Parameters) Option {
101101
*old = params
102102
}
103103
}
104+
105+
// WithSyncFromHash sets given header hash a starting point for syncing.
106+
// See [Parameters.SyncFromHash] for details.
107+
func WithSyncFromHash(hash header.Hash) Option {
108+
return func(p *Parameters) {
109+
p.SyncFromHash = hash
110+
}
111+
}
112+
113+
// WithSyncFromHeight sets given height a starting point for syncing.
114+
// See [Parameters.SyncFromHeight] for details.
115+
func WithSyncFromHeight(height uint64) Option {
116+
return func(p *Parameters) {
117+
p.SyncFromHeight = height
118+
}
119+
}

sync/sync.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,11 @@ func (s *Syncer[H]) Start(ctx context.Context) error {
9898
if err != nil {
9999
return err
100100
}
101-
// gets the latest head and kicks off syncing if necessary
101+
// gets the latest tail and then head, kicking off syncing if necessary
102+
_, err = s.Tail(ctx)
103+
if err != nil {
104+
return fmt.Errorf("error getting tail during Start: %w", err)
105+
}
102106
_, err = s.Head(ctx)
103107
if err != nil {
104108
return fmt.Errorf("error getting latest head during Start: %w", err)

sync/sync_head.go

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,25 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err
5757
return s.subjectiveHead(ctx)
5858
}
5959

60+
// Tail returns the current Tail header.
61+
//
62+
// If the underlying header store is not initialized/empty, it lazily performs subjective initialization
63+
// based on either estimated or preconfigured Tail.
64+
//
65+
// If the preconfigured Tail(SyncFromHash/Header) has changed upon Syncer restarts, it lazily sets the new Tail
66+
// and resolves the difference.
6067
func (s *Syncer[H]) Tail(ctx context.Context) (H, error) {
6168
tail, err := s.store.Tail(ctx)
6269
switch {
6370
case errors.Is(err, header.ErrEmptyStore):
64-
// TODO(@Wondertan): This is a temporary solution requesting the head directly from the network instead of
65-
// calling general Head path. This is needed to ensure Tail is written to the store first.
71+
// Store is empty, likely the first start - initialize.
72+
log.Info("empty store, initializing...")
73+
// TODO(@Wondertan): Requesting the head directly from the network instead of
74+
// calling general Head path. This is a temporary solution needed to ensure Tail is written to the store first
75+
// before Head. To be reworked by bsync.
6676
head, err := s.head.Head(ctx)
6777
if err != nil {
68-
return head, err
78+
return head, fmt.Errorf("requesting network head: %w", err)
6979
}
7080

7181
switch {
@@ -101,24 +111,64 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) {
101111
return tail, fmt.Errorf("applying head from trusted peers: %w", err)
102112
}
103113

104-
case !s.isTailActual(tail):
114+
log.Infof("initialized with Tail %d and Head %d", tail.Height(), head.Height())
115+
116+
// TODO: Make sure all the metrics for this alternative subjective init path are added
117+
118+
case !tail.IsZero() && !s.isTailActual(tail):
119+
// Configured Tail has changed - get a new one and resolve the diff
120+
121+
currentTail, newTail := tail, tail
122+
105123
if s.Params.SyncFromHash != nil {
106-
tail, err = s.getter.Get(ctx, s.Params.SyncFromHash)
124+
// check first locally if the new Tail exists
125+
newTail, err = s.store.Get(ctx, s.Params.SyncFromHash)
107126
if err != nil {
108-
return tail, fmt.Errorf(
109-
"getting tail header by hash(%s): %w",
110-
s.Params.SyncFromHash,
111-
err,
112-
)
127+
// if for whatever reason Tail is not available locally, request the new one from the network.
128+
newTail, err = s.getter.Get(ctx, s.Params.SyncFromHash)
129+
if err != nil {
130+
return tail, fmt.Errorf(
131+
"getting tail header by hash(%s): %w",
132+
s.Params.SyncFromHash,
133+
err,
134+
)
135+
}
113136
}
114137
} else if s.Params.SyncFromHeight != 0 {
115-
tail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight)
138+
// check first locally if the new Tail exists
139+
newTail, err = s.store.GetByHeight(ctx, s.Params.SyncFromHeight)
116140
if err != nil {
117-
return tail, fmt.Errorf("getting tail header(%d): %w", s.Params.SyncFromHeight, err)
141+
// if for whatever reason Tail is not available locally, request the new one from the network.
142+
newTail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight)
143+
if err != nil {
144+
return tail, fmt.Errorf(
145+
"getting tail header by hash(%s): %w",
146+
s.Params.SyncFromHash,
147+
err,
148+
)
149+
}
118150
}
119151
}
120152

121-
// TODO: Delete or sync up the diff
153+
if currentTail.Height() > newTail.Height() {
154+
log.Infow("tail header changed from %d to %d, syncing the diff...", currentTail, newTail)
155+
// TODO(@Wondertan): This works but it assumes this code is only run before syncing routine starts.
156+
// If run after, it may race with other in prog syncs.
157+
// To be reworked by bsync.
158+
err := s.doSync(ctx, newTail, currentTail)
159+
if err != nil {
160+
return tail, fmt.Errorf("syncing the diff between old and new Tail: %w", err)
161+
}
162+
} else if currentTail.Height() < newTail.Height() {
163+
log.Infow("Tail header changed from %d to %d, pruning the diff...", currentTail, newTail)
164+
err := s.store.DeleteTo(ctx, newTail.Height())
165+
if err != nil {
166+
return tail, fmt.Errorf("deleting headers up to newly configured Tail(%d): %w", newTail.Height(), err)
167+
}
168+
} else {
169+
// equals case, must not happen
170+
panic("currentTail == newTail")
171+
}
122172

123173
case err != nil:
124174
return tail, err
@@ -129,10 +179,6 @@ func (s *Syncer[H]) Tail(ctx context.Context) (H, error) {
129179

130180
// isTailActual checks if the given tail is actual based on the sync parameters.
131181
func (s *Syncer[H]) isTailActual(tail H) bool {
132-
if tail.IsZero() {
133-
return false
134-
}
135-
136182
switch {
137183
case s.Params.SyncFromHash == nil && s.Params.SyncFromHeight == 0:
138184
// if both overrides are zero value, then we good with whatever tail there is

sync/sync_head_test.go

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,23 +31,117 @@ func TestSyncer_Tail(t *testing.T) {
3131
err = localStore.Start(ctx)
3232
require.NoError(t, err)
3333

34+
syncer, err := NewSyncer[*headertest.DummyHeader](
35+
remoteStore,
36+
localStore,
37+
headertest.NewDummySubscriber(),
38+
WithBlockTime(time.Second*6),
39+
)
40+
require.NoError(t, err)
41+
42+
tail, err := syncer.Tail(ctx)
43+
require.NoError(t, err)
44+
assert.NotNil(t, tail)
45+
46+
time.Sleep(time.Millisecond * 10)
47+
48+
err = syncer.Start(ctx)
49+
require.NoError(t, err)
50+
51+
time.Sleep(time.Millisecond * 10)
52+
53+
storeTail, err := localStore.Tail(ctx)
54+
require.NoError(t, err)
55+
assert.EqualValues(t, tail.Height(), storeTail.Height())
56+
57+
storeHead, err := localStore.Head(ctx)
58+
require.NoError(t, err)
59+
assert.EqualValues(t, remoteStore.Height(), storeHead.Height())
60+
}
61+
62+
func TestSyncer_TailInitFromHash(t *testing.T) {
63+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
64+
t.Cleanup(cancel)
65+
66+
remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100)
67+
68+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
69+
localStore, err := store.NewStore[*headertest.DummyHeader](ds)
70+
require.NoError(t, err)
71+
err = localStore.Start(ctx)
72+
require.NoError(t, err)
73+
74+
expectedTail, err := remoteStore.GetByHeight(ctx, 69)
75+
require.NoError(t, err)
76+
3477
syncer, err := NewSyncer[*headertest.DummyHeader](
3578
remoteStore,
3679
localStore,
3780
headertest.NewDummySubscriber(),
3881
WithRecencyThreshold(time.Nanosecond), // force recent requests
3982
WithBlockTime(time.Second*6),
83+
WithSyncFromHash(expectedTail.Hash()),
4084
)
4185
require.NoError(t, err)
4286

4387
tail, err := syncer.Tail(ctx)
4488
require.NoError(t, err)
4589
assert.NotNil(t, tail)
90+
assert.EqualValues(t, tail.Height(), expectedTail.Height())
91+
92+
time.Sleep(time.Millisecond * 10)
4693

4794
err = syncer.Start(ctx)
4895
require.NoError(t, err)
4996

50-
time.Sleep(time.Millisecond * 100)
97+
time.Sleep(time.Millisecond * 10)
98+
99+
storeTail, err := localStore.Tail(ctx)
100+
require.NoError(t, err)
101+
assert.EqualValues(t, tail.Height(), storeTail.Height())
102+
103+
storeHead, err := localStore.Head(ctx)
104+
require.NoError(t, err)
105+
assert.EqualValues(t, remoteStore.Height(), storeHead.Height())
106+
}
107+
108+
func TestSyncer_TailInitFromHeight(t *testing.T) {
109+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
110+
t.Cleanup(cancel)
111+
112+
remoteStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 100)
113+
114+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
115+
localStore, err := store.NewStore[*headertest.DummyHeader](ds)
116+
require.NoError(t, err)
117+
err = localStore.Start(ctx)
118+
require.NoError(t, err)
119+
120+
expectedTail, err := remoteStore.GetByHeight(ctx, 69)
121+
require.NoError(t, err)
122+
123+
syncer, err := NewSyncer[*headertest.DummyHeader](
124+
remoteStore,
125+
localStore,
126+
headertest.NewDummySubscriber(),
127+
WithRecencyThreshold(time.Nanosecond), // force recent requests
128+
WithBlockTime(time.Second*6),
129+
WithSyncFromHeight(expectedTail.Height()),
130+
)
131+
require.NoError(t, err)
132+
133+
tail, err := syncer.Tail(ctx)
134+
require.NoError(t, err)
135+
assert.NotNil(t, tail)
136+
assert.EqualValues(t, tail.Height(), expectedTail.Height())
137+
138+
time.Sleep(time.Millisecond * 10)
139+
140+
err = syncer.Start(ctx)
141+
require.NoError(t, err)
142+
143+
time.Sleep(time.Millisecond * 10)
144+
51145
storeTail, err := localStore.Tail(ctx)
52146
require.NoError(t, err)
53147
assert.EqualValues(t, tail.Height(), storeTail.Height())
@@ -443,7 +537,7 @@ type errorGetter struct{}
443537

444538
func (e errorGetter) Head(
445539
context.Context,
446-
...header.HeadOption[*headertest.DummyHeader],
540+
...header.HeadOption[*headertest.DummyHeader],
447541
) (*headertest.DummyHeader, error) {
448542
time.Sleep(time.Millisecond * 1)
449543
return nil, fmt.Errorf("error")

0 commit comments

Comments
 (0)