Skip to content

Commit ffc14a2

Browse files
PatStilesJuArce
andauthored
feat(retries): Use config params for retry parameters (#1467)
Co-authored-by: JuArce <[email protected]>
1 parent 2408e06 commit ffc14a2

File tree

7 files changed

+203
-121
lines changed

7 files changed

+203
-121
lines changed

aggregator/pkg/server.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
5454
// If that's the case, we won't know about the task at this point
5555
// so we make GetTaskIndex retryable, waiting for some seconds,
5656
// before trying to fetch the task again from the map.
57-
taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash)
57+
taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash, retry.NetworkRetryParams())
5858

5959
if err != nil {
6060
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
@@ -114,10 +114,9 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error {
114114
Checks Internal mapping for Signed Task Response, returns its TaskIndex.
115115
- All errors are considered Transient Errors
116116
- Retry times (3 retries): 1 sec, 2 sec, 4 sec
117-
118117
TODO: We should refactor the retry duration considering extending it to a larger time or number of retries, at least somewhere between 1 and 2 blocks
119118
*/
120-
func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint32, error) {
119+
func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte, config *retry.RetryParams) (uint32, error) {
121120
getTaskIndex_func := func() (uint32, error) {
122121
agg.taskMutex.Lock()
123122
taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash]
@@ -129,5 +128,5 @@ func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint
129128
}
130129
}
131130

132-
return retry.RetryWithData(getTaskIndex_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
131+
return retry.RetryWithData(getTaskIndex_func, config)
133132
}

core/chainio/avs_subscriber.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,15 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
6666
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)
6767

6868
// Subscribe to new tasks
69-
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
69+
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
7070
if err != nil {
71-
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
71+
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
7272
return nil, err
7373
}
7474

75-
subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
75+
subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
7676
if err != nil {
77-
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
77+
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
7878
return nil, err
7979
}
8080
s.logger.Info("Subscribed to new AlignedLayer V2 tasks")
@@ -114,14 +114,14 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
114114
case err := <-sub.Err():
115115
s.logger.Warn("Error in new task subscription", "err", err)
116116
sub.Unsubscribe()
117-
sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
117+
sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
118118
if err != nil {
119119
errorChannel <- err
120120
}
121121
case err := <-subFallback.Err():
122122
s.logger.Warn("Error in fallback new task subscription", "err", err)
123123
subFallback.Unsubscribe()
124-
subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
124+
subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
125125
if err != nil {
126126
errorChannel <- err
127127
}
@@ -137,13 +137,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
137137
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)
138138

139139
// Subscribe to new tasks
140-
sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
140+
sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
141141
if err != nil {
142142
s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
143143
return nil, err
144144
}
145145

146-
subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
146+
subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
147147
if err != nil {
148148
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
149149
return nil, err
@@ -185,14 +185,14 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
185185
case err := <-sub.Err():
186186
s.logger.Warn("Error in new task subscription", "err", err)
187187
sub.Unsubscribe()
188-
sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
188+
sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
189189
if err != nil {
190190
errorChannel <- err
191191
}
192192
case err := <-subFallback.Err():
193193
s.logger.Warn("Error in fallback new task subscription", "err", err)
194194
subFallback.Unsubscribe()
195-
subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
195+
subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
196196
if err != nil {
197197
errorChannel <- err
198198
}
@@ -258,7 +258,7 @@ func (s *AvsSubscriber) processNewBatchV3(batch *servicemanager.ContractAlignedL
258258
// getLatestNotRespondedTaskFromEthereum queries the blockchain for the latest not responded task using the FilterNewBatch method.
259259
func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, error) {
260260

261-
latestBlock, err := s.BlockNumberRetryable(context.Background())
261+
latestBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
262262
if err != nil {
263263
return nil, err
264264
}
@@ -271,7 +271,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanag
271271
fromBlock = latestBlock - BlockInterval
272272
}
273273

274-
logs, err := s.FilterBatchV2Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil)
274+
logs, err := s.FilterBatchV2Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams())
275275
if err != nil {
276276
return nil, err
277277
}
@@ -293,7 +293,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanag
293293

294294
batchIdentifier := append(lastLog.BatchMerkleRoot[:], lastLog.SenderAddress[:]...)
295295
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
296-
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash)
296+
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash, retry.NetworkRetryParams())
297297
if err != nil {
298298
return nil, err
299299
}
@@ -307,7 +307,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanag
307307

308308
// getLatestNotRespondedTaskFromEthereum queries the blockchain for the latest not responded task using the FilterNewBatch method.
309309
func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
310-
latestBlock, err := s.BlockNumberRetryable(context.Background())
310+
latestBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
311311
if err != nil {
312312
return nil, err
313313
}
@@ -320,7 +320,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanag
320320
fromBlock = latestBlock - BlockInterval
321321
}
322322

323-
logs, err := s.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil)
323+
logs, err := s.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams())
324324
if err != nil {
325325
return nil, err
326326
}
@@ -342,7 +342,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanag
342342

343343
batchIdentifier := append(lastLog.BatchMerkleRoot[:], lastLog.SenderAddress[:]...)
344344
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
345-
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash)
345+
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash, retry.NetworkRetryParams())
346346
if err != nil {
347347
return nil, err
348348
}
@@ -355,15 +355,15 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanag
355355
}
356356

