Skip to content

Commit 6ca5bf7

Browse files
gsoraDiogoSantoss
authored andcommitted
app/eth2wrap: fallback beacon nodes (#3342)
Implement a way to provide eth2wrap with two classes of beacon nodes addresses: standard and fallback beacon nodes. When one of the multi BN calls fails, eth2wrap wrappers will try to get an available fallback BN from a list and re-do the call on that. If no fallback BNs is specified, return the original error. If the fallback BN call fails, return fallback error instead. This PR firstly introduces concepts and code, will introduce CLI parameters and initialization code later. category: feature ticket: #3328
1 parent d6f8178 commit 6ca5bf7

File tree

7 files changed

+859
-164
lines changed

7 files changed

+859
-164
lines changed

app/eth2wrap/eth2wrap.go

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,21 @@ func Instrument(clients ...Client) (Client, error) {
5757
return nil, errors.New("clients empty")
5858
}
5959

60-
return newMulti(clients), nil
60+
// TODO(gsora): remove once the implementation is agreed upon and
61+
// wiring is complete.
62+
fb := NewFallbackClient(0, [4]byte{}, nil)
63+
64+
return newMulti(clients, fb), nil
65+
}
66+
67+
// InstrumentWithFallback returns a new multi instrumented client using the provided clients as backends and fallback
68+
// respectively.
69+
func InstrumentWithFallback(fallback *FallbackClient, clients ...Client) (Client, error) {
70+
if len(clients) == 0 {
71+
return nil, errors.New("clients empty")
72+
}
73+
74+
return newMulti(clients, fallback), nil
6175
}
6276

6377
// WithSyntheticDuties wraps the provided client adding synthetic duties.
@@ -71,43 +85,58 @@ func WithSyntheticDuties(cl Client) Client {
7185

7286
// NewMultiHTTP returns a new instrumented multi eth2 http client.
7387
func NewMultiHTTP(timeout time.Duration, forkVersion [4]byte, addresses ...string) (Client, error) {
88+
return Instrument(newClients(timeout, forkVersion, addresses)...)
89+
}
90+
91+
// newClients returns a slice of Client initialized with the provided settings.
92+
func newClients(timeout time.Duration, forkVersion [4]byte, addresses []string) []Client {
7493
var clients []Client
7594
for _, address := range addresses {
76-
parameters := []eth2http.Parameter{
77-
eth2http.WithLogLevel(zeroLogInfo),
78-
eth2http.WithAddress(address),
79-
eth2http.WithTimeout(timeout),
80-
eth2http.WithAllowDelayedStart(true),
81-
eth2http.WithEnforceJSON(featureset.Enabled(featureset.JSONRequests)),
82-
}
95+
clients = append(clients, newBeaconClient(timeout, forkVersion, address))
96+
}
8397

84-
cl := newLazy(func(ctx context.Context) (Client, error) {
85-
eth2Svc, err := eth2http.New(ctx, parameters...)
86-
if err != nil {
87-
return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address))
88-
}
89-
eth2Http, ok := eth2Svc.(*eth2http.Service)
90-
if !ok {
91-
return nil, errors.New("invalid eth2 http service")
92-
}
98+
return clients
99+
}
93100

94-
adaptedCl := AdaptEth2HTTP(eth2Http, timeout)
95-
adaptedCl.SetForkVersion(forkVersion)
101+
// newBeaconClient returns a Client with the provided settings.
102+
func newBeaconClient(timeout time.Duration, forkVersion [4]byte, address string) Client {
103+
parameters := []eth2http.Parameter{
104+
eth2http.WithLogLevel(zeroLogInfo),
105+
eth2http.WithAddress(address),
106+
eth2http.WithTimeout(timeout),
107+
eth2http.WithAllowDelayedStart(true),
108+
eth2http.WithEnforceJSON(featureset.Enabled(featureset.JSONRequests)),
109+
}
96110

97-
return adaptedCl, nil
98-
})
111+
cl := newLazy(func(ctx context.Context) (Client, error) {
112+
eth2Svc, err := eth2http.New(ctx, parameters...)
113+
if err != nil {
114+
return nil, wrapError(ctx, err, "new eth2 client", z.Str("address", address))
115+
}
116+
eth2Http, ok := eth2Svc.(*eth2http.Service)
117+
if !ok {
118+
return nil, errors.New("invalid eth2 http service")
119+
}
99120

100-
clients = append(clients, cl)
101-
}
121+
adaptedCl := AdaptEth2HTTP(eth2Http, timeout)
122+
adaptedCl.SetForkVersion(forkVersion)
123+
124+
return adaptedCl, nil
125+
})
102126

103-
return Instrument(clients...)
127+
return cl
128+
}
129+
130+
type provideArgs struct {
131+
client Client
132+
fallback *FallbackClient
104133
}
105134

106135
// provide calls the work function with each client in parallel, returning the
107136
// first successful result or first error.
108137
// The bestIdxFunc is called with the index of the client returning a successful response.
109-
func provide[O any](ctx context.Context, clients []Client,
110-
work forkjoin.Work[Client, O], isSuccessFunc func(O) bool, bestSelector *bestSelector,
138+
func provide[O any](ctx context.Context, clients []Client, fallback *FallbackClient,
139+
work forkjoin.Work[provideArgs, O], isSuccessFunc func(O) bool, bestSelector *bestSelector,
111140
) (O, error) {
112141
if isSuccessFunc == nil {
113142
isSuccessFunc = func(O) bool { return true }
@@ -118,12 +147,15 @@ func provide[O any](ctx context.Context, clients []Client,
118147
forkjoin.WithWorkers(len(clients)),
119148
)
120149
for _, client := range clients {
121-
fork(client)
150+
fork(provideArgs{
151+
client: client,
152+
fallback: fallback,
153+
})
122154
}
123155
defer cancel()
124156

125157
var (
126-
nokResp forkjoin.Result[Client, O]
158+
nokResp forkjoin.Result[provideArgs, O]
127159
hasNokResp bool
128160
zero O
129161
)
@@ -132,7 +164,7 @@ func provide[O any](ctx context.Context, clients []Client,
132164
return zero, ctx.Err()
133165
} else if res.Err == nil && isSuccessFunc(res.Output) {
134166
if bestSelector != nil {
135-
bestSelector.Increment(res.Input.Address())
167+
bestSelector.Increment(res.Input.client.Address())
136168
}
137169

138170
return res.Output, nil
@@ -154,10 +186,10 @@ func provide[O any](ctx context.Context, clients []Client,
154186
type empty struct{}
155187

156188
// submit proxies provide, but returns nil instead of a successful result.
157-
func submit(ctx context.Context, clients []Client, work func(context.Context, Client) error, selector *bestSelector) error {
158-
_, err := provide(ctx, clients,
159-
func(ctx context.Context, cl Client) (empty, error) {
160-
return empty{}, work(ctx, cl)
189+
func submit(ctx context.Context, clients []Client, fallback *FallbackClient, work func(context.Context, provideArgs) error, selector *bestSelector) error {
190+
_, err := provide(ctx, clients, fallback,
191+
func(ctx context.Context, args provideArgs) (empty, error) {
192+
return empty{}, work(ctx, args)
161193
},
162194
nil, selector,
163195
)

0 commit comments

Comments
 (0)