Skip to content

Commit ef6a851

Browse files
JuArceuri-99MauroToscanoMarcosNicolau
authored
fix(operator): manage subscription to events without panic (#1729)
Co-authored-by: Urix <[email protected]> Co-authored-by: Mauro Toscano <[email protected]> Co-authored-by: Marcos Nicolau <[email protected]> Co-authored-by: Marcos Nicolau <[email protected]>
1 parent 05191dc commit ef6a851

File tree

9 files changed

+124
-89
lines changed

9 files changed

+124
-89
lines changed

aggregator/cmd/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ func aggregatorMain(ctx *cli.Context) error {
6060

6161
// Listen for new task created in the ServiceManager contract in a separate goroutine, both V1 and V2 subscriptions:
6262
go func() {
63-
listenErr := aggregator.SubscribeToNewTasks()
64-
if listenErr != nil {
65-
aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErr)
63+
listenErrPair := aggregator.SubscribeToNewTasks()
64+
if listenErrPair != nil {
65+
aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErrPair)
6666
}
6767
}()
6868

aggregator/pkg/aggregator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type Aggregator struct {
4646
avsReader *chainio.AvsReader
4747
avsSubscriber *chainio.AvsSubscriber
4848
avsWriter *chainio.AvsWriter
49-
taskSubscriber chan error
49+
taskSubscriber chan chainio.ErrorPair
5050
blsAggregationService blsagg.BlsAggregationService
5151

5252
// BLS Signature Service returns an Index

aggregator/pkg/subscriber.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
package pkg
22

3-
func (agg *Aggregator) SubscribeToNewTasks() error {
4-
err := agg.subscribeToNewTasks()
5-
if err != nil {
6-
return err
3+
import "github.com/yetanotherco/aligned_layer/core/chainio"
4+
5+
func (agg *Aggregator) SubscribeToNewTasks() *chainio.ErrorPair {
6+
errorPair := agg.subscribeToNewTasks()
7+
if errorPair != nil {
8+
return errorPair
79
}
810

911
for {
1012
select {
1113
case err := <-agg.taskSubscriber:
1214
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to subscribe to new tasks", "err", err)
13-
err = agg.subscribeToNewTasks()
14-
if err != nil {
15-
return err
15+
errorPair = agg.subscribeToNewTasks()
16+
if errorPair != nil {
17+
return errorPair
1618
}
1719
case newBatch := <-agg.NewBatchChan:
1820
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task")
@@ -21,14 +23,12 @@ func (agg *Aggregator) SubscribeToNewTasks() error {
2123
}
2224
}
2325

24-
func (agg *Aggregator) subscribeToNewTasks() error {
25-
var err error
26-
27-
agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan)
26+
func (agg *Aggregator) subscribeToNewTasks() *chainio.ErrorPair {
27+
errorPair := agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan, agg.taskSubscriber)
2828

29-
if err != nil {
30-
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err)
29+
if errorPair != nil {
30+
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", errorPair)
3131
}
3232

33-
return err
33+
return errorPair
3434
}

config-files/config-operator-1-ethereum-package.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
environment: 'development'
44
aligned_layer_deployment_config_file_path: './contracts/script/output/devnet/alignedlayer_deployment_output.json'
55
eigen_layer_deployment_config_file_path: './contracts/script/output/devnet/eigenlayer_deployment_output.json'
6-
eth_rpc_url: "http://localhost:8545"
7-
eth_rpc_url_fallback: "http://localhost:8552"
8-
eth_ws_url: "ws://localhost:8546"
9-
eth_ws_url_fallback: "ws://localhost:8553"
6+
eth_rpc_url: "http://localhost:8552"
7+
eth_rpc_url_fallback: "http://localhost:8559"
8+
eth_ws_url: "ws://localhost:8553"
9+
eth_ws_url_fallback: "ws://localhost:8560"
1010
eigen_metrics_ip_port_address: 'localhost:9090'
1111

1212
## ECDSA Configurations

core/chainio/avs_reader.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,15 @@ func (r *AvsReader) IsOperatorRegistered(address ethcommon.Address) (bool, error
7272
}
7373

7474
func (r *AvsReader) DisabledVerifiers() (*big.Int, error) {
75-
return r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{})
75+
num, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{})
76+
if err != nil {
77+
// Retry with fallback client
78+
num, err = r.AvsContractBindings.ServiceManagerFallback.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{})
79+
if err != nil {
80+
r.logger.Error("Failed to fetch DisabledVerifiers", "err", err)
81+
}
82+
}
83+
return num, err
7684
}
7785

7886
// Returns all the "NewBatchV3" logs that have not been responded starting from the given block number

core/chainio/avs_subscriber.go

Lines changed: 69 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import (
88

99
"github.com/ethereum/go-ethereum/accounts/abi/bind"
1010
ethcommon "github.com/ethereum/go-ethereum/common"
11+
"github.com/ethereum/go-ethereum/event"
1112

1213
"github.com/ethereum/go-ethereum/core/types"
1314
servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager"
1415
retry "github.com/yetanotherco/aligned_layer/core"
1516
"github.com/yetanotherco/aligned_layer/core/config"
1617

18+
"fmt"
1719
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
1820
"github.com/ethereum/go-ethereum/crypto"
1921
)
@@ -43,6 +45,11 @@ type AvsSubscriber struct {
4345
logger sdklogging.Logger
4446
}
4547

48+
type ErrorPair struct {
49+
ErrorMainRPC error
50+
ErrorFallbackRPC error
51+
}
52+
4653
func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber, error) {
4754
avsContractBindings, err := NewAvsServiceBindings(
4855
baseConfig.AlignedLayerDeploymentConfig.AlignedLayerServiceManagerAddr,
@@ -61,26 +68,27 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber,
6168
}, nil
6269
}
6370

64-
func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) (chan error, error) {
71+
func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, errorPairChannel chan ErrorPair) *ErrorPair {
6572
// Create a new channel to receive new tasks
6673
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)
6774

6875
// Subscribe to new tasks
69-
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
70-
if err != nil {
71-
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
72-
return nil, err
76+
sub, errMain := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
77+
if errMain != nil {
78+
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errMain", fmt.Sprintf("%v", errMain))
7379
}
7480

75-
subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
76-
if err != nil {
77-
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
78-
return nil, err
81+
subFallback, errFallback := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
82+
if errFallback != nil {
83+
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errFallback", fmt.Sprintf("%v", errFallback))
7984
}
80-
s.logger.Info("Subscribed to new AlignedLayer V2 tasks")
8185

82-
// create a new channel to foward errors
83-
errorChannel := make(chan error)
86+
if errMain != nil && errFallback != nil {
87+
s.logger.Error("Failed to subscribe to new AlignedLayer V2 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback)
88+
return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
89+
}
90+
91+
s.logger.Info("Subscribed to new AlignedLayer V2 tasks")
8492

8593
pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)
8694

@@ -109,49 +117,56 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
109117

110118
// Handle errors and resubscribe
111119
go func() {
112-
for {
120+
var errMain, errFallback error
121+
var auxSub, auxSubFallback event.Subscription
122+
for errMain == nil || errFallback == nil { //while one is active
113123
select {
114124
case err := <-sub.Err():
115-
s.logger.Warn("Error in new task subscription", "err", err)
116-
sub.Unsubscribe()
117-
sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
118-
if err != nil {
119-
errorChannel <- err
125+
s.logger.Warn("Error in new task subscription of main connection", "err", err)
126+
127+
auxSub, errMain = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
128+
if errMain == nil {
129+
sub = auxSub // update the subscription only if it was successful
130+
s.logger.Info("Main connection resubscribed to new task subscription")
120131
}
121132
case err := <-subFallback.Err():
122-
s.logger.Warn("Error in fallback new task subscription", "err", err)
123-
subFallback.Unsubscribe()
124-
subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
125-
if err != nil {
126-
errorChannel <- err
133+
s.logger.Warn("Error in new task subscription of fallback connection", "err", err)
134+
135+
auxSubFallback, errFallback = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
136+
if errFallback == nil {
137+
subFallback = auxSubFallback // update the subscription only if it was successful
138+
s.logger.Info("Resubscribed to fallback new task subscription")
127139
}
128140
}
129141
}
142+
errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
130143
}()
131144

132-
return errorChannel, nil
145+
return nil
133146
}
134147

135-
func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) (chan error, error) {
148+
func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorPairChannel chan ErrorPair) *ErrorPair {
136149
// Create a new channel to receive new tasks
137150
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)
138151

152+
s.logger.Info("Starting subscription to new AlignedLayer V3 tasks")
139153
// Subscribe to new tasks
140-
sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
141-
if err != nil {
142-
s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
143-
return nil, err
154+
sub, errMain := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
155+
if errMain != nil {
156+
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errMain))
144157
}
145158

146-
subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
147-
if err != nil {
148-
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
149-
return nil, err
159+
subFallback, errFallback := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
160+
if errFallback != nil {
161+
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errFallback))
150162
}
151-
s.logger.Info("Subscribed to new AlignedLayer V3 tasks")
152163

153-
// create a new channel to foward errors
154-
errorChannel := make(chan error)
164+
if errMain != nil && errFallback != nil {
165+
s.logger.Error("Failed to subscribe to new AlignedLayer V3 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback)
166+
return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
167+
}
168+
169+
s.logger.Info("Subscribed to new AlignedLayer V3 tasks")
155170

156171
pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)
157172

@@ -180,27 +195,33 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
180195

181196
// Handle errors and resubscribe
182197
go func() {
183-
for {
198+
s.logger.Info("Starting error handling goroutine")
199+
var errMain, errFallback error
200+
var auxSub, auxSubFallback event.Subscription
201+
for errMain == nil || errFallback == nil { //while one is active
184202
select {
185203
case err := <-sub.Err():
186-
s.logger.Warn("Error in new task subscription", "err", err)
187-
sub.Unsubscribe()
188-
sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
189-
if err != nil {
190-
errorChannel <- err
204+
s.logger.Warn("Error in new task subscription of main connection", "err", err)
205+
206+
auxSub, errMain = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
207+
if errMain == nil {
208+
sub = auxSub // update the subscription only if it was successful
209+
s.logger.Info("Resubscribed to fallback new task subscription")
191210
}
192211
case err := <-subFallback.Err():
193-
s.logger.Warn("Error in fallback new task subscription", "err", err)
194-
subFallback.Unsubscribe()
195-
subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
196-
if err != nil {
197-
errorChannel <- err
212+
s.logger.Warn("Error in new task subscription of fallback connection", "err", err)
213+
214+
auxSubFallback, errFallback = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
215+
if errFallback == nil {
216+
subFallback = auxSubFallback // update the subscription only if it was successful
217+
s.logger.Info("Resubscribed to fallback new task subscription")
198218
}
199219
}
200220
}
221+
errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
201222
}()
202223

203-
return errorChannel, nil
224+
return nil
204225
}
205226

206227
func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) {

core/chainio/retryable.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package chainio
22

33
import (
44
"context"
5+
"github.com/rs/zerolog/log"
56
"math/big"
67

78
"github.com/ethereum/go-ethereum"
@@ -206,6 +207,7 @@ func SubscribeToNewTasksV2Retryable(
206207
config *retry.RetryParams,
207208
) (event.Subscription, error) {
208209
subscribe_func := func() (event.Subscription, error) {
210+
log.Info().Msg("Subscribing to NewBatchV2")
209211
return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot)
210212
}
211213
return retry.RetryWithData(subscribe_func, config)
@@ -225,6 +227,7 @@ func SubscribeToNewTasksV3Retryable(
225227
config *retry.RetryParams,
226228
) (event.Subscription, error) {
227229
subscribe_func := func() (event.Subscription, error) {
230+
log.Info().Msg("Subscribing to NewBatchV3")
228231
return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot)
229232
}
230233
return retry.RetryWithData(subscribe_func, config)

network_params.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
participants:
22
- el_type: reth
33
cl_type: lighthouse
4-
count: 2
4+
count: 3
55
validator_count: 32
66

77
ethereum_metrics_exporter_enabled: true

0 commit comments

Comments
 (0)