Skip to content

Commit df446ed

Browse files
committed
major simplificussy
1 parent 70f65de commit df446ed

File tree

7 files changed

+360
-320
lines changed

7 files changed

+360
-320
lines changed

sync/sync.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,6 @@ func (s *Syncer[H]) Start(ctx context.Context) error {
9898
if err != nil {
9999
return err
100100
}
101-
// gets the latest tail and then head, kicking off syncing if necessary
102-
_, err = s.Tail(ctx)
103-
if err != nil {
104-
return fmt.Errorf("error getting tail during Start: %w", err)
105-
}
106101
_, err = s.Head(ctx)
107102
if err != nil {
108103
return fmt.Errorf("error getting latest head during Start: %w", err)
@@ -115,6 +110,7 @@ func (s *Syncer[H]) Start(ctx context.Context) error {
115110
// Stop stops Syncer.
116111
func (s *Syncer[H]) Stop(context.Context) error {
117112
s.cancel()
113+
s.store.Reset()
118114
return s.metrics.Close()
119115
}
120116

sync/sync_head.go

Lines changed: 35 additions & 203 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package sync
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"fmt"
@@ -57,175 +56,6 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err
5756
return s.subjectiveHead(ctx)
5857
}
5958

60-
// Tail returns the current Tail header.
61-
//
62-
// If the underlying header store is not initialized/empty, it lazily performs subjective initialization
63-
// based on either estimated or preconfigured Tail.
64-
//
65-
// If the preconfigured Tail(SyncFromHash/Header) has changed upon Syncer restarts, it lazily sets the new Tail
66-
// and resolves the difference.
67-
func (s *Syncer[H]) Tail(ctx context.Context) (H, error) {
68-
tail, err := s.store.Tail(ctx)
69-
switch {
70-
case errors.Is(err, header.ErrEmptyStore):
71-
// Store is empty, likely the first start - initialize.
72-
log.Info("empty store, initializing...")
73-
// TODO(@Wondertan): Copying the initialization logic here instead of calling the general Head path.
74-
// This is a temporary solution needed to ensure Tail is written to the store first before Head.
75-
// To be reworked by bsync.
76-
head, err := s.head.Head(ctx)
77-
if err != nil {
78-
return head, fmt.Errorf("requesting network head: %w", err)
79-
}
80-
s.metrics.subjectiveInitialization(s.ctx)
81-
82-
switch {
83-
case s.Params.SyncFromHash != nil:
84-
tail, err = s.getter.Get(ctx, s.Params.SyncFromHash)
85-
if err != nil {
86-
return tail, fmt.Errorf(
87-
"getting tail header by hash(%s): %w",
88-
s.Params.SyncFromHash,
89-
err,
90-
)
91-
}
92-
case s.Params.SyncFromHeight != 0:
93-
tail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight)
94-
if err != nil {
95-
return tail, fmt.Errorf("getting tail header(%d): %w", s.Params.SyncFromHeight, err)
96-
}
97-
default:
98-
tailHeight := estimateTail(head, s.Params.blockTime, s.Params.TrustingPeriod)
99-
tail, err = s.getter.GetByHeight(ctx, tailHeight)
100-
if err != nil {
101-
return tail, fmt.Errorf("getting estimated tail header(%d): %w", tailHeight, err)
102-
}
103-
}
104-
105-
err = s.store.Append(ctx, tail)
106-
if err != nil {
107-
return tail, fmt.Errorf("appending tail header: %w", err)
108-
}
109-
110-
err = s.incomingNetworkHead(ctx, head)
111-
if err != nil {
112-
return tail, fmt.Errorf("applying head from trusted peers: %w", err)
113-
}
114-
115-
log.Infow(
116-
"subjective initialization finished",
117-
"tail_height",
118-
tail.Height(),
119-
"head_height",
120-
head.Height(),
121-
)
122-
123-
case !tail.IsZero() && !s.isTailActual(tail):
124-
// Configured Tail has changed - get a new one and resolve the diff
125-
126-
currentTail, newTail := tail, tail
127-
switch {
128-
case s.Params.SyncFromHash != nil:
129-
// check first locally if the new Tail exists
130-
newTail, err = s.store.Get(ctx, s.Params.SyncFromHash)
131-
if err != nil {
132-
// if for whatever reason Tail is not available locally, request the new one from the network.
133-
newTail, err = s.getter.Get(ctx, s.Params.SyncFromHash)
134-
if err != nil {
135-
return tail, fmt.Errorf(
136-
"getting tail header by hash(%s): %w",
137-
s.Params.SyncFromHash,
138-
err,
139-
)
140-
}
141-
err = s.store.Append(ctx, newTail)
142-
if err != nil {
143-
return tail, fmt.Errorf(
144-
"appending the new tail header(%d): %w",
145-
newTail.Height(),
146-
err,
147-
)
148-
}
149-
}
150-
case s.Params.SyncFromHeight != 0:
151-
// check first locally if the new Tail exists
152-
newTail, err = s.store.GetByHeight(ctx, s.Params.SyncFromHeight)
153-
if err != nil {
154-
// if for whatever reason Tail is not available locally, request the new one from the network.
155-
newTail, err = s.getter.GetByHeight(ctx, s.Params.SyncFromHeight)
156-
if err != nil {
157-
return tail, fmt.Errorf(
158-
"getting tail header by hash(%s): %w",
159-
s.Params.SyncFromHash,
160-
err,
161-
)
162-
}
163-
err = s.store.Append(ctx, newTail)
164-
if err != nil {
165-
return tail, fmt.Errorf(
166-
"appending the new tail header(%d): %w",
167-
newTail.Height(),
168-
err,
169-
)
170-
}
171-
}
172-
}
173-
174-
switch {
175-
case currentTail.Height() > newTail.Height():
176-
log.Infof(
177-
"tail header changed from %d to %d, syncing the diff...",
178-
currentTail,
179-
newTail,
180-
)
181-
// TODO(@Wondertan): This works but it assumes this code is only run before syncing routine starts.
182-
// If run after, it may race with other in prog syncs.
183-
// To be reworked by bsync.
184-
err := s.doSync(ctx, newTail, currentTail)
185-
if err != nil {
186-
return tail, fmt.Errorf("syncing the diff between old and new Tail: %w", err)
187-
}
188-
case currentTail.Height() < newTail.Height():
189-
log.Infof(
190-
"tail header changed from %d to %d, pruning the diff...",
191-
currentTail,
192-
newTail,
193-
)
194-
err := s.store.DeleteTo(ctx, newTail.Height())
195-
if err != nil {
196-
return tail, fmt.Errorf(
197-
"deleting headers up to newly configured Tail(%d): %w",
198-
newTail.Height(),
199-
err,
200-
)
201-
}
202-
default:
203-
// equals case, must not happen
204-
panic("currentTail == newTail")
205-
}
206-
207-
case err != nil:
208-
return tail, err
209-
}
210-
211-
return tail, nil
212-
}
213-
214-
// isTailActual checks if the given tail is actual based on the sync parameters.
215-
func (s *Syncer[H]) isTailActual(tail H) bool {
216-
switch {
217-
case s.Params.SyncFromHash == nil && s.Params.SyncFromHeight == 0:
218-
// if both overrides are zero value, then we good with whatever tail there is
219-
return true
220-
case s.Params.SyncFromHash != nil && bytes.Equal(s.Params.SyncFromHash, tail.Hash()):
221-
return true
222-
case s.Params.SyncFromHeight != 0 && s.Params.SyncFromHeight == tail.Height():
223-
return true
224-
default:
225-
return false
226-
}
227-
}
228-
22959
// subjectiveHead returns the latest known local header that is not expired(within trusting period).
23060
// If the header is expired, it is retrieved from a trusted peer without validation;
23161
// in other words, an automatic subjective initialization is performed.
@@ -242,34 +72,41 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
24272
storeHead, err := s.store.Head(ctx)
24373
switch {
24474
case errors.Is(err, header.ErrEmptyStore):
245-
log.Info("no stored head, initializing...")
75+
log.Info("empty store, initializing...")
76+
s.metrics.subjectiveInitialization(s.ctx)
24677
case !storeHead.IsZero() && isExpired(storeHead, s.Params.TrustingPeriod):
24778
log.Infow("stored head header expired", "height", storeHead.Height())
24879
default:
24980
return storeHead, err
25081
}
251-
252-
trustHead, err := s.head.Head(ctx)
82+
// fetch a new head from trusted peers if not available locally
83+
newHead, err := s.head.Head(ctx)
25384
if err != nil {
254-
return trustHead, err
85+
return newHead, err
25586
}
256-
s.metrics.subjectiveInitialization(s.ctx)
257-
// and set it as the new subjective head without validation,
258-
// or, in other words, do 'automatic subjective initialization'
259-
// NOTE: we avoid validation as the head expired to prevent possibility of the Long-Range Attack
260-
s.setSubjectiveHead(ctx, trustHead)
26187
switch {
262-
default:
263-
log.Infow("subjective initialization finished", "height", trustHead.Height())
264-
return trustHead, nil
265-
case isExpired(trustHead, s.Params.TrustingPeriod):
266-
log.Warnw("subjective initialization with an expired header", "height", trustHead.Height())
267-
case !isRecent(trustHead, s.Params.blockTime, s.Params.recencyThreshold):
268-
log.Warnw("subjective initialization with an old header", "height", trustHead.Height())
88+
case isExpired(newHead, s.Params.TrustingPeriod):
89+
// forbid initializing off an expired header
90+
err := fmt.Errorf("subjective initialization with an expired header(%d)", newHead.Height())
91+
log.Error(err, "\n trusted peers are out of sync")
92+
s.metrics.trustedPeersOutOufSync(s.ctx)
93+
return newHead, err
94+
case !isRecent(newHead, s.Params.blockTime, s.Params.recencyThreshold):
95+
log.Warnw("subjective initialization with not recent header", "height", newHead.Height())
96+
s.metrics.trustedPeersOutOufSync(s.ctx)
26997
}
270-
log.Warn("trusted peer is out of sync")
271-
s.metrics.trustedPeersOutOufSync(s.ctx)
272-
return trustHead, nil
98+
99+
// and set the fetched head as the new subjective head validating it against the tail
100+
// or, in other words, do 'automatic subjective initialization'
101+
err = s.incomingNetworkHead(ctx, newHead)
102+
if err != nil {
103+
err = fmt.Errorf("subjective initialization failed for head(%d): %w", newHead.Height(), err)
104+
log.Error(err)
105+
return newHead, err
106+
}
107+
108+
log.Infow("subjective initialization finished", "head", newHead.Height())
109+
return newHead, nil
273110
}
274111

