Skip to content

Commit 3471245

Browse files
zsfelfoldijorgemmsilva
authored andcommitted
beacon/light: request finality update explicitly when necessary (ethereum#29567)
This PR adds an extra mechanism to sync.HeadSync that tries to retrieve the latest finality update from every server each time it sends an optimistic update in a new epoch (unless we already have a validated finality update attested in the same epoch). Note that this is not necessary and does not happen if the new finality update is delivered before the optimistic update. The spec only mandates light_client_finality_update events when a new epoch is finalized. If the chain does not finalize for a while then we might need an explicit request that returns a finality proof that proves the same finality epoch from the latest attested epoch.
1 parent 2238cab commit 3471245

14 files changed

+316
-159
lines changed

beacon/blsync/block_sync.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type beaconBlockSync struct {
4141

4242
type headTracker interface {
4343
PrefetchHead() types.HeadInfo
44-
ValidatedHead() (types.SignedHeader, bool)
44+
ValidatedOptimistic() (types.OptimisticUpdate, bool)
4545
ValidatedFinality() (types.FinalityUpdate, bool)
4646
}
4747

@@ -66,6 +66,7 @@ func (s *beaconBlockSync) Process(requester request.Requester, events []request.
6666
case request.EvResponse, request.EvFail, request.EvTimeout:
6767
sid, req, resp := event.RequestInfo()
6868
blockRoot := common.Hash(req.(sync.ReqBeaconBlock))
69+
log.Debug("Beacon block event", "type", event.Type.Name, "hash", blockRoot)
6970
if resp != nil {
7071
s.recentBlocks.Add(blockRoot, resp.(*types.BeaconBlock))
7172
}
@@ -80,8 +81,8 @@ func (s *beaconBlockSync) Process(requester request.Requester, events []request.
8081
}
8182
s.updateEventFeed()
8283
// request validated head block if unavailable and not yet requested
83-
if vh, ok := s.headTracker.ValidatedHead(); ok {
84-
s.tryRequestBlock(requester, vh.Header.Hash(), false)
84+
if vh, ok := s.headTracker.ValidatedOptimistic(); ok {
85+
s.tryRequestBlock(requester, vh.Attested.Hash(), false)
8586
}
8687
// request prefetch head if the given server has announced it
8788
if prefetchHead := s.headTracker.PrefetchHead().BlockRoot; prefetchHead != (common.Hash{}) {
@@ -114,31 +115,30 @@ func blockHeadInfo(block *types.BeaconBlock) types.HeadInfo {
114115
}
115116

116117
func (s *beaconBlockSync) updateEventFeed() {
117-
head, ok := s.headTracker.ValidatedHead()
118+
optimistic, ok := s.headTracker.ValidatedOptimistic()
118119
if !ok {
119120
return
120121
}
121122

122-
validatedHead := head.Header.Hash()
123+
validatedHead := optimistic.Attested.Hash()
123124
headBlock, ok := s.recentBlocks.Get(validatedHead)
124125
if !ok {
125126
return
126127
}
127128

128129
var finalizedHash common.Hash
129130
if finality, ok := s.headTracker.ValidatedFinality(); ok {
130-
he := head.Header.Epoch()
131+
he := optimistic.Attested.Epoch()
131132
fe := finality.Attested.Header.Epoch()
132133
switch {
133134
case he == fe:
134135
finalizedHash = finality.Finalized.PayloadHeader.BlockHash()
135136
case he < fe:
136137
return
137138
case he == fe+1:
138-
parent, ok := s.recentBlocks.Get(head.Header.ParentRoot)
139+
parent, ok := s.recentBlocks.Get(optimistic.Attested.ParentRoot)
139140
if !ok || parent.Slot()/params.EpochLength == fe {
140141
return // head is at first slot of next epoch, wait for finality update
141-
//TODO: try to fetch finality update directly if subscription does not deliver
142142
}
143143
}
144144
}
@@ -156,7 +156,7 @@ func (s *beaconBlockSync) updateEventFeed() {
156156
return
157157
}
158158
s.chainHeadFeed.Send(types.ChainHeadEvent{
159-
BeaconHead: head.Header,
159+
BeaconHead: optimistic.Attested.Header,
160160
Block: execBlock,
161161
Finalized: finalizedHash,
162162
})

beacon/blsync/block_sync_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,12 @@ func (h *testHeadTracker) PrefetchHead() types.HeadInfo {
140140
return h.prefetch
141141
}
142142

143-
func (h *testHeadTracker) ValidatedHead() (types.SignedHeader, bool) {
144-
return h.validated, h.validated.Header != (types.Header{})
143+
func (h *testHeadTracker) ValidatedOptimistic() (types.OptimisticUpdate, bool) {
144+
return types.OptimisticUpdate{
145+
Attested: types.HeaderWithExecProof{Header: h.validated.Header},
146+
Signature: h.validated.Signature,
147+
SignatureSlot: h.validated.SignatureSlot,
148+
}, h.validated.Header != (types.Header{})
145149
}
146150

147151
// TODO add test case for finality

beacon/blsync/engineclient.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func (ec *engineClient) updateLoop(headCh <-chan types.ChainHeadEvent) {
6262
for {
6363
select {
6464
case <-ec.rootCtx.Done():
65+
log.Debug("Stopping engine API update loop")
6566
return
6667

6768
case event := <-headCh:
@@ -73,12 +74,14 @@ func (ec *engineClient) updateLoop(headCh <-chan types.ChainHeadEvent) {
7374
fork := ec.config.ForkAtEpoch(event.BeaconHead.Epoch())
7475
forkName := strings.ToLower(fork.Name)
7576

77+
log.Debug("Calling NewPayload", "number", event.Block.NumberU64(), "hash", event.Block.Hash())
7678
if status, err := ec.callNewPayload(forkName, event); err == nil {
7779
log.Info("Successful NewPayload", "number", event.Block.NumberU64(), "hash", event.Block.Hash(), "status", status)
7880
} else {
7981
log.Error("Failed NewPayload", "number", event.Block.NumberU64(), "hash", event.Block.Hash(), "error", err)
8082
}
8183

84+
log.Debug("Calling ForkchoiceUpdated", "head", event.Block.Hash())
8285
if status, err := ec.callForkchoiceUpdated(forkName, event); err == nil {
8386
log.Info("Successful ForkchoiceUpdated", "head", event.Block.Hash(), "status", status)
8487
} else {

beacon/light/api/api_server.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ func (s *ApiServer) Subscribe(eventCallback func(event request.Event)) {
4646
log.Debug("New head received", "slot", slot, "blockRoot", blockRoot)
4747
eventCallback(request.Event{Type: sync.EvNewHead, Data: types.HeadInfo{Slot: slot, BlockRoot: blockRoot}})
4848
},
49-
OnSignedHead: func(head types.SignedHeader) {
50-
log.Debug("New signed head received", "slot", head.Header.Slot, "blockRoot", head.Header.Hash(), "signerCount", head.Signature.SignerCount())
51-
eventCallback(request.Event{Type: sync.EvNewSignedHead, Data: head})
49+
OnOptimistic: func(update types.OptimisticUpdate) {
50+
log.Debug("New optimistic update received", "slot", update.Attested.Slot, "blockRoot", update.Attested.Hash(), "signerCount", update.Signature.SignerCount())
51+
eventCallback(request.Event{Type: sync.EvNewOptimisticUpdate, Data: update})
5252
},
53-
OnFinality: func(head types.FinalityUpdate) {
54-
log.Debug("New finality update received", "slot", head.Attested.Slot, "blockRoot", head.Attested.Hash(), "signerCount", head.Signature.SignerCount())
55-
eventCallback(request.Event{Type: sync.EvNewFinalityUpdate, Data: head})
53+
OnFinality: func(update types.FinalityUpdate) {
54+
log.Debug("New finality update received", "slot", update.Attested.Slot, "blockRoot", update.Attested.Hash(), "signerCount", update.Signature.SignerCount())
55+
eventCallback(request.Event{Type: sync.EvNewFinalityUpdate, Data: update})
5656
},
5757
OnError: func(err error) {
5858
log.Warn("Head event stream error", "err", err)
@@ -83,13 +83,17 @@ func (s *ApiServer) SendRequest(id request.ID, req request.Request) {
8383
case sync.ReqBeaconBlock:
8484
log.Debug("Beacon API: requesting block", "reqid", id, "hash", common.Hash(data))
8585
resp, err = s.api.GetBeaconBlock(common.Hash(data))
86+
case sync.ReqFinality:
87+
log.Debug("Beacon API: requesting finality update")
88+
resp, err = s.api.GetFinalityUpdate()
8689
default:
8790
}
8891

8992
if err != nil {
9093
log.Warn("Beacon API request failed", "type", reflect.TypeOf(req), "reqid", id, "err", err)
9194
s.eventCallback(request.Event{Type: request.EvFail, Data: request.RequestResponse{ID: id, Request: req}})
9295
} else {
96+
log.Debug("Beacon API request answered", "type", reflect.TypeOf(req), "reqid", id)
9397
s.eventCallback(request.Event{Type: request.EvResponse, Data: request.RequestResponse{ID: id, Request: req, Response: resp}})
9498
}
9599
}()

beacon/light/api/light_api.go

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/ethereum/go-ethereum/beacon/types"
3333
"github.com/ethereum/go-ethereum/common"
3434
"github.com/ethereum/go-ethereum/common/hexutil"
35+
"github.com/ethereum/go-ethereum/log"
3536
)
3637

3738
var (
@@ -184,46 +185,56 @@ func (api *BeaconLightApi) GetBestUpdatesAndCommittees(firstPeriod, count uint64
184185
return updates, committees, nil
185186
}
186187

187-
// GetOptimisticHeadUpdate fetches a signed header based on the latest available
188-
// optimistic update. Note that the signature should be verified by the caller
189-
// as its validity depends on the update chain.
188+
// GetOptimisticUpdate fetches the latest available optimistic update.
189+
// Note that the signature should be verified by the caller as its validity
190+
// depends on the update chain.
190191
//
191192
// See data structure definition here:
192193
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientoptimisticupdate
193-
func (api *BeaconLightApi) GetOptimisticHeadUpdate() (types.SignedHeader, error) {
194+
func (api *BeaconLightApi) GetOptimisticUpdate() (types.OptimisticUpdate, error) {
194195
resp, err := api.httpGet("/eth/v1/beacon/light_client/optimistic_update")
195196
if err != nil {
196-
return types.SignedHeader{}, err
197+
return types.OptimisticUpdate{}, err
197198
}
198-
return decodeOptimisticHeadUpdate(resp)
199+
return decodeOptimisticUpdate(resp)
199200
}
200201

201-
func decodeOptimisticHeadUpdate(enc []byte) (types.SignedHeader, error) {
202+
func decodeOptimisticUpdate(enc []byte) (types.OptimisticUpdate, error) {
202203
var data struct {
203-
Data struct {
204-
Header jsonBeaconHeader `json:"attested_header"`
205-
Aggregate types.SyncAggregate `json:"sync_aggregate"`
206-
SignatureSlot common.Decimal `json:"signature_slot"`
204+
Version string
205+
Data struct {
206+
Attested jsonHeaderWithExecProof `json:"attested_header"`
207+
Aggregate types.SyncAggregate `json:"sync_aggregate"`
208+
SignatureSlot common.Decimal `json:"signature_slot"`
207209
} `json:"data"`
208210
}
209211
if err := json.Unmarshal(enc, &data); err != nil {
210-
return types.SignedHeader{}, err
212+
return types.OptimisticUpdate{}, err
213+
}
214+
// Decode the execution payload headers.
215+
attestedExecHeader, err := types.ExecutionHeaderFromJSON(data.Version, data.Data.Attested.Execution)
216+
if err != nil {
217+
return types.OptimisticUpdate{}, fmt.Errorf("invalid attested header: %v", err)
211218
}
212-
if data.Data.Header.Beacon.StateRoot == (common.Hash{}) {
219+
if data.Data.Attested.Beacon.StateRoot == (common.Hash{}) {
213220
// workaround for different event encoding format in Lodestar
214221
if err := json.Unmarshal(enc, &data.Data); err != nil {
215-
return types.SignedHeader{}, err
222+
return types.OptimisticUpdate{}, err
216223
}
217224
}
218225

219226
if len(data.Data.Aggregate.Signers) != params.SyncCommitteeBitmaskSize {
220-
return types.SignedHeader{}, errors.New("invalid sync_committee_bits length")
227+
return types.OptimisticUpdate{}, errors.New("invalid sync_committee_bits length")
221228
}
222229
if len(data.Data.Aggregate.Signature) != params.BLSSignatureSize {
223-
return types.SignedHeader{}, errors.New("invalid sync_committee_signature length")
230+
return types.OptimisticUpdate{}, errors.New("invalid sync_committee_signature length")
224231
}
225-
return types.SignedHeader{
226-
Header: data.Data.Header.Beacon,
232+
return types.OptimisticUpdate{
233+
Attested: types.HeaderWithExecProof{
234+
Header: data.Data.Attested.Beacon,
235+
PayloadHeader: attestedExecHeader,
236+
PayloadBranch: data.Data.Attested.ExecutionBranch,
237+
},
227238
Signature: data.Data.Aggregate,
228239
SignatureSlot: uint64(data.Data.SignatureSlot),
229240
}, nil
@@ -411,7 +422,7 @@ func decodeHeadEvent(enc []byte) (uint64, common.Hash, error) {
411422

412423
type HeadEventListener struct {
413424
OnNewHead func(slot uint64, blockRoot common.Hash)
414-
OnSignedHead func(head types.SignedHeader)
425+
OnOptimistic func(head types.OptimisticUpdate)
415426
OnFinality func(head types.FinalityUpdate)
416427
OnError func(err error)
417428
}
@@ -449,21 +460,35 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
449460
defer wg.Done()
450461

451462
// Request initial data.
463+
log.Trace("Requesting initial head header")
452464
if head, _, _, err := api.GetHeader(common.Hash{}); err == nil {
465+
log.Trace("Retrieved initial head header", "slot", head.Slot, "hash", head.Hash())
453466
listener.OnNewHead(head.Slot, head.Hash())
467+
} else {
468+
log.Debug("Failed to retrieve initial head header", "error", err)
454469
}
455-
if signedHead, err := api.GetOptimisticHeadUpdate(); err == nil {
456-
listener.OnSignedHead(signedHead)
470+
log.Trace("Requesting initial optimistic update")
471+
if optimisticUpdate, err := api.GetOptimisticUpdate(); err == nil {
472+
log.Trace("Retrieved initial optimistic update", "slot", optimisticUpdate.Attested.Slot, "hash", optimisticUpdate.Attested.Hash())
473+
listener.OnOptimistic(optimisticUpdate)
474+
} else {
475+
log.Debug("Failed to retrieve initial optimistic update", "error", err)
457476
}
477+
log.Trace("Requesting initial finality update")
458478
if finalityUpdate, err := api.GetFinalityUpdate(); err == nil {
479+
log.Trace("Retrieved initial finality update", "slot", finalityUpdate.Finalized.Slot, "hash", finalityUpdate.Finalized.Hash())
459480
listener.OnFinality(finalityUpdate)
481+
} else {
482+
log.Debug("Failed to retrieve initial finality update", "error", err)
460483
}
461484

485+
log.Trace("Starting event stream processing loop")
462486
// Receive the stream.
463487
var stream *eventsource.Stream
464488
select {
465489
case stream = <-streamCh:
466490
case <-ctx.Done():
491+
log.Trace("Stopping event stream processing loop")
467492
return
468493
}
469494

@@ -474,8 +499,10 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
474499

475500
case event, ok := <-stream.Events:
476501
if !ok {
502+
log.Trace("Event stream closed")
477503
return
478504
}
505+
log.Trace("New event received from event stream", "type", event.Event())
479506
switch event.Event() {
480507
case "head":
481508
slot, blockRoot, err := decodeHeadEvent([]byte(event.Data()))
@@ -485,9 +512,9 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
485512
listener.OnError(fmt.Errorf("error decoding head event: %v", err))
486513
}
487514
case "light_client_optimistic_update":
488-
signedHead, err := decodeOptimisticHeadUpdate([]byte(event.Data()))
515+
optimisticUpdate, err := decodeOptimisticUpdate([]byte(event.Data()))
489516
if err == nil {
490-
listener.OnSignedHead(signedHead)
517+
listener.OnOptimistic(optimisticUpdate)
491518
} else {
492519
listener.OnError(fmt.Errorf("error decoding optimistic update event: %v", err))
493520
}
@@ -521,7 +548,8 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
521548
// established. It can only return nil when the context is canceled.
522549
func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream {
523550
for retry := true; retry; retry = ctxSleep(ctx, 5*time.Second) {
524-
path := "/eth/v1/events?topics=head&topics=light_client_optimistic_update&topics=light_client_finality_update"
551+
path := "/eth/v1/events?topics=head&topics=light_client_finality_update&topics=light_client_optimistic_update"
552+
log.Trace("Sending event subscription request")
525553
req, err := http.NewRequestWithContext(ctx, "GET", api.url+path, nil)
526554
if err != nil {
527555
listener.OnError(fmt.Errorf("error creating event subscription request: %v", err))
@@ -535,6 +563,7 @@ func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadE
535563
listener.OnError(fmt.Errorf("error creating event subscription: %v", err))
536564
continue
537565
}
566+
log.Trace("Successfully created event stream")
538567
return stream
539568
}
540569
return nil

0 commit comments

Comments
 (0)