Skip to content

Commit 5f437ae

Browse files
authored
Merge branch 'staging' into 1401-1414-aggregator-retry-improvements
2 parents 555233a + fecac6f commit 5f437ae

File tree

6 files changed

+166
-49
lines changed

6 files changed

+166
-49
lines changed

aggregator/pkg/aggregator.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,13 @@ func (agg *Aggregator) Start(ctx context.Context) error {
225225
const MaxSentTxRetries = 5
226226

227227
func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
228+
defer func() {
229+
err := recover() //stops panics
230+
if err != nil {
231+
agg.logger.Error("handleBlsAggServiceResponse recovered from panic", "err", err)
232+
}
233+
}()
234+
228235
agg.taskMutex.Lock()
229236
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching task data")
230237
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
@@ -277,10 +284,15 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
277284
}
278285

279286
agg.logger.Info("Sending aggregated response onchain", "taskIndex", blsAggServiceResp.TaskIndex,
280-
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
287+
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]), "merkleRoot", "0x"+hex.EncodeToString(batchData.BatchMerkleRoot[:]))
281288
receipt, err := agg.sendAggregatedResponse(batchIdentifierHash, batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
282289
if err == nil {
283-
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, receipt.TxHash.String())
290+
// In some cases, we may fail to retrieve the receipt for the transaction.
291+
txHash := "Unknown"
292+
if receipt != nil {
293+
txHash = receipt.TxHash.String()
294+
}
295+
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash)
284296
agg.logger.Info("Aggregator successfully responded to task",
285297
"taskIndex", blsAggServiceResp.TaskIndex,
286298
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
@@ -432,7 +444,7 @@ func (agg *Aggregator) ClearTasksFromMaps() {
432444
defer func() {
433445
err := recover() //stops panics
434446
if err != nil {
435-
agg.logger.Error("Recovered from panic", "err", err)
447+
agg.logger.Error("ClearTasksFromMaps Recovered from panic", "err", err)
436448
}
437449
}()
438450

config-files/config-aggregator.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ aggregator:
3838
garbage_collector_tasks_age: 20 #The age of tasks that will be removed by the GC, in blocks. Suggested value for prod: '216000' (30 days)
3939
garbage_collector_tasks_interval: 10 #The interval of queried blocks to get an old batch. Suggested value for prod: '900' (3 hours)
4040
bls_service_task_timeout: 168h # The timeout of bls aggregation service tasks. Suggested value for prod '168h' (7 days)
41-
gas_base_bump_percentage: 10 # How much to bump gas price when responding to task. Suggested value 10%
42-
gas_bump_incremental_percentage: 2 # An extra percentage to bump every retry i*2 when responding to task. Suggested value 2%
43-
time_to_wait_before_bump: 36s # The time to wait for the receipt when responding to task. Suggested value 36 seconds (3 blocks)
41+
gas_base_bump_percentage: 25 # Percentage to overestimate gas price when sending a task
42+
gas_bump_incremental_percentage: 20 # An extra percentage to overestimate in each bump of respond to task. This is additive between tries
43+
# Gas used formula = est_gas_by_node * (gas_base_bump_percentage + gas_bump_incremental_percentage * i) / 100, where i is the iteration number.
44+
time_to_wait_before_bump: 72s # The time to wait for the receipt when responding to task. Suggested value 72 seconds (6 blocks)
4445

4546
## Operator Configurations
4647
# operator:

core/chainio/avs_writer.go

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package chainio
22

33
import (
44
"context"
5+
"encoding/hex"
56
"fmt"
67
"math/big"
78
"time"
@@ -80,9 +81,19 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
8081
}, nil
8182
}
8283

83-
// Sends AggregatedResponse and waits for the receipt for three blocks, if not received
84-
// it will try again bumping the last tx gas price based on `CalculateGasPriceBump`
85-
// This process happens indefinitely until the transaction is included.
84+
// SendAggregatedResponse continuously sends a RespondToTask transaction until it is included in the blockchain.
85+
// This function:
86+
// 1. Simulates the transaction to calculate the nonce and initial gas price without broadcasting it.
87+
// 2. Repeatedly attempts to send the transaction, bumping the gas price after `timeToWaitBeforeBump` has passed.
88+
// 3. Monitors for the receipt of previously sent transactions or checks the state to confirm if the response
89+
// has already been processed (e.g., by another transaction).
90+
// 4. Validates that the aggregator and batcher have sufficient balance to cover transaction costs before sending.
91+
//
92+
// Returns:
93+
// - A transaction receipt if the transaction is successfully included in the blockchain.
94+
// - If no receipt is found, but the batch state indicates the response has already been processed, it exits
95+
// without an error (returning `nil, nil`).
96+
// - An error if the process encounters a fatal issue (e.g., permanent failure in verifying balances or state).
8697
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) {
8798
txOpts := *w.Signer.GetTxOpts()
8899
txOpts.NoSend = true // simulate the transaction
@@ -108,40 +119,75 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
108119
waitForTxConfig.NumRetries = waitForTxConfigNumRetries
109120
waitForTxConfig.MaxElapsedTime = timeToWaitBeforeBump
110121

122+
var sentTxs []*types.Transaction
123+
124+
batchMerkleRootHashString := hex.EncodeToString(batchMerkleRoot[:])
125+
111126
respondToTaskV2Func := func() (*types.Receipt, error) {
112127
gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback)
113128
if err != nil {
114129
return nil, err
115130
}
116-
117-
bumpedGasPrice := utils.CalculateGasPriceBumpBasedOnRetry(gasPrice, gasBumpPercentage, gasBumpIncrementalPercentage, i)
118-
// new bumped gas price must be higher than the last one (this should hardly ever happen though)
119-
if bumpedGasPrice.Cmp(txOpts.GasPrice) > 0 {
120-
txOpts.GasPrice = bumpedGasPrice
131+
previousTxGasPrice := txOpts.GasPrice
132+
// in order to avoid a "replacement transaction underpriced" error,
133+
// the bumped gas price has to be at least 10% higher than the previous one.
134+
minimumGasPriceBump := utils.CalculateGasPriceBumpBasedOnRetry(previousTxGasPrice, 10, 0, 0)
135+
suggestedBumpedGasPrice := utils.CalculateGasPriceBumpBasedOnRetry(
136+
gasPrice,
137+
gasBumpPercentage,
138+
gasBumpIncrementalPercentage,
139+
i,
140+
)
141+
// check the new gas price is sufficiently bumped.
142+
// if the suggested bump does not meet the minimum threshold, use a fallback calculation to slightly increment the previous gas price.
143+
if suggestedBumpedGasPrice.Cmp(minimumGasPriceBump) > 0 {
144+
txOpts.GasPrice = suggestedBumpedGasPrice
121145
} else {
122-
// bump the last tx gas price a little by `gasBumpIncrementalPercentage` to replace it.
123-
txOpts.GasPrice = utils.CalculateGasPriceBumpBasedOnRetry(txOpts.GasPrice, gasBumpIncrementalPercentage, 0, 0)
146+
txOpts.GasPrice = minimumGasPriceBump
124147
}
125148

126149
if i > 0 {
150+
w.logger.Infof("Trying to get old sent transaction receipt before sending a new transaction", "merkle root", batchMerkleRootHashString)
151+
for _, tx := range sentTxs {
152+
receipt, _ := w.Client.TransactionReceipt(context.Background(), tx.Hash())
153+
if receipt == nil {
154+
receipt, _ = w.ClientFallback.TransactionReceipt(context.Background(), tx.Hash())
155+
if receipt != nil {
156+
w.checkIfAggregatorHadToPaidForBatcher(tx, batchIdentifierHash)
157+
return receipt, nil
158+
}
159+
}
160+
}
161+
w.logger.Infof("Receipts for old transactions not found, will check if the batch state has been responded", "merkle root", batchMerkleRootHashString)
162+
batchState, _ := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
163+
if batchState.Responded {
164+
w.logger.Infof("Batch state has been already responded", "merkle root", batchMerkleRootHashString)
165+
return nil, nil
166+
}
167+
w.logger.Infof("Batch state has not been responded yet, will send a new tx", "merkle root", batchMerkleRootHashString)
168+
127169
onGasPriceBumped(txOpts.GasPrice)
128170
}
129171

130172
// We compare both Aggregator funds and Batcher balance in Aligned against respondToTaskFeeLimit
131173
// Both are required to have some balance, more details inside the function
132174
err = w.checkAggAndBatcherHaveEnoughBalance(simTx, txOpts, batchIdentifierHash, senderAddress)
133175
if err != nil {
176+
w.logger.Errorf("Permanent error when checking aggregator and batcher balances, err %v", err, "merkle root", batchMerkleRootHashString)
134177
return nil, retry.PermanentError{Inner: err}
135178
}
136179

137-
w.logger.Infof("Sending RespondToTask transaction with a gas price of %v", txOpts.GasPrice)
138-
180+
w.logger.Infof("Sending RespondToTask transaction with a gas price of %v", txOpts.GasPrice, "merkle root", batchMerkleRootHashString)
139181
realTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
140182
if err != nil {
183+
w.logger.Errorf("Respond to task transaction err, %v", err, "merkle root", batchMerkleRootHashString)
141184
return nil, err
142185
}
186+
sentTxs = append(sentTxs, realTx)
143187

188+
w.logger.Infof("Transaction sent, waiting for receipt", "merkle root", batchMerkleRootHashString)
144189
receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), waitForTxConfig)
190+
145191
if receipt != nil {
146192
w.checkIfAggregatorHadToPaidForBatcher(realTx, batchIdentifierHash)
147193
return receipt, nil
@@ -151,13 +197,16 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
151197
// we increment the i here to add an incremental percentage to increase the odds of being included in the next blocks
152198
i++
153199

154-
w.logger.Infof("RespondToTask receipt waiting timeout has passed, will try again...")
200+
w.logger.Infof("RespondToTask receipt waiting timeout has passed, will try again...", "merkle_root", batchMerkleRootHashString)
155201
if err != nil {
156202
return nil, err
157203
}
158204
return nil, fmt.Errorf("transaction failed")
159205
}
160206

