Skip to content

Commit 44339f7

Browse files
MarcosNicolauentropidelicPatStilesuri-99avilagaston9
authored
feat: aggregator bump fee if transaction was not included (#1286)
Co-authored-by: Mariano Nicolini <[email protected]> Co-authored-by: PatStiles <[email protected]> Co-authored-by: Urix <[email protected]> Co-authored-by: avilagaston9 <[email protected]>
1 parent b338f32 commit 44339f7

File tree

15 files changed

+830
-498
lines changed

15 files changed

+830
-498
lines changed

.github/workflows/build-and-test-go.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ jobs:
2424
with:
2525
go-version: "1.23"
2626
cache: false
27+
- name: foundry-toolchain
28+
uses: foundry-rs/[email protected]
2729
- name: Build SP1 bindings
2830
run: make build_sp1_linux
2931
- name: Build Old SP1 bindings

aggregator/pkg/aggregator.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/hex"
66
"fmt"
7+
"math/big"
78
"strings"
89
"sync"
910
"time"
@@ -300,7 +301,21 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
300301
"senderAddress", hex.EncodeToString(senderAddress[:]),
301302
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))
302303

303-
txHash, err := agg.avsWriter.SendAggregatedResponse(batchIdentifierHash, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
304+
// This function is a callback that is called when the gas price is bumped on the avsWriter.SendAggregatedResponse
305+
onGasPriceBumped := func(bumpedGasPrice *big.Int) {
306+
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
307+
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
308+
}
309+
receipt, err := agg.avsWriter.SendAggregatedResponse(
310+
batchIdentifierHash,
311+
batchMerkleRoot,
312+
senderAddress,
313+
nonSignerStakesAndSignature,
314+
agg.AggregatorConfig.Aggregator.GasBaseBumpPercentage,
315+
agg.AggregatorConfig.Aggregator.GasBumpIncrementalPercentage,
316+
agg.AggregatorConfig.Aggregator.TimeToWaitBeforeBump,
317+
onGasPriceBumped,
318+
)
304319
if err != nil {
305320
agg.walletMutex.Unlock()
306321
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
@@ -311,13 +326,6 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
311326
agg.walletMutex.Unlock()
312327
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchIdentifierHash[:]))
313328

314-
receipt, err := utils.WaitForTransactionReceipt(
315-
agg.AggregatorConfig.BaseConfig.EthRpcClient, context.Background(), *txHash)
316-
if err != nil {
317-
agg.telemetry.LogTaskError(batchMerkleRoot, err)
318-
return nil, err
319-
}
320-
321329
agg.metrics.IncAggregatedResponses()
322330

323331
return receipt, nil

aggregator/pkg/telemetry.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ type TaskErrorMessage struct {
3030
TaskError string `json:"error"`
3131
}
3232

33+
type TaskGasPriceBumpMessage struct {
34+
MerkleRoot string `json:"merkle_root"`
35+
BumpedGasPrice string `json:"bumped_gas_price"`
36+
}
37+
3338
type TaskSentToEthereumMessage struct {
3439
MerkleRoot string `json:"merkle_root"`
3540
TxHash string `json:"tx_hash"`
@@ -96,6 +101,16 @@ func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) {
96101
}
97102
}
98103

104+
func (t *Telemetry) BumpedTaskGasPrice(batchMerkleRoot [32]byte, bumpedGasPrice string) {
105+
body := TaskGasPriceBumpMessage{
106+
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
107+
BumpedGasPrice: bumpedGasPrice,
108+
}
109+
if err := t.sendTelemetryMessage("/api/aggregatorTaskGasPriceBump", body); err != nil {
110+
t.logger.Error("[Telemetry] Error in LogOperatorResponse", "error", err)
111+
}
112+
}
113+
99114
func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string) {
100115
body := TaskSentToEthereumMessage{
101116
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),

config-files/config-aggregator.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ aggregator:
3838
garbage_collector_tasks_age: 20 #The age of tasks that will be removed by the GC, in blocks. Suggested value for prod: '216000' (30 days)
3939
garbage_collector_tasks_interval: 10 #The interval of queried blocks to get an old batch. Suggested value for prod: '900' (3 hours)
4040
bls_service_task_timeout: 168h # The timeout of bls aggregation service tasks. Suggested value for prod '168h' (7 days)
41+
gas_base_bump_percentage: 10 # How much to bump gas price when responding to task. Suggested value 10%
42+
gas_bump_incremental_percentage: 2 # An extra percentage to bump every retry i*2 when responding to task. Suggested value 2%
43+
time_to_wait_before_bump: 36s # The time to wait for the receipt when responding to task. Suggested value 36 seconds (3 blocks)
4144

4245
## Operator Configurations
4346
# operator:

core/chainio/avs_writer.go

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ import (
1515
"github.com/ethereum/go-ethereum/common"
1616
"github.com/ethereum/go-ethereum/core/types"
1717
servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager"
18+
retry "github.com/yetanotherco/aligned_layer/core"
1819
"github.com/yetanotherco/aligned_layer/core/config"
20+
"github.com/yetanotherco/aligned_layer/core/utils"
1921
)
2022

2123
type AvsWriter struct {
@@ -70,7 +72,10 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
7072
}, nil
7173
}
7274