275112
// setSubjectiveHead takes already validated head and sets it as the new sync target.
@@ -307,7 +144,15 @@ func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error {
307144
s.incomingMu.Lock()
308145
defer s.incomingMu.Unlock()
309146

310-
err := s.verify(ctx, head)
147+
// TODO(@Wondertan): We need to ensure Tail is available before verification for subj init case and this is fine here
148+
// however, check if that's ok to trigger Tail moves on network header that hasn't been verified yet
149+
// To be reworked by bsync.
150+
_, err := s.subjectiveTail(ctx, head)
151+
if err != nil {
152+
return err
153+
}
154+
155+
err = s.verify(ctx, head)
311156
if err != nil {
312157
return err
313158
}
@@ -426,16 +271,3 @@ func isRecent[H header.Header[H]](header H, blockTime, recencyThreshold time.Dur
426271
}
427272
return time.Since(header.Time()) <= recencyThreshold
428273
}
429-
430-
func estimateTail[H header.Header[H]](
431-
head H,
432-
blockTime, trustingPeriod time.Duration,
433-
) (height uint64) {
434-
headersToRetain := uint64(trustingPeriod / blockTime) //nolint:gosec
435-
436-
if headersToRetain >= head.Height() {
437-
return 1
438-
}
439-
tail := head.Height() - headersToRetain
440-
return tail
441-
}

0 commit comments

Comments
 (0)