@@ -57,7 +57,21 @@ func Instrument(clients ...Client) (Client, error) {
5757 return nil , errors .New ("clients empty" )
5858 }
5959
60- return newMulti (clients ), nil
60+ // TODO(gsora): remove once the implementation is agreed upon and
61+ // wiring is complete.
62+ fb := NewFallbackClient (0 , [4 ]byte {}, nil )
63+
64+ return newMulti (clients , fb ), nil
65+ }
66+
67+ // InstrumentWithFallback returns a new multi instrumented client using the provided clients as backends and fallback
68+ // respectively.
69+ func InstrumentWithFallback (fallback * FallbackClient , clients ... Client ) (Client , error ) {
70+ if len (clients ) == 0 {
71+ return nil , errors .New ("clients empty" )
72+ }
73+
74+ return newMulti (clients , fallback ), nil
6175}
6276
6377// WithSyntheticDuties wraps the provided client adding synthetic duties.
@@ -71,43 +85,58 @@ func WithSyntheticDuties(cl Client) Client {
7185
7286// NewMultiHTTP returns a new instrumented multi eth2 http client.
7387func NewMultiHTTP (timeout time.Duration , forkVersion [4 ]byte , addresses ... string ) (Client , error ) {
88+ return Instrument (newClients (timeout , forkVersion , addresses )... )
89+ }
90+
91+ // newClients returns a slice of Client initialized with the provided settings.
92+ func newClients (timeout time.Duration , forkVersion [4 ]byte , addresses []string ) []Client {
7493 var clients []Client
7594 for _ , address := range addresses {
76- parameters := []eth2http.Parameter {
77- eth2http .WithLogLevel (zeroLogInfo ),
78- eth2http .WithAddress (address ),
79- eth2http .WithTimeout (timeout ),
80- eth2http .WithAllowDelayedStart (true ),
81- eth2http .WithEnforceJSON (featureset .Enabled (featureset .JSONRequests )),
82- }
95+ clients = append (clients , newBeaconClient (timeout , forkVersion , address ))
96+ }
8397
84- cl := newLazy (func (ctx context.Context ) (Client , error ) {
85- eth2Svc , err := eth2http .New (ctx , parameters ... )
86- if err != nil {
87- return nil , wrapError (ctx , err , "new eth2 client" , z .Str ("address" , address ))
88- }
89- eth2Http , ok := eth2Svc .(* eth2http.Service )
90- if ! ok {
91- return nil , errors .New ("invalid eth2 http service" )
92- }
98+ return clients
99+ }
93100
94- adaptedCl := AdaptEth2HTTP (eth2Http , timeout )
95- adaptedCl .SetForkVersion (forkVersion )
101+ // newBeaconClient returns a Client with the provided settings.
102+ func newBeaconClient (timeout time.Duration , forkVersion [4 ]byte , address string ) Client {
103+ parameters := []eth2http.Parameter {
104+ eth2http .WithLogLevel (zeroLogInfo ),
105+ eth2http .WithAddress (address ),
106+ eth2http .WithTimeout (timeout ),
107+ eth2http .WithAllowDelayedStart (true ),
108+ eth2http .WithEnforceJSON (featureset .Enabled (featureset .JSONRequests )),
109+ }
96110
97- return adaptedCl , nil
98- })
111+ cl := newLazy (func (ctx context.Context ) (Client , error ) {
112+ eth2Svc , err := eth2http .New (ctx , parameters ... )
113+ if err != nil {
114+ return nil , wrapError (ctx , err , "new eth2 client" , z .Str ("address" , address ))
115+ }
116+ eth2Http , ok := eth2Svc .(* eth2http.Service )
117+ if ! ok {
118+ return nil , errors .New ("invalid eth2 http service" )
119+ }
99120
100- clients = append (clients , cl )
101- }
121+ adaptedCl := AdaptEth2HTTP (eth2Http , timeout )
122+ adaptedCl .SetForkVersion (forkVersion )
123+
124+ return adaptedCl , nil
125+ })
102126
103- return Instrument (clients ... )
127+ return cl
128+ }
129+
130+ type provideArgs struct {
131+ client Client
132+ fallback * FallbackClient
104133}
105134
106135// provide calls the work function with each client in parallel, returning the
107136// first successful result or first error.
108137// The bestIdxFunc is called with the index of the client returning a successful response.
109- func provide [O any ](ctx context.Context , clients []Client ,
110- work forkjoin.Work [Client , O ], isSuccessFunc func (O ) bool , bestSelector * bestSelector ,
138+ func provide [O any ](ctx context.Context , clients []Client , fallback * FallbackClient ,
139+ work forkjoin.Work [provideArgs , O ], isSuccessFunc func (O ) bool , bestSelector * bestSelector ,
111140) (O , error ) {
112141 if isSuccessFunc == nil {
113142 isSuccessFunc = func (O ) bool { return true }
@@ -118,12 +147,15 @@ func provide[O any](ctx context.Context, clients []Client,
118147 forkjoin .WithWorkers (len (clients )),
119148 )
120149 for _ , client := range clients {
121- fork (client )
150+ fork (provideArgs {
151+ client : client ,
152+ fallback : fallback ,
153+ })
122154 }
123155 defer cancel ()
124156
125157 var (
126- nokResp forkjoin.Result [Client , O ]
158+ nokResp forkjoin.Result [provideArgs , O ]
127159 hasNokResp bool
128160 zero O
129161 )
@@ -132,7 +164,7 @@ func provide[O any](ctx context.Context, clients []Client,
132164 return zero , ctx .Err ()
133165 } else if res .Err == nil && isSuccessFunc (res .Output ) {
134166 if bestSelector != nil {
135- bestSelector .Increment (res .Input .Address ())
167+ bestSelector .Increment (res .Input .client . Address ())
136168 }
137169
138170 return res .Output , nil
@@ -154,10 +186,10 @@ func provide[O any](ctx context.Context, clients []Client,
154186type empty struct {}
155187
156188// submit proxies provide, but returns nil instead of a successful result.
157- func submit (ctx context.Context , clients []Client , work func (context.Context , Client ) error , selector * bestSelector ) error {
158- _ , err := provide (ctx , clients ,
159- func (ctx context.Context , cl Client ) (empty , error ) {
160- return empty {}, work (ctx , cl )
189+ func submit (ctx context.Context , clients []Client , fallback * FallbackClient , work func (context.Context , provideArgs ) error , selector * bestSelector ) error {
190+ _ , err := provide (ctx , clients , fallback ,
191+ func (ctx context.Context , args provideArgs ) (empty , error ) {
192+ return empty {}, work (ctx , args )
161193 },
162194 nil , selector ,
163195 )
0 commit comments