Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
}

sseListener.SubscribeChainReorgEvent(sched.HandleChainReorgEvent)
sseListener.SubscribeBlockEvent(sched.HandleBlockEvent)

feeRecipientFunc := func(pubkey core.PubKey) string {
return feeRecipientAddrByCorePubkey[pubkey]
Expand Down
2 changes: 2 additions & 0 deletions app/eth2wrap/eth2wrap_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions app/eth2wrap/httpwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,8 @@ func (h *httpAdapter) Proxy(ctx context.Context, req *http.Request) (*http.Respo
log.Debug(ctx, "Proxying request to beacon node", z.Any("url", h.address))
return h.Service.Proxy(ctx, req)
}

// ClientForAddress returns the same client since httpAdapter wraps a single address.
func (h *httpAdapter) ClientForAddress(addr string) Client {
return h
}
10 changes: 10 additions & 0 deletions app/eth2wrap/lazy.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ func (l *lazy) Address() string {
return cl.Address()
}

// ClientForAddress returns a scoped client that queries only the specified address.
func (l *lazy) ClientForAddress(addr string) Client {
cl, ok := l.getClient()
if !ok {
return l
}

return cl.ClientForAddress(addr)
}

func (l *lazy) IsActive() bool {
cl, ok := l.getClient()
if !ok {
Expand Down
20 changes: 20 additions & 0 deletions app/eth2wrap/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions app/eth2wrap/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,39 @@ func (m multi) Address() string {
return address
}

// ClientForAddress returns a scoped multi client that only queries the specified address.
// Returns the original multi client if the address is not found or is empty.
func (m multi) ClientForAddress(addr string) Client {
if addr == "" {
return m
}

// Find client matching the address
for _, cl := range m.clients {
if cl.Address() == addr {
return multi{
clients: []Client{cl},
fallbacks: m.fallbacks,
selector: m.selector,
}
}
}

// Address not found in clients, check fallbacks
for _, cl := range m.fallbacks {
if cl.Address() == addr {
return multi{
clients: []Client{cl},
fallbacks: nil,
selector: m.selector,
}
}
}

// Address not found, return original multi client
return m
}

func (m multi) IsActive() bool {
for _, cl := range m.clients {
if cl.IsActive() {
Expand Down
12 changes: 6 additions & 6 deletions app/featureset/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ func EnableForT(t *testing.T, feature Feature) {

initMu.Lock()
defer initMu.Unlock()

cache := state[feature]
state[feature] = enable

t.Cleanup(func() {
initMu.Lock()
defer initMu.Unlock()
state[feature] = cache
})

state[feature] = enable
}

// DisableForT disables a feature for testing.
Expand All @@ -133,12 +133,12 @@ func DisableForT(t *testing.T, feature Feature) {

initMu.Lock()
defer initMu.Unlock()

cache := state[feature]
state[feature] = disable

t.Cleanup(func() {
initMu.Lock()
defer initMu.Unlock()
state[feature] = cache
})

state[feature] = disable
}
5 changes: 5 additions & 0 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ const (
// ChainSplitHalt compares locally fetched attestation's target and source to leader's proposed target and source attestation.
// In case they differ, Charon does not sign the attestation.
ChainSplitHalt = "chain_split_halt"

// FetchAttOnBlock enables fetching attestation data upon block processing event from beacon node via SSE.
// Fallback to T=1/3+300ms if block event is not received in time.
FetchAttOnBlock = "fetch_att_on_block"
)

var (
Expand All @@ -88,6 +92,7 @@ var (
QUIC: statusAlpha,
FetchOnlyCommIdx0: statusAlpha,
ChainSplitHalt: statusAlpha,
FetchAttOnBlock: statusAlpha,
// Add all features and their status here.
}

Expand Down
22 changes: 22 additions & 0 deletions app/sse/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ import (
)

type ChainReorgEventHandlerFunc func(ctx context.Context, epoch eth2p0.Epoch)
type BlockEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot, bnAddr string)

type Listener interface {
SubscribeChainReorgEvent(ChainReorgEventHandlerFunc)
SubscribeBlockEvent(BlockEventHandlerFunc)
}

type listener struct {
sync.Mutex

chainReorgSubs []ChainReorgEventHandlerFunc
blockSubs []BlockEventHandlerFunc
lastReorgEpoch eth2p0.Epoch

// immutable fields
Expand All @@ -55,6 +58,7 @@ func StartListener(ctx context.Context, eth2Cl eth2wrap.Client, addresses, heade

l := &listener{
chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0),
blockSubs: make([]BlockEventHandlerFunc, 0),
genesisTime: genesisTime,
slotDuration: slotDuration,
slotsPerEpoch: slotsPerEpoch,
Expand Down Expand Up @@ -94,6 +98,13 @@ func (p *listener) SubscribeChainReorgEvent(handler ChainReorgEventHandlerFunc)
p.chainReorgSubs = append(p.chainReorgSubs, handler)
}

func (p *listener) SubscribeBlockEvent(handler BlockEventHandlerFunc) {
p.Lock()
defer p.Unlock()

p.blockSubs = append(p.blockSubs, handler)
}

func (p *listener) eventHandler(ctx context.Context, event *event, addr string) error {
switch event.Event {
case sseHeadEvent:
Expand Down Expand Up @@ -247,6 +258,8 @@ func (p *listener) handleBlockEvent(ctx context.Context, event *event, addr stri

sseBlockHistogram.WithLabelValues(addr).Observe(delay.Seconds())

p.notifyBlockEvent(ctx, eth2p0.Slot(slot), addr)

return nil
}

Expand All @@ -264,6 +277,15 @@ func (p *listener) notifyChainReorg(ctx context.Context, epoch eth2p0.Epoch) {
}
}

func (p *listener) notifyBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAddr string) {
p.Lock()
defer p.Unlock()

for _, sub := range p.blockSubs {
sub(ctx, slot, bnAddr)
}
}

// computeDelay computes the delay between start of the slot and receiving the event.
func (p *listener) computeDelay(slot uint64, eventTS time.Time, delayOKFunc func(delay time.Duration) bool) (time.Duration, bool) {
slotStartTime := p.genesisTime.Add(time.Duration(slot) * p.slotDuration)
Expand Down
50 changes: 50 additions & 0 deletions app/sse/listener_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,40 @@ func TestHandleEvents(t *testing.T) {
},
err: errors.New("parse depth to uint64"),
},
{
name: "block event happy path",
event: &event{
Event: sseBlockEvent,
Data: []byte(`{"slot":"42", "block":"0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "execution_optimistic": false}`),
Timestamp: time.Now(),
},
err: nil,
},
{
name: "block event incompatible data payload",
event: &event{
Event: sseBlockEvent,
Data: []byte(`"error"`),
Timestamp: time.Now(),
},
err: errors.New("unmarshal SSE block event"),
},
{
name: "block event parse slot",
event: &event{
Event: sseBlockEvent,
Data: []byte(`{"slot":"invalid", "block":"0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "execution_optimistic": false}`),
Timestamp: time.Now(),
},
err: errors.New("parse slot to uint64"),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
l := &listener{
chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0),
blockSubs: make([]BlockEventHandlerFunc, 0),
slotDuration: 12 * time.Second,
slotsPerEpoch: 32,
genesisTime: time.Date(2020, 12, 1, 12, 0, 23, 0, time.UTC),
Expand Down Expand Up @@ -133,6 +161,28 @@ func TestSubscribeNotifyChainReorg(t *testing.T) {
require.Equal(t, eth2p0.Epoch(10), reportedEpochs[1])
}

func TestSubscribeNotifyBlockEvent(t *testing.T) {
ctx := t.Context()
l := &listener{
blockSubs: make([]BlockEventHandlerFunc, 0),
}

reportedSlots := make([]eth2p0.Slot, 0)

l.SubscribeBlockEvent(func(_ context.Context, slot eth2p0.Slot, bnAddr string) {
reportedSlots = append(reportedSlots, slot)
})

l.notifyBlockEvent(ctx, eth2p0.Slot(100), "http://test-bn:5052")
l.notifyBlockEvent(ctx, eth2p0.Slot(100), "http://test-bn:5052") // Duplicate should be reported (no dedup for block events)
l.notifyBlockEvent(ctx, eth2p0.Slot(101), "http://test-bn:5052")

require.Len(t, reportedSlots, 3)
require.Equal(t, eth2p0.Slot(100), reportedSlots[0])
require.Equal(t, eth2p0.Slot(100), reportedSlots[1])
require.Equal(t, eth2p0.Slot(101), reportedSlots[2])
}

func TestComputeDelay(t *testing.T) {
genesisTimeString := "2020-12-01T12:00:23+00:00"
genesisTime, err := time.Parse(time.RFC3339, genesisTimeString)
Expand Down
Loading
Loading