Skip to content

Commit 635c8f0

Browse files
MarcosNicolauuri-99OppenMauroToscanoJuArce
authored
fix: send aggregated response bump (#1440)
Co-authored-by: Urix <[email protected]> Co-authored-by: Mario Rugiero <[email protected]> Co-authored-by: MauroFab <[email protected]> Co-authored-by: Mauro Toscano <[email protected]> Co-authored-by: Julian Arce <[email protected]>
1 parent 3079ac4 commit 635c8f0

File tree

5 files changed

+90
-25
lines changed

5 files changed

+90
-25
lines changed

aggregator/pkg/aggregator.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,13 @@ func (agg *Aggregator) Start(ctx context.Context) error {
225225
const MaxSentTxRetries = 5
226226

227227
func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
228+
defer func() {
229+
err := recover() //stops panics
230+
if err != nil {
231+
agg.logger.Error("handleBlsAggServiceResponse recovered from panic", "err", err)
232+
}
233+
}()
234+
228235
agg.taskMutex.Lock()
229236
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching task data")
230237
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
@@ -277,10 +284,15 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
277284
}
278285

279286
agg.logger.Info("Sending aggregated response onchain", "taskIndex", blsAggServiceResp.TaskIndex,
280-
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
287+
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]), "merkleRoot", "0x"+hex.EncodeToString(batchData.BatchMerkleRoot[:]))
281288
receipt, err := agg.sendAggregatedResponse(batchIdentifierHash, batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
282289
if err == nil {
283-
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, receipt.TxHash.String())
290+
// In some cases, we may fail to retrieve the receipt for the transaction.
291+
txHash := "Unknown"
292+
if receipt != nil {
293+
txHash = receipt.TxHash.String()
294+
}
295+
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash)
284296
agg.logger.Info("Aggregator successfully responded to task",
285297
"taskIndex", blsAggServiceResp.TaskIndex,
286298
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
@@ -428,7 +440,7 @@ func (agg *Aggregator) ClearTasksFromMaps() {
428440
defer func() {
429441
err := recover() //stops panics
430442
if err != nil {
431-
agg.logger.Error("Recovered from panic", "err", err)
443+
agg.logger.Error("ClearTasksFromMaps Recovered from panic", "err", err)
432444
}
433445
}()
434446

config-files/config-aggregator.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ aggregator:
3838
garbage_collector_tasks_age: 20 #The age of tasks that will be removed by the GC, in blocks. Suggested value for prod: '216000' (30 days)
3939
garbage_collector_tasks_interval: 10 #The interval of queried blocks to get an old batch. Suggested value for prod: '900' (3 hours)
4040
bls_service_task_timeout: 168h # The timeout of bls aggregation service tasks. Suggested value for prod '168h' (7 days)
41-
gas_base_bump_percentage: 10 # How much to bump gas price when responding to task. Suggested value 10%
42-
gas_bump_incremental_percentage: 2 # An extra percentage to bump every retry i*2 when responding to task. Suggested value 2%
43-
time_to_wait_before_bump: 36s # The time to wait for the receipt when responding to task. Suggested value 36 seconds (3 blocks)
41+
gas_base_bump_percentage: 25 # Percentage to overestimate gas price when sending a task
42+
gas_bump_incremental_percentage: 20 # An extra percentage to overestimate in each bump of respond to task. This is additive between tries
43+
# Gas price used = est_gas_price_by_node * (100 + gas_base_bump_percentage + gas_bump_incremental_percentage * i) / 100, where i is the iteration number.
44+
time_to_wait_before_bump: 72s # The time to wait for the receipt when responding to task. Suggested value 72 seconds (6 blocks)
4445

4546
## Operator Configurations
4647
# operator:

core/chainio/avs_writer.go

Lines changed: 68 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package chainio
22

33
import (
44
"context"
5+
"encoding/hex"
56
"fmt"
67
"math/big"
78
"time"
@@ -75,9 +76,19 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
7576
}, nil
7677
}
7778

78-
// Sends AggregatedResponse and waits for the receipt for three blocks, if not received
79-
// it will try again bumping the last tx gas price based on `CalculateGasPriceBump`
80-
// This process happens indefinitely until the transaction is included.
79+
// SendAggregatedResponse continuously sends a RespondToTask transaction until it is included in the blockchain.
80+
// This function:
81+
// 1. Simulates the transaction to calculate the nonce and initial gas price without broadcasting it.
82+
// 2. Repeatedly attempts to send the transaction, bumping the gas price after `timeToWaitBeforeBump` has passed.
83+
// 3. Monitors for the receipt of previously sent transactions or checks the state to confirm if the response
84+
// has already been processed (e.g., by another transaction).
85+
// 4. Validates that the aggregator and batcher have sufficient balance to cover transaction costs before sending.
86+
//
87+
// Returns:
88+
// - A transaction receipt if the transaction is successfully included in the blockchain.
89+
// - If no receipt is found, but the batch state indicates the response has already been processed, it exits
90+
// without an error (returning `nil, nil`).
91+
// - An error if the process encounters a fatal issue (e.g., permanent failure in verifying balances or state).
8192
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) {
8293
txOpts := *w.Signer.GetTxOpts()
8394
txOpts.NoSend = true // simulate the transaction
@@ -93,39 +104,73 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
93104
txOpts.NoSend = false
94105
i := 0
95106

107+
var sentTxs []*types.Transaction
108+
109+
batchMerkleRootHashString := hex.EncodeToString(batchMerkleRoot[:])
110+
96111
respondToTaskV2Func := func() (*types.Receipt, error) {
97112
gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback)
98113
if err != nil {
99114
return nil, err
100115
}
101-
102-
bumpedGasPrice := utils.CalculateGasPriceBumpBasedOnRetry(gasPrice, gasBumpPercentage, gasBumpIncrementalPercentage, i)
103-
// new bumped gas price must be higher than the last one (this should hardly ever happen though)
104-
if bumpedGasPrice.Cmp(txOpts.GasPrice) > 0 {
105-
txOpts.GasPrice = bumpedGasPrice
116+
previousTxGasPrice := txOpts.GasPrice
117+
// in order to avoid a "replacement transaction underpriced" error,
118+
// the bumped gas price has to be at least 10% higher than the previous one.
119+
minimumGasPriceBump := utils.CalculateGasPriceBumpBasedOnRetry(previousTxGasPrice, 10, 0, 0)
120+
suggestedBumpedGasPrice := utils.CalculateGasPriceBumpBasedOnRetry(
121+
gasPrice,
122+
gasBumpPercentage,
123+
gasBumpIncrementalPercentage,
124+
i,
125+
)
126+
// check the new gas price is sufficiently bumped.
127+
// if the suggested bump does not meet the minimum threshold, use a fallback calculation to slightly increment the previous gas price.
128+
if suggestedBumpedGasPrice.Cmp(minimumGasPriceBump) > 0 {
129+
txOpts.GasPrice = suggestedBumpedGasPrice
106130
} else {
107-
// bump the last tx gas price a little by `gasBumpIncrementalPercentage` to replace it.
108-
txOpts.GasPrice = utils.CalculateGasPriceBumpBasedOnRetry(txOpts.GasPrice, gasBumpIncrementalPercentage, 0, 0)
131+
txOpts.GasPrice = minimumGasPriceBump
109132
}
110133

111134
if i > 0 {
135+
w.logger.Infof("Trying to get old sent transaction receipt before sending a new transaction", "merkle root", batchMerkleRootHashString)
136+
for _, tx := range sentTxs {
137+
receipt, _ := w.Client.TransactionReceipt(context.Background(), tx.Hash())
138+
if receipt == nil {
139+
receipt, _ = w.ClientFallback.TransactionReceipt(context.Background(), tx.Hash())
140+
if receipt != nil {
141+
w.checkIfAggregatorHadToPaidForBatcher(tx, batchIdentifierHash)
142+
return receipt, nil
143+
}
144+
}
145+
}
146+
w.logger.Infof("Receipts for old transactions not found, will check if the batch state has been responded", "merkle root", batchMerkleRootHashString)
147+
batchState, _ := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
148+
if batchState.Responded {
149+
w.logger.Infof("Batch state has been already responded", "merkle root", batchMerkleRootHashString)
150+
return nil, nil
151+
}
152+
w.logger.Infof("Batch state has not been responded yet, will send a new tx", "merkle root", batchMerkleRootHashString)
153+
112154
onGasPriceBumped(txOpts.GasPrice)
113155
}
114156

115157
// We compare both Aggregator funds and Batcher balance in Aligned against respondToTaskFeeLimit
116158
// Both are required to have some balance, more details inside the function
117159
err = w.checkAggAndBatcherHaveEnoughBalance(simTx, txOpts, batchIdentifierHash, senderAddress)
118160
if err != nil {
161+
w.logger.Errorf("Permanent error when checking aggregator and batcher balances, err %v", err, "merkle root", batchMerkleRootHashString)
119162
return nil, retry.PermanentError{Inner: err}
120163
}
121164

122-
w.logger.Infof("Sending RespondToTask transaction with a gas price of %v", txOpts.GasPrice)
123-
165+
w.logger.Infof("Sending RespondToTask transaction with a gas price of %v", txOpts.GasPrice, "merkle root", batchMerkleRootHashString)
124166
realTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
125167
if err != nil {
168+
w.logger.Errorf("Respond to task transaction err, %v", err, "merkle root", batchMerkleRootHashString)
126169
return nil, err
127170
}
171+
sentTxs = append(sentTxs, realTx)
128172

173+
w.logger.Infof("Transaction sent, waiting for receipt", "merkle root", batchMerkleRootHashString)
129174
receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), timeToWaitBeforeBump)
130175
if receipt != nil {
131176
w.checkIfAggregatorHadToPaidForBatcher(realTx, batchIdentifierHash)
@@ -136,14 +181,18 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
136181
// we increment the i here to add an incremental percentage to increase the odds of being included in the next blocks
137182
i++
138183

139-
w.logger.Infof("RespondToTask receipt waiting timeout has passed, will try again...")
184+
w.logger.Infof("RespondToTask receipt waiting timeout has passed, will try again...", "merkle_root", batchMerkleRootHashString)
140185
if err != nil {
141186
return nil, err
142187
}
143188
return nil, fmt.Errorf("transaction failed")
144189
}
145190