357357
func (s *AvsSubscriber) WaitForOneBlock(startBlock uint64) error {
358-
currentBlock, err := s.BlockNumberRetryable(context.Background())
358+
currentBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
359359
if err != nil {
360360
return err
361361
}
362362

363363
if currentBlock <= startBlock { // should really be == but just in case
364364
// Subscribe to new head
365365
c := make(chan *types.Header)
366-
sub, err := s.SubscribeNewHeadRetryable(context.Background(), c)
366+
sub, err := s.SubscribeNewHeadRetryable(context.Background(), c, retry.NetworkRetryParams())
367367
if err != nil {
368368
return err
369369
}

core/chainio/avs_writer.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
9292
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) {
9393
txOpts := *w.Signer.GetTxOpts()
9494
txOpts.NoSend = true // simulate the transaction
95-
simTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
95+
simTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, retry.SendToChainRetryParams())
9696
if err != nil {
9797
return nil, err
9898
}
@@ -109,7 +109,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
109109
batchMerkleRootHashString := hex.EncodeToString(batchMerkleRoot[:])
110110

111111
respondToTaskV2Func := func() (*types.Receipt, error) {
112-
gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback)
112+
gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback, retry.NetworkRetryParams())
113113
if err != nil {
114114
return nil, err
115115
}
@@ -144,7 +144,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
144144
}
145145
}
146146
w.logger.Infof("Receipts for old transactions not found, will check if the batch state has been responded", "merkle root", batchMerkleRootHashString)
147-
batchState, _ := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
147+
batchState, _ := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.NetworkRetryParams())
148148
if batchState.Responded {
149149
w.logger.Infof("Batch state has been already responded", "merkle root", batchMerkleRootHashString)
150150
return nil, nil
@@ -163,15 +163,15 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
163163
}
164164

165165
w.logger.Infof("Sending RespondToTask transaction with a gas price of %v", txOpts.GasPrice, "merkle root", batchMerkleRootHashString)
166-
realTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
166+
realTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, retry.SendToChainRetryParams())
167167
if err != nil {
168168
w.logger.Errorf("Respond to task transaction err, %v", err, "merkle root", batchMerkleRootHashString)
169169
return nil, err
170170
}
171171
sentTxs = append(sentTxs, realTx)
172172

173173
w.logger.Infof("Transaction sent, waiting for receipt", "merkle root", batchMerkleRootHashString)
174-
receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), timeToWaitBeforeBump)
174+
receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), retry.WaitForTxRetryParams(timeToWaitBeforeBump))
175175
if receipt != nil {
176176
w.checkIfAggregatorHadToPaidForBatcher(realTx, batchIdentifierHash)
177177
return receipt, nil
@@ -191,15 +191,14 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
191191
// This just retries the bump of a fee in case of a timeout
192192
// The wait is done before on WaitForTransactionReceiptRetryable, and all the functions are retriable,
193193
// so this retry doesn't need to wait more time
194-
maxInterval := time.Millisecond * 500
195-
return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, maxInterval, 0)
194+
return retry.RetryWithData(respondToTaskV2Func, retry.RespondToTaskV2())
196195
}
197196

198197
// Calculates the transaction cost from the receipt and compares it with the batcher respondToTaskFeeLimit
199198
// if the tx cost was higher, then it means the aggregator has paid the difference for the batcher (txCost - respondToTaskFeeLimit) and so metrics are updated accordingly.
200199
// otherwise nothing is done.
201200
func (w *AvsWriter) checkIfAggregatorHadToPaidForBatcher(tx *types.Transaction, batchIdentifierHash [32]byte) {
202-
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
201+
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.NetworkRetryParams())
203202
if err != nil {
204203
return
205204
}
@@ -225,7 +224,7 @@ func (w *AvsWriter) checkAggAndBatcherHaveEnoughBalance(tx *types.Transaction, t
225224
txCost := new(big.Int).Mul(txGasAsBigInt, txGasPrice)
226225
w.logger.Info("Transaction cost", "cost", txCost)
227226

228-
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
227+
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.NetworkRetryParams())
229228
if err != nil {
230229
w.logger.Error("Failed to get batch state", "error", err)
231230
w.logger.Info("Proceeding to check balances against transaction cost")
@@ -253,7 +252,7 @@ func (w *AvsWriter) compareAggregatorBalance(amount *big.Int, aggregatorAddress
253252
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
254253
defer cancel()
255254

256-
aggregatorBalance, err := w.BalanceAtRetryable(ctx, aggregatorAddress, nil)
255+
aggregatorBalance, err := w.BalanceAtRetryable(ctx, aggregatorAddress, nil, retry.NetworkRetryParams())
257256
if err != nil {
258257
// Ignore and continue.
259258
w.logger.Error("failed to get aggregator balance: %v", err)
@@ -268,7 +267,7 @@ func (w *AvsWriter) compareAggregatorBalance(amount *big.Int, aggregatorAddress
268267

269268
func (w *AvsWriter) compareBatcherBalance(amount *big.Int, senderAddress [20]byte) error {
270269
// Get batcher balance
271-
batcherBalance, err := w.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress)
270+
batcherBalance, err := w.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress, retry.NetworkRetryParams())
272271
if err != nil {
273272
// Ignore and continue.
274273
w.logger.Error("Failed to get batcherBalance", "error", err)

0 commit comments

Comments
 (0)