207+
// This just retries the bump of a fee in case of a timeout
208+
// The wait is already done in WaitForTransactionReceiptRetryable, and all the functions are retryable,
209+
// so this retry doesn't need to wait more time
161210
return retry.RetryWithData(respondToTaskV2Func, respondToTaskV2Config)
162211
}
163212

@@ -186,7 +235,9 @@ func (w *AvsWriter) checkIfAggregatorHadToPaidForBatcher(tx *types.Transaction,
186235
func (w *AvsWriter) checkAggAndBatcherHaveEnoughBalance(tx *types.Transaction, txOpts bind.TransactOpts, batchIdentifierHash [32]byte, senderAddress [20]byte) error {
187236
w.logger.Info("Checking if aggregator and batcher have enough balance for the transaction")
188237
aggregatorAddress := txOpts.From
189-
txCost := new(big.Int).Mul(new(big.Int).SetUint64(tx.Gas()), txOpts.GasPrice)
238+
txGasAsBigInt := new(big.Int).SetUint64(tx.Gas())
239+
txGasPrice := txOpts.GasPrice
240+
txCost := new(big.Int).Mul(txGasAsBigInt, txGasPrice)
190241
w.logger.Info("Transaction cost", "cost", txCost)
191242

192243
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)

core/chainio/retryable.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func RespondToTaskV2Func(w *AvsWriter, opts *bind.TransactOpts, batchMerkleRoot
2323
// If error try with fallback
2424
tx, err = w.AvsContractBindings.ServiceManagerFallback.RespondToTaskV2(opts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
2525
}
26+
2627
return tx, err
2728
}
2829
return respondToTaskV2_func

core/retry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func RetryWithData[T any](functionToRetry func() (T, error), config *RetryConfig
137137
if panic_err, ok := r.(error); ok {
138138
err = panic_err
139139
} else {
140-
err = fmt.Errorf("panicked: %v", panic_err)
140+
err = fmt.Errorf("RetryWithData panicked: %v", panic_err)
141141
}
142142
}
143143
}()
@@ -184,7 +184,7 @@ func Retry(functionToRetry func() error, config *RetryConfig) error {
184184
if panic_err, ok := r.(error); ok {
185185
err = panic_err
186186
} else {
187-
err = fmt.Errorf("panicked: %v", panic_err)
187+
err = fmt.Errorf("Retry panicked: %v", panic_err)
188188
}
189189
}
190190
}()

0 commit comments

Comments
 (0)