73-
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*common.Hash, error) {
75+
// Sends AggregatedResponse and waits for the receipt for three blocks, if not received
76+
// it will try again bumping the last tx gas price based on `CalculateGasPriceBump`
77+
// This process happens indefinitely until the transaction is included.
78+
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) {
7479
txOpts := *w.Signer.GetTxOpts()
7580
txOpts.NoSend = true // simulate the transaction
7681
tx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
@@ -83,17 +88,61 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
8388
return nil, err
8489
}
8590

86-
// Send the transaction
91+
// Set the nonce, as we might have to replace the transaction with a higher gas price
92+
txNonce := big.NewInt(int64(tx.Nonce()))
93+
txOpts.Nonce = txNonce
94+
txOpts.GasPrice = tx.GasPrice()
8795
txOpts.NoSend = false
88-
txOpts.GasLimit = tx.Gas() * 110 / 100 // Add 10% to the gas limit
89-
tx, err = w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
90-
if err != nil {
91-
return nil, err
96+
i := 0
97+
98+
respondToTaskV2Func := func() (*types.Receipt, error) {
99+
gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback)
100+
if err != nil {
101+
return nil, err
102+
}
103+
104+
bumpedGasPrice := utils.CalculateGasPriceBumpBasedOnRetry(gasPrice, gasBumpPercentage, gasBumpIncrementalPercentage, i)
105+
// new bumped gas price must be higher than the last one (this should hardly ever happen though)
106+
if bumpedGasPrice.Cmp(txOpts.GasPrice) > 0 {
107+
txOpts.GasPrice = bumpedGasPrice
108+
} else {
109+
// bump the last tx gas price a little by `gasBumpIncrementalPercentage` to replace it.
110+
txOpts.GasPrice = utils.CalculateGasPriceBumpBasedOnRetry(txOpts.GasPrice, gasBumpIncrementalPercentage, 0, 0)
111+
}
112+
113+
if i > 0 {
114+
onGasPriceBumped(txOpts.GasPrice)
115+
}
116+
117+
err = w.checkRespondToTaskFeeLimit(tx, txOpts, batchIdentifierHash, senderAddress)
118+
if err != nil {
119+
return nil, retry.PermanentError{Inner: err}
120+
}
121+
122+
w.logger.Infof("Sending RespondToTask transaction with a gas price of %v", txOpts.GasPrice)
123+
124+
tx, err = w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
125+
if err != nil {
126+
return nil, err
127+
}
128+
129+
receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, tx.Hash(), timeToWaitBeforeBump)
130+
if receipt != nil {
131+
return receipt, nil
132+
}
133+
134+
// if we are here, it means we have reached the receipt waiting timeout
135+
// we increment the i here to add an incremental percentage to increase the odds of being included in the next blocks
136+
i++
137+
138+
w.logger.Infof("RespondToTask receipt waiting timeout has passed, will try again...")
139+
if err != nil {
140+
return nil, err
141+
}
142+
return nil, fmt.Errorf("transaction failed")
92143
}
93144

94-
txHash := tx.Hash()
95-
96-
return &txHash, nil
145+
return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, retry.MaxInterval, 0)
97146
}
98147

99148
func (w *AvsWriter) checkRespondToTaskFeeLimit(tx *types.Transaction, txOpts bind.TransactOpts, batchIdentifierHash [32]byte, senderAddress [20]byte) error {

core/config/aggregator.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ type AggregatorConfig struct {
2525
GarbageCollectorTasksAge uint64
2626
GarbageCollectorTasksInterval uint64
2727
BlsServiceTaskTimeout time.Duration
28+
GasBaseBumpPercentage uint
29+
GasBumpIncrementalPercentage uint
30+
TimeToWaitBeforeBump time.Duration
2831
}
2932
}
3033

