Skip to content

Commit f760fff

Browse files
authored
Merge branch 'staging' into fix/batcher_user_balance_checks
2 parents ed3e589 + 2c45fb2 commit f760fff

File tree

66 files changed

+1935
-2358
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+1935
-2358
lines changed

.github/workflows/build-and-test-go.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ jobs:
2424
with:
2525
go-version: "1.23"
2626
cache: false
27+
- name: foundry-toolchain
28+
uses: foundry-rs/[email protected]
2729
- name: Build SP1 bindings
2830
run: make build_sp1_linux
2931
- name: Build Old SP1 bindings

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,5 @@ volume
1414
config-files/*.last_processed_batch.json
1515

1616
nonce_*.bin
17+
18+
infra/ansible/playbooks/ini/**.ini

Makefile

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,3 +1069,21 @@ setup_local_aligned_all:
10691069

10701070
tmux new-window -t aligned_layer -n telemetry
10711071
tmux send-keys -t aligned_layer:telemetry 'docker compose -f telemetry-docker-compose.yaml down && make telemetry_create_env && make telemetry_run_db && make open_telemetry_start && make telemetry_start' C-m
1072+
1073+
__ANSIBLE__: ## ____
1074+
1075+
ansible_batcher_create_env: ## Create empty variables files for the Batcher deploy
1076+
@cp -n infra/ansible/playbooks/ini/caddy-batcher.ini.example infra/ansible/playbooks/ini/caddy-batcher.ini
1077+
@cp -n infra/ansible/playbooks/ini/config-batcher.ini.example infra/ansible/playbooks/ini/config-batcher.ini
1078+
@cp -n infra/ansible/playbooks/ini/env-batcher.ini.example infra/ansible/playbooks/ini/env-batcher.ini
1079+
@echo "Config files for the Batcher created in infra/ansible/playbooks/ini"
1080+
@echo "Please complete the values and run make ansible_batcher_deploy"
1081+
1082+
ansible_batcher_deploy: ## Deploy the Batcher. Parameters: INVENTORY, KEYSTORE
1083+
@if [ -z "$(INVENTORY)" ] || [ -z "$(KEYSTORE)" ]; then \
1084+
echo "Error: Both INVENTORY and KEYSTORE must be set."; \
1085+
exit 1; \
1086+
fi
1087+
@ansible-playbook infra/ansible/playbooks/batcher.yaml \
1088+
-i $(INVENTORY) \
1089+
-e "keystore_path=$(KEYSTORE)"

aggregator/pkg/aggregator.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/hex"
66
"fmt"
7+
"math/big"
78
"strings"
89
"sync"
910
"time"
@@ -300,7 +301,21 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
300301
"senderAddress", hex.EncodeToString(senderAddress[:]),
301302
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))
302303

303-
txHash, err := agg.avsWriter.SendAggregatedResponse(batchIdentifierHash, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
304+
// This function is a callback that is called when the gas price is bumped on the avsWriter.SendAggregatedResponse
305+
onGasPriceBumped := func(bumpedGasPrice *big.Int) {
306+
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
307+
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
308+
}
309+
receipt, err := agg.avsWriter.SendAggregatedResponse(
310+
batchIdentifierHash,
311+
batchMerkleRoot,
312+
senderAddress,
313+
nonSignerStakesAndSignature,
314+
agg.AggregatorConfig.Aggregator.GasBaseBumpPercentage,
315+
agg.AggregatorConfig.Aggregator.GasBumpIncrementalPercentage,
316+
agg.AggregatorConfig.Aggregator.TimeToWaitBeforeBump,
317+
onGasPriceBumped,
318+
)
304319
if err != nil {
305320
agg.walletMutex.Unlock()
306321
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
@@ -311,13 +326,6 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
311326
agg.walletMutex.Unlock()
312327
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchIdentifierHash[:]))
313328

314-
receipt, err := utils.WaitForTransactionReceipt(
315-
agg.AggregatorConfig.BaseConfig.EthRpcClient, context.Background(), *txHash)
316-
if err != nil {
317-
agg.telemetry.LogTaskError(batchMerkleRoot, err)
318-
return nil, err
319-
}
320-
321329
agg.metrics.IncAggregatedResponses()
322330

323331
return receipt, nil

aggregator/pkg/telemetry.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ type TaskErrorMessage struct {
3030
TaskError string `json:"error"`
3131
}
3232

33+
type TaskGasPriceBumpMessage struct {
34+
MerkleRoot string `json:"merkle_root"`
35+
BumpedGasPrice string `json:"bumped_gas_price"`
36+
}
37+
3338
type TaskSentToEthereumMessage struct {
3439
MerkleRoot string `json:"merkle_root"`
3540
TxHash string `json:"tx_hash"`
@@ -96,6 +101,16 @@ func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) {
96101
}
97102
}
98103

104+
func (t *Telemetry) BumpedTaskGasPrice(batchMerkleRoot [32]byte, bumpedGasPrice string) {
105+
body := TaskGasPriceBumpMessage{
106+
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
107+
BumpedGasPrice: bumpedGasPrice,
108+
}
109+
if err := t.sendTelemetryMessage("/api/aggregatorTaskGasPriceBump", body); err != nil {
110+
t.logger.Error("[Telemetry] Error in LogOperatorResponse", "error", err)
111+
}
112+
}
113+
99114
func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string) {
100115
body := TaskSentToEthereumMessage{
101116
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
669 KB
Binary file not shown.
1.99 MB
Binary file not shown.

batcher/aligned-batcher/src/eth/utils.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
},
1010
};
1111
use aligned_sdk::core::constants::{
12-
DEFAULT_BACKOFF_FACTOR, DEFAULT_MAX_RETRIES, DEFAULT_MIN_RETRY_DELAY,
12+
DEFAULT_BACKOFF_FACTOR, DEFAULT_MAX_RETRIES, DEFAULT_MAX_RETRY_DELAY, DEFAULT_MIN_RETRY_DELAY,
1313
GAS_PRICE_INCREMENT_PERCENTAGE_PER_ITERATION, OVERRIDE_GAS_PRICE_PERCENTAGE_MULTIPLIER,
1414
PERCENTAGE_DIVIDER,
1515
};
@@ -61,6 +61,9 @@ pub fn calculate_bumped_gas_price(
6161
bumped_current_gas_price.max(bumped_previous_gas_price)
6262
}
6363

64+
/// Gets the current nonce from Ethereum.
65+
/// Retries on recoverable errors using exponential backoff up to `DEFAULT_MAX_RETRIES` times:
66+
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
6467
pub async fn get_current_nonce(
6568
eth_http_provider: &Provider<Http>,
6669
eth_http_provider_fallback: &Provider<Http>,
@@ -71,6 +74,7 @@ pub async fn get_current_nonce(
7174
DEFAULT_MIN_RETRY_DELAY,
7275
DEFAULT_BACKOFF_FACTOR,
7376
DEFAULT_MAX_RETRIES,
77+
DEFAULT_MAX_RETRY_DELAY,
7478
)
7579
.await
7680
.map_err(|e| {
@@ -79,7 +83,9 @@ pub async fn get_current_nonce(
7983
})
8084
}
8185

82-
/// Gets the current gas price from Ethereum using exponential backoff.
86+
/// Gets the current gas price from Ethereum.
87+
/// Retries on recoverable errors using exponential backoff up to `DEFAULT_MAX_RETRIES` times:
88+
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
8389
pub async fn get_gas_price(
8490
eth_http_provider: &Provider<Http>,
8591
eth_http_provider_fallback: &Provider<Http>,
@@ -89,6 +95,7 @@ pub async fn get_gas_price(
8995
DEFAULT_MIN_RETRY_DELAY,
9096
DEFAULT_BACKOFF_FACTOR,
9197
DEFAULT_MAX_RETRIES,
98+
DEFAULT_MAX_RETRY_DELAY,
9299
)
93100
.await
94101
.map_err(|e| {

batcher/aligned-batcher/src/lib.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,9 @@ impl Batcher {
271271
Ok(())
272272
}
273273

274+
/// Listen for Ethereum new blocks.
275+
/// Retries on recoverable errors using exponential backoff
276+
/// with the maximum number of retries and a `MAX_DELAY` of 1 hour.
274277
pub async fn listen_new_blocks(self: Arc<Self>) -> Result<(), BatcherError> {
275278
retry_function(
276279
|| {
@@ -280,6 +283,7 @@ impl Batcher {
280283
DEFAULT_MIN_RETRY_DELAY,
281284
DEFAULT_BACKOFF_FACTOR,
282285
LISTEN_NEW_BLOCKS_MAX_TIMES,
286+
DEFAULT_MAX_RETRY_DELAY,
283287
)
284288
.await
285289
.map_err(|e| e.inner())
@@ -916,7 +920,9 @@ impl Batcher {
916920
}
917921
}
918922

919-
/// Gets the user nonce from Ethereum using exponential backoff.
923+
/// Gets the user nonce from Ethereum.
924+
/// Retries on recoverable errors using exponential backoff up to `DEFAULT_MAX_RETRIES` times:
925+
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
920926
async fn get_user_nonce_from_ethereum(
921927
&self,
922928
addr: Address,
@@ -932,6 +938,7 @@ impl Batcher {
932938
DEFAULT_MIN_RETRY_DELAY,
933939
DEFAULT_BACKOFF_FACTOR,
934940
DEFAULT_MAX_RETRIES,
941+
DEFAULT_MAX_RETRY_DELAY,
935942
)
936943
.await
937944
}
@@ -1371,6 +1378,10 @@ impl Batcher {
13711378
}
13721379
}
13731380

1381+
/// Sends a `create_new_task` transaction to Ethereum and waits for a maximum of 3 blocks for the receipt.
1382+
/// Retries up to `DEFAULT_MAX_RETRIES` times using exponential backoff on recoverable errors while trying to send the transaction:
1383+
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
1384+
/// `ReceiptNotFoundError` is treated as non-recoverable, and the transaction will be canceled using `cancel_create_new_task_tx` in that case.
13741385
async fn create_new_task(
13751386
&self,
13761387
batch_merkle_root: [u8; 32],
@@ -1393,6 +1404,7 @@ impl Batcher {
13931404
DEFAULT_MIN_RETRY_DELAY,
13941405
DEFAULT_BACKOFF_FACTOR,
13951406
DEFAULT_MAX_RETRIES,
1407+
DEFAULT_MAX_RETRY_DELAY,
13961408
)
13971409
.await;
13981410
match result {
@@ -1416,11 +1428,10 @@ impl Batcher {
14161428
}
14171429

14181430
/// Sends a transaction to Ethereum with the same nonce as the previous one to override it.
1419-
/// In case of a recoverable error, it will retry with an exponential backoff up to CANCEL_TRANSACTION_MAX_RETRIES times.
1420-
/// A tx not included in 3 blocks will be considered an error, and will trigger a bump of the fee, with the rules on ```calculate_bumped_gas_price```
1421-
/// This will do 5 bumps every 3 blocks, and then the exponential backoff will dominate, doing bumps at 8,13,24,45,89 and so on.
1422-
/// Errors on ```get_gas_price``` calls inside this function are considered transient,
1423-
/// so they won't stop the retries.
1431+
/// Retries on recoverable errors with exponential backoff.
1432+
/// Bumps the fee if not included in 3 blocks, using `calculate_bumped_gas_price`.
1433+
/// In the first 5 attemps, bumps the fee every 3 blocks. Then exponential backoff takes over.
1434+
/// After 2 hours (attempt 13), retries occur hourly for 1 day (33 retries).
14241435
pub async fn cancel_create_new_task_tx(&self, old_tx_gas_price: U256) {
14251436
info!("Cancelling createNewTask transaction...");
14261437
let iteration = Arc::new(Mutex::new(0));
@@ -1458,6 +1469,7 @@ impl Batcher {
14581469
DEFAULT_MIN_RETRY_DELAY,
14591470
DEFAULT_BACKOFF_FACTOR,
14601471
CANCEL_TRANSACTION_MAX_RETRIES,
1472+
DEFAULT_MAX_RETRY_DELAY,
14611473
)
14621474
.await
14631475
{
@@ -1550,7 +1562,9 @@ impl Batcher {
15501562
Ok(())
15511563
}
15521564

1553-
/// Gets the balance of user with address `addr` from Ethereum using exponential backoff.
1565+
/// Gets the balance of user with address `addr` from Ethereum.
1566+
/// Retries on recoverable errors using exponential backoff up to `DEFAULT_MAX_RETRIES` times:
1567+
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs)
15541568
/// Returns `None` if the balance couldn't be returned
15551569
/// FIXME: This should return a `Result` instead.
15561570
async fn get_user_balance(&self, addr: &Address) -> Option<U256> {
@@ -1565,12 +1579,15 @@ impl Batcher {
15651579
DEFAULT_MIN_RETRY_DELAY,
15661580
DEFAULT_BACKOFF_FACTOR,
15671581
DEFAULT_MAX_RETRIES,
1582+
DEFAULT_MAX_RETRY_DELAY,
15681583
)
15691584
.await
15701585
.ok()
15711586
}
15721587

1573-
/// Checks if the user's balance is unlocked for a given address using exponential backoff.
1588+
/// Checks if the user's balance is unlocked for a given address.
1589+
/// Retries on recoverable errors using exponential backoff up to `DEFAULT_MAX_RETRIES` times:
1590+
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
15741591
/// Returns `false` if an error occurs during the retries.
15751592
async fn user_balance_is_unlocked(&self, addr: &Address) -> bool {
15761593
let Ok(unlocked) = retry_function(
@@ -1584,6 +1601,7 @@ impl Batcher {
15841601
DEFAULT_MIN_RETRY_DELAY,
15851602
DEFAULT_BACKOFF_FACTOR,
15861603
DEFAULT_MAX_RETRIES,
1604+
DEFAULT_MAX_RETRY_DELAY,
15871605
)
15881606
.await
15891607
else {
@@ -1593,7 +1611,9 @@ impl Batcher {
15931611
unlocked
15941612
}
15951613

1596-
/// Uploads the batch to s3 using exponential backoff.
1614+
/// Uploads the batch to s3.
1615+
/// Retries on recoverable errors using exponential backoff up to `DEFAULT_MAX_RETRIES` times:
1616+
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
15971617
async fn upload_batch_to_s3(
15981618
&self,
15991619
batch_bytes: &[u8],
@@ -1611,6 +1631,7 @@ impl Batcher {
16111631
DEFAULT_MIN_RETRY_DELAY,
16121632
DEFAULT_BACKOFF_FACTOR,
16131633
DEFAULT_MAX_RETRIES,
1634+
DEFAULT_MAX_RETRY_DELAY,
16141635
)
16151636
.await
16161637
.map_err(|e| BatcherError::BatchUploadError(e.to_string()))

batcher/aligned-batcher/src/retry/batcher_retryables.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ pub async fn create_new_task_retryable(
170170
})?
171171
.map_err(|e| {
172172
warn!("Error while waiting for batch inclusion: {e}");
173-
RetryError::Transient(BatcherError::TransactionSendError(e.to_string()))
173+
RetryError::Permanent(BatcherError::ReceiptNotFoundError)
174174
})?
175175
.ok_or(RetryError::Permanent(BatcherError::ReceiptNotFoundError))
176176
}

0 commit comments

Comments
 (0)