146-
return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, retry.MaxInterval, 0)
191+
// This just retries the bump of a fee in case of a timeout
192+
// The wait already happens inside WaitForTransactionReceiptRetryable, and all the functions are retryable,
193+
// so this retry doesn't need to wait more time
194+
maxInterval := time.Millisecond * 500
195+
return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, maxInterval, 0)
147196
}
148197

149198
// Calculates the transaction cost from the receipt and compares it with the batcher respondToTaskFeeLimit
@@ -171,7 +220,9 @@ func (w *AvsWriter) checkIfAggregatorHadToPaidForBatcher(tx *types.Transaction,
171220
func (w *AvsWriter) checkAggAndBatcherHaveEnoughBalance(tx *types.Transaction, txOpts bind.TransactOpts, batchIdentifierHash [32]byte, senderAddress [20]byte) error {
172221
w.logger.Info("Checking if aggregator and batcher have enough balance for the transaction")
173222
aggregatorAddress := txOpts.From
174-
txCost := new(big.Int).Mul(new(big.Int).SetUint64(tx.Gas()), txOpts.GasPrice)
223+
txGasAsBigInt := new(big.Int).SetUint64(tx.Gas())
224+
txGasPrice := txOpts.GasPrice
225+
txCost := new(big.Int).Mul(txGasAsBigInt, txGasPrice)
175226
w.logger.Info("Transaction cost", "cost", txCost)
176227

177228
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
@@ -183,8 +234,8 @@ func (w *AvsWriter) checkAggAndBatcherHaveEnoughBalance(tx *types.Transaction, t
183234
respondToTaskFeeLimit := batchState.RespondToTaskFeeLimit
184235
w.logger.Info("Checking balance against Batch RespondToTaskFeeLimit", "RespondToTaskFeeLimit", respondToTaskFeeLimit)
185236
// Note: we compare both Aggregator funds and Batcher balance in Aligned against respondToTaskFeeLimit
186-
// Batcher will pay up to respondToTaskFeeLimit, for this he needs that amount of funds in Aligned
187-
// Aggregator will pay any extra cost, for this he needs at least respondToTaskFeeLimit in his balance
237+
// Batcher will pay up to respondToTaskFeeLimit, for this he needs that amount of funds in Aligned
238+
// Aggregator will pay any extra cost, for this he needs at least respondToTaskFeeLimit in his balance
188239
return w.compareBalances(respondToTaskFeeLimit, aggregatorAddress, senderAddress)
189240
}
190241

core/chainio/retryable.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkl
3030
// If error try with fallback
3131
tx, err = w.AvsContractBindings.ServiceManagerFallback.RespondToTaskV2(opts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
3232
}
33+
3334
return tx, err
3435
}
3536
return retry.RetryWithData(respondToTaskV2_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime)

core/retry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Durat
104104
if panic_err, ok := r.(error); ok {
105105
err = panic_err
106106
} else {
107-
err = fmt.Errorf("panicked: %v", panic_err)
107+
err = fmt.Errorf("RetryWithData panicked: %v", panic_err)
108108
}
109109
}
110110
}()
@@ -151,7 +151,7 @@ func Retry(functionToRetry func() error, minDelay time.Duration, factor float64,
151151
if panic_err, ok := r.(error); ok {
152152
err = panic_err
153153
} else {
154-
err = fmt.Errorf("panicked: %v", panic_err)
154+
err = fmt.Errorf("Retry panicked: %v", panic_err)
155155
}
156156
}
157157
}()

0 commit comments

Comments
 (0)