@@ -40,6 +43,9 @@ type AggregatorConfigFromYaml struct {
4043
GarbageCollectorTasksAge uint64 `yaml:"garbage_collector_tasks_age"`
4144
GarbageCollectorTasksInterval uint64 `yaml:"garbage_collector_tasks_interval"`
4245
BlsServiceTaskTimeout time.Duration `yaml:"bls_service_task_timeout"`
46+
GasBaseBumpPercentage uint `yaml:"gas_base_bump_percentage"`
47+
GasBumpIncrementalPercentage uint `yaml:"gas_bump_incremental_percentage"`
48+
TimeToWaitBeforeBump time.Duration `yaml:"time_to_wait_before_bump"`
4349
} `yaml:"aggregator"`
4450
}
4551

@@ -85,6 +91,9 @@ func NewAggregatorConfig(configFilePath string) *AggregatorConfig {
8591
GarbageCollectorTasksAge uint64
8692
GarbageCollectorTasksInterval uint64
8793
BlsServiceTaskTimeout time.Duration
94+
GasBaseBumpPercentage uint
95+
GasBumpIncrementalPercentage uint
96+
TimeToWaitBeforeBump time.Duration
8897
}(aggregatorConfigFromYaml.Aggregator),
8998
}
9099
}

core/retry_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ func TestWaitForTransactionReceipt(t *testing.T) {
152152
t.Errorf("Error setting up Anvil: %s\n", err)
153153
}
154154

155-
_, err = utils.WaitForTransactionReceipt(*client, context.Background(), hash)
155+
// Assert Call succeeds when Anvil running
156+
_, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45)
156157
assert.NotNil(t, err, "Error Waiting for Transaction with Anvil Running: %s\n", err)
157158
if !strings.Contains(err.Error(), "not found") {
158159
t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err)
@@ -164,7 +165,7 @@ func TestWaitForTransactionReceipt(t *testing.T) {
164165
return
165166
}
166167

167-
_, err = utils.WaitForTransactionReceipt(*client, context.Background(), hash)
168+
_, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45)
168169
assert.NotNil(t, err)
169170
if _, ok := err.(retry.PermanentError); ok {
170171
t.Errorf("WaitForTransactionReceipt Emitted non Transient error: %s\n", err)
@@ -180,7 +181,7 @@ func TestWaitForTransactionReceipt(t *testing.T) {
180181
t.Errorf("Error setting up Anvil: %s\n", err)
181182
}
182183

183-
_, err = utils.WaitForTransactionReceipt(*client, context.Background(), hash)
184+
_, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45)
184185
assert.NotNil(t, err)
185186
if !strings.Contains(err.Error(), "not found") {
186187
t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err)

core/utils/eth_client_utils.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package utils
22

