Skip to content

Commit 99c6c88

Browse files
author
Julian Ventura
committed
Merge branch 'staging' into fix/aggregator-recover-lost-batches
2 parents 9072d8b + ba24eba commit 99c6c88

File tree

61 files changed

+1892
-2347
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+1892
-2347
lines changed

.github/workflows/build-and-test-go.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ jobs:
2424
with:
2525
go-version: "1.23"
2626
cache: false
27+
- name: foundry-toolchain
28+
uses: foundry-rs/[email protected]
2729
- name: Build SP1 bindings
2830
run: make build_sp1_linux
2931
- name: Build Old SP1 bindings

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,5 @@ volume
1414
config-files/*.last_processed_batch.json
1515

1616
nonce_*.bin
17+
18+
infra/ansible/playbooks/ini/**.ini

Makefile

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,3 +1069,21 @@ setup_local_aligned_all:
10691069

10701070
tmux new-window -t aligned_layer -n telemetry
10711071
tmux send-keys -t aligned_layer:telemetry 'docker compose -f telemetry-docker-compose.yaml down && make telemetry_create_env && make telemetry_run_db && make open_telemetry_start && make telemetry_start' C-m
1072+
1073+
__ANSIBLE__: ## ____
1074+
1075+
ansible_batcher_create_env: ## Create empty variables files for the Batcher deploy
1076+
@cp -n infra/ansible/playbooks/ini/caddy-batcher.ini.example infra/ansible/playbooks/ini/caddy-batcher.ini
1077+
@cp -n infra/ansible/playbooks/ini/config-batcher.ini.example infra/ansible/playbooks/ini/config-batcher.ini
1078+
@cp -n infra/ansible/playbooks/ini/env-batcher.ini.example infra/ansible/playbooks/ini/env-batcher.ini
1079+
@echo "Config files for the Batcher created in infra/ansible/playbooks/ini"
1080+
@echo "Please complete the values and run make ansible_batcher_deploy"
1081+
1082+
ansible_batcher_deploy: ## Deploy the Batcher. Parameters: INVENTORY, KEYSTORE
1083+
@if [ -z "$(INVENTORY)" ] || [ -z "$(KEYSTORE)" ]; then \
1084+
echo "Error: Both INVENTORY and KEYSTORE must be set."; \
1085+
exit 1; \
1086+
fi
1087+
@ansible-playbook infra/ansible/playbooks/batcher.yaml \
1088+
-i $(INVENTORY) \
1089+
-e "keystore_path=$(KEYSTORE)"

aggregator/pkg/aggregator.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/hex"
66
"fmt"
7+
"math/big"
78
"strings"
89
"sync"
910
"time"
@@ -300,7 +301,21 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
300301
"senderAddress", hex.EncodeToString(senderAddress[:]),
301302
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))
302303

303-
txHash, err := agg.avsWriter.SendAggregatedResponse(batchIdentifierHash, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
304+
// This function is a callback that is called when the gas price is bumped on the avsWriter.SendAggregatedResponse
305+
onGasPriceBumped := func(bumpedGasPrice *big.Int) {
306+
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
307+
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
308+
}
309+
receipt, err := agg.avsWriter.SendAggregatedResponse(
310+
batchIdentifierHash,
311+
batchMerkleRoot,
312+
senderAddress,
313+
nonSignerStakesAndSignature,
314+
agg.AggregatorConfig.Aggregator.GasBaseBumpPercentage,
315+
agg.AggregatorConfig.Aggregator.GasBumpIncrementalPercentage,
316+
agg.AggregatorConfig.Aggregator.TimeToWaitBeforeBump,
317+
onGasPriceBumped,
318+
)
304319
if err != nil {
305320
agg.walletMutex.Unlock()
306321
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
@@ -311,13 +326,6 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
311326
agg.walletMutex.Unlock()
312327
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchIdentifierHash[:]))
313328

314-
receipt, err := utils.WaitForTransactionReceipt(
315-
agg.AggregatorConfig.BaseConfig.EthRpcClient, context.Background(), *txHash)
316-
if err != nil {
317-
agg.telemetry.LogTaskError(batchMerkleRoot, err)
318-
return nil, err
319-
}
320-
321329
agg.metrics.IncAggregatedResponses()
322330

323331
return receipt, nil

aggregator/pkg/telemetry.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ type TaskErrorMessage struct {
3030
TaskError string `json:"error"`
3131
}
3232

33+
type TaskGasPriceBumpMessage struct {
34+
MerkleRoot string `json:"merkle_root"`
35+
BumpedGasPrice string `json:"bumped_gas_price"`
36+
}
37+
3338
type TaskSentToEthereumMessage struct {
3439
MerkleRoot string `json:"merkle_root"`
3540
TxHash string `json:"tx_hash"`
@@ -96,6 +101,16 @@ func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) {
96101
}
97102
}
98103

104+
func (t *Telemetry) BumpedTaskGasPrice(batchMerkleRoot [32]byte, bumpedGasPrice string) {
105+
body := TaskGasPriceBumpMessage{
106+
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
107+
BumpedGasPrice: bumpedGasPrice,
108+
}
109+
if err := t.sendTelemetryMessage("/api/aggregatorTaskGasPriceBump", body); err != nil {
110+
t.logger.Error("[Telemetry] Error in LogOperatorResponse", "error", err)
111+
}
112+
}
113+
99114
func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string) {
100115
body := TaskSentToEthereumMessage{
101116
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
669 KB
Binary file not shown.
1.99 MB
Binary file not shown.

config-files/config-aggregator.yaml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,14 @@ aggregator:
3434
enable_metrics: true
3535
metrics_ip_port_address: localhost:9091
3636
telemetry_ip_port_address: localhost:4001
37-
garbage_collector_period: 2m #The period of the GC process. Suggested value for Prod: '168h' (7 days)
38-
garbage_collector_tasks_age: 20 #The age of tasks that will be removed by the GC, in blocks. Suggested value for prod: '216000' (30 days)
39-
garbage_collector_tasks_interval: 10 #The interval of queried blocks to get an old batch. Suggested value for prod: '900' (3 hours)
37+
garbage_collector_period: 2m # The period of the GC process. Suggested value for Prod: '168h' (7 days)
38+
garbage_collector_tasks_age: 20 # The age of tasks that will be removed by the GC, in blocks. Suggested value for prod: '216000' (30 days)
39+
garbage_collector_tasks_interval: 10 # The interval of queried blocks to get an old batch. Suggested value for prod: '900' (3 hours)
4040
bls_service_task_timeout: 168h # The timeout of bls aggregation service tasks. Suggested value for prod '168h' (7 days)
41-
pending_batch_fetch_block_range: 1000 #The interval of queried blocks to get a pending batch by logs. Suggested valued for prod `1000`
41+
pending_batch_fetch_block_range: 1000 # The interval of queried blocks to get a pending batch by logs. Suggested valued for prod `1000`
42+
gas_base_bump_percentage: 10 # How much to bump gas price when responding to task. Suggested value 10%
43+
gas_bump_incremental_percentage: 2 # An extra percentage to bump every retry i*2 when responding to task. Suggested value 2%
44+
time_to_wait_before_bump: 36s # The time to wait for the receipt when responding to task. Suggested value 36 seconds (3 blocks)
4245

4346
## Operator Configurations
4447
# operator:

core/chainio/avs_writer.go

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ import (
1515
"github.com/ethereum/go-ethereum/common"
1616
"github.com/ethereum/go-ethereum/core/types"
1717
servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager"
18+
retry "github.com/yetanotherco/aligned_layer/core"
1819
"github.com/yetanotherco/aligned_layer/core/config"
20+
"github.com/yetanotherco/aligned_layer/core/utils"
1921
)
2022

2123
type AvsWriter struct {
@@ -70,7 +72,10 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
7072
}, nil
7173
}
7274

73-
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*common.Hash, error) {
75+
// Sends AggregatedResponse and waits for the receipt for three blocks, if not received
76+
// it will try again bumping the last tx gas price based on `CalculateGasPriceBump`
77+
// This process happens indefinitely until the transaction is included.
78+
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) {
7479
txOpts := *w.Signer.GetTxOpts()
7580
txOpts.NoSend = true // simulate the transaction
7681
tx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
@@ -83,17 +88,61 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
8388
return nil, err
8489
}
8590

86-
// Send the transaction
91+
// Set the nonce, as we might have to replace the transaction with a higher gas price
92+
txNonce := big.NewInt(int64(tx.Nonce()))
93+
txOpts.Nonce = txNonce
94+
txOpts.GasPrice = tx.GasPrice()
8795
txOpts.NoSend = false
88-
txOpts.GasLimit = tx.Gas() * 110 / 100 // Add 10% to the gas limit
89-
tx, err = w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
90-
if err != nil {
91-
return nil, err
96+
i := 0
97+
98+
respondToTaskV2Func := func() (*types.Receipt, error) {
99+
gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback)
100+
if err != nil {
101+
return nil, err
102+
}
103+
104+
bumpedGasPrice := utils.CalculateGasPriceBumpBasedOnRetry(gasPrice, gasBumpPercentage, gasBumpIncrementalPercentage, i)
105+
// new bumped gas price must be higher than the last one (this should hardly ever happen though)
106+
if bumpedGasPrice.Cmp(txOpts.GasPrice) > 0 {
107+
txOpts.GasPrice = bumpedGasPrice
108+
} else {
109+
// bump the last tx gas price a little by `gasBumpIncrementalPercentage` to replace it.
110+
txOpts.GasPrice = utils.CalculateGasPriceBumpBasedOnRetry(txOpts.GasPrice, gasBumpIncrementalPercentage, 0, 0)
111+
}
112+
113+
if i > 0 {
114+
onGasPriceBumped(txOpts.GasPrice)
115+
}
116+
117+
err = w.checkRespondToTaskFeeLimit(tx, txOpts, batchIdentifierHash, senderAddress)
118+
if err != nil {
119+
return nil, retry.PermanentError{Inner: err}
120+
}
121+
122+
w.logger.Infof("Sending RespondToTask transaction with a gas price of %v", txOpts.GasPrice)
123+
124+
tx, err = w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
125+
if err != nil {
126+
return nil, err
127+
}
128+
129+
receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, tx.Hash(), timeToWaitBeforeBump)
130+
if receipt != nil {
131+
return receipt, nil
132+
}
133+
134+
// if we are here, it means we have reached the receipt waiting timeout
135+
// we increment the i here to add an incremental percentage to increase the odds of being included in the next blocks
136+
i++
137+
138+
w.logger.Infof("RespondToTask receipt waiting timeout has passed, will try again...")
139+
if err != nil {
140+
return nil, err
141+
}
142+
return nil, fmt.Errorf("transaction failed")
92143
}
93144

94-
txHash := tx.Hash()
95-
96-
return &txHash, nil
145+
return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, retry.MaxInterval, 0)
97146
}
98147

99148
func (w *AvsWriter) checkRespondToTaskFeeLimit(tx *types.Transaction, txOpts bind.TransactOpts, batchIdentifierHash [32]byte, senderAddress [20]byte) error {

core/config/aggregator.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ type AggregatorConfig struct {
2626
GarbageCollectorTasksInterval uint64
2727
BlsServiceTaskTimeout time.Duration
2828
PendingBatchFetchBlockRange uint64
29+
GasBaseBumpPercentage uint
30+
GasBumpIncrementalPercentage uint
31+
TimeToWaitBeforeBump time.Duration
2932
}
3033
}
3134

@@ -42,6 +45,9 @@ type AggregatorConfigFromYaml struct {
4245
GarbageCollectorTasksInterval uint64 `yaml:"garbage_collector_tasks_interval"`
4346
BlsServiceTaskTimeout time.Duration `yaml:"bls_service_task_timeout"`
4447
PendingBatchFetchBlockRange uint64 `yaml:"pending_batch_fetch_block_range"`
48+
GasBaseBumpPercentage uint `yaml:"gas_base_bump_percentage"`
49+
GasBumpIncrementalPercentage uint `yaml:"gas_bump_incremental_percentage"`
50+
TimeToWaitBeforeBump time.Duration `yaml:"time_to_wait_before_bump"`
4551
} `yaml:"aggregator"`
4652
}
4753

@@ -88,6 +94,9 @@ func NewAggregatorConfig(configFilePath string) *AggregatorConfig {
8894
GarbageCollectorTasksInterval uint64
8995
BlsServiceTaskTimeout time.Duration
9096
PendingBatchFetchBlockRange uint64
97+
GasBaseBumpPercentage uint
98+
GasBumpIncrementalPercentage uint
99+
TimeToWaitBeforeBump time.Duration
91100
}(aggregatorConfigFromYaml.Aggregator),
92101
}
93102
}

0 commit comments

Comments
 (0)