33
import (
44
"context"
5+
"math/big"
6+
"time"
57

68
"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
79
eigentypes "github.com/Layr-Labs/eigensdk-go/types"
@@ -10,11 +12,25 @@ import (
1012
retry "github.com/yetanotherco/aligned_layer/core"
1113
)
1214

13-
func WaitForTransactionReceipt(client eth.InstrumentedClient, ctx context.Context, txHash gethcommon.Hash) (*types.Receipt, error) {
15+
// WaitForTransactionReceiptRetryable repeatedly attempts to fetch the transaction receipt for a given transaction hash.
16+
// If the receipt is not found, the function will retry with exponential backoff until the specified `waitTimeout` duration is reached.
17+
// If the receipt is still unavailable after `waitTimeout`, it will return an error.
18+
//
19+
// Note: The `time.Second * 2` is set as the max interval in the retry mechanism because we can't reliably measure the specific time the tx will be included in a block.
20+
// Setting a higher value will imply doing less retries across the waitTimeout and so we might lose the receipt
21+
func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, waitTimeout time.Duration) (*types.Receipt, error) {
1422
receipt_func := func() (*types.Receipt, error) {
15-
return client.TransactionReceipt(ctx, txHash)
23+
receipt, err := client.TransactionReceipt(context.Background(), txHash)
24+
if err != nil {
25+
receipt, err = client.TransactionReceipt(context.Background(), txHash)
26+
if err != nil {
27+
return nil, err
28+
}
29+
return receipt, nil
30+
}
31+
return receipt, nil
1632
}
17-
return retry.RetryWithData(receipt_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
33+
return retry.RetryWithData(receipt_func, retry.MinDelay, retry.RetryFactor, 0, time.Second*2, waitTimeout)
1834
}
1935

2036
func BytesToQuorumNumbers(quorumNumbersBytes []byte) eigentypes.QuorumNums {
@@ -32,3 +48,44 @@ func BytesToQuorumThresholdPercentages(quorumThresholdPercentagesBytes []byte) e
3248
}
3349
return quorumThresholdPercentages
3450
}
51+
52+
// Simple algorithm to calculate the gasPrice bump based on:
53+
// the currentGasPrice, a base bump percentage, a retry percentage, and the retry count.
54+
// Formula: currentGasPrice + (currentGasPrice * (baseBumpPercentage + retryCount * incrementalRetryPercentage) / 100)
55+
func CalculateGasPriceBumpBasedOnRetry(currentGasPrice *big.Int, baseBumpPercentage uint, retryAttemptPercentage uint, retryCount int) *big.Int {
56+
// Incremental percentage increase for each retry attempt (i*retryAttemptPercentage)
57+
incrementalRetryPercentage := new(big.Int).Mul(big.NewInt(int64(retryAttemptPercentage)), big.NewInt(int64(retryCount)))
58+
59+
// Total bump percentage: base bump + incremental retry percentage
60+
totalBumpPercentage := new(big.Int).Add(big.NewInt(int64(baseBumpPercentage)), incrementalRetryPercentage)
61+
62+
// Calculate the bump amount: currentGasPrice * totalBumpPercentage / 100
63+
bumpAmount := new(big.Int).Mul(currentGasPrice, totalBumpPercentage)
64+
bumpAmount = new(big.Int).Div(bumpAmount, big.NewInt(100))
65+
66+
// Final bumped gas price: currentGasPrice + bumpAmount
67+
bumpedGasPrice := new(big.Int).Add(currentGasPrice, bumpAmount)
68+
69+
return bumpedGasPrice
70+
}
71+
72+
/*
73+
GetGasPriceRetryable
74+
Get the gas price from the client with retry logic.
75+
- All errors are considered Transient Errors
76+
- Retry times: 1 sec, 2 sec, 4 sec
77+
*/
78+
func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient) (*big.Int, error) {
79+
respondToTaskV2_func := func() (*big.Int, error) {
80+
gasPrice, err := client.SuggestGasPrice(context.Background())
81+
if err != nil {
82+
gasPrice, err = fallbackClient.SuggestGasPrice(context.Background())
83+
if err != nil {
84+
return nil, err
85+
}
86+
}
87+
88+
return gasPrice, nil
89+
}
90+
return retry.RetryWithData(respondToTaskV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
91+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package utils_test
2+
3+
import (
4+
"math/big"
5+
"testing"
6+
7+
"github.com/yetanotherco/aligned_layer/core/utils"
8+
)
9+
10+
func TestCalculateGasPriceBumpBasedOnRetry(t *testing.T) {
11+
baseBumpPercentage := uint(20)
12+
incrementalRetryPercentage := uint(5)
13+
14+
gasPrices := [5]*big.Int{
15+
big.NewInt(3000000000),
16+
big.NewInt(3000000000),
17+
big.NewInt(4000000000),
18+
big.NewInt(4000000000),
19+
big.NewInt(5000000000)}
20+
21+
expectedBumpedGasPrices := [5]*big.Int{
22+
big.NewInt(3600000000),
23+
big.NewInt(3750000000),
24+
big.NewInt(5200000000),
25+
big.NewInt(5400000000),
26+
big.NewInt(7000000000)}
27+
28+
for i := 0; i < len(gasPrices); i++ {
29+
currentGasPrice := gasPrices[i]
30+
bumpedGasPrice := utils.CalculateGasPriceBumpBasedOnRetry(currentGasPrice, baseBumpPercentage, incrementalRetryPercentage, i)
31+
expectedGasPrice := expectedBumpedGasPrices[i]
32+
33+
if bumpedGasPrice.Cmp(expectedGasPrice) != 0 {
34+
t.Errorf("Bumped gas price does not match expected gas price, expected value %v, got: %v", expectedGasPrice, bumpedGasPrice)
35+
}
36+
}
37+
38+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212

1313
require (
1414
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6
15+
github.com/cenkalti/backoff/v4 v4.3.0
1516
github.com/consensys/gnark v0.10.0
1617
github.com/consensys/gnark-crypto v0.12.2-0.20240215234832-d72fcb379d3e
1718
github.com/fxamacker/cbor/v2 v2.7.0

0 commit comments

Comments
 (0)