Skip to content

Commit fa1b39f

Browse files
author
Julian Ventura
committed
Refactor with retries
1 parent 831dee5 commit fa1b39f

File tree

4 files changed

+104
-78
lines changed

4 files changed

+104
-78
lines changed

aggregator/pkg/server.go

Lines changed: 28 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@ import (
44
"context"
55
"encoding/hex"
66
"fmt"
7-
"net/http"
8-
"net/rpc"
9-
"strings"
10-
"time"
11-
127
"github.com/Layr-Labs/eigensdk-go/crypto/bls"
138
eigentypes "github.com/Layr-Labs/eigensdk-go/types"
149
retry "github.com/yetanotherco/aligned_layer/core"
1510
"github.com/yetanotherco/aligned_layer/core/types"
11+
"net/http"
12+
"net/rpc"
13+
"strings"
14+
"time"
1615
)
1716

1817
func (agg *Aggregator) ServeOperators() error {
@@ -37,53 +36,13 @@ func (agg *Aggregator) ServeOperators() error {
3736
return err
3837
}
3938

40-
// Waits for the arrival of task associated with signedTaskResponse and returns true on success or false on failure
41-
// If the task is not present in the internal map, it will try to fetch it from logs and retry.
42-
// The number of retries is specified by `waitForEventRetries`, and the waiting time between each by `waitForEventSleepSeconds`
43-
func (agg *Aggregator) waitForTaskAndFetchIfLost(signedTaskResponse *types.SignedTaskResponse) bool {
44-
for i := 0; i < waitForEventRetries; i++ {
45-
// Lock
46-
agg.taskMutex.Lock()
47-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Check if task is present")
48-
_, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash]
49-
// Unlock
50-
agg.logger.Info("- Unlocked Resources: Check if task is present")
51-
agg.taskMutex.Unlock()
52-
if ok {
53-
return true
54-
}
55-
56-
// Task was not found in internal map, let's try to fetch it from logs
57-
agg.logger.Info("Trying to fetch missed task from logs...")
58-
batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot)
59-
60-
if err == nil && batch != nil {
61-
agg.logger.Info("Found missed task in logs with merkle root 0x%e", batch.BatchMerkleRoot)
62-
// Adding new task will fail only if it already exists
63-
agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock)
64-
return true
65-
}
66-
67-
if err != nil {
68-
agg.logger.Warn("Error fetching task from logs: %v", err)
69-
}
70-
71-
if batch == nil {
72-
agg.logger.Info("Task not found in logs")
73-
}
74-
75-
// Task was not found, wait and retry
76-
time.Sleep(waitForEventSleepSeconds)
77-
}
78-
79-
return false
80-
}
81-
82-
// Aggregator Methods
39+
// ~~ AGGREGATOR METHODS ~~
8340
// This is the list of methods that the Aggregator exposes to the Operator
8441
// The Operator can call these methods to interact with the Aggregator
8542
// These methods are automatically registered by the RPC server
86-
// This takes a response an adds it to the internal. If reaching the quorum, it sends the aggregated signatures to ethereum
43+
44+
// Takes a response from an operator and processes it. After processing the response, the associated task may reach quorum, triggering a BLS service response.
45+
// If the task related to the response is not known to the aggregator (not stored in internal map), it will try to fetch it from the logs.
8746
// Returns:
8847
// - 0: Success
8948
// - 1: Error
@@ -93,25 +52,26 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
9352
"SenderAddress", "0x"+hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
9453
"BatchIdentifierHash", "0x"+hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]),
9554
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
96-
taskIndex := uint32(0)
55+
9756
taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash)
9857

9958
if err != nil {
100-
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
101-
*reply = 1
102-
return nil
103-
}
104-
105-
agg.taskMutex.Lock()
106-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Get task taskIndex")
107-
taskIndex, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash]
108-
// Unlock
109-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Get task taskIndex")
110-
agg.taskMutex.Unlock()
111-
if !ok {
112-
agg.logger.Errorf("Unexpected error fetching for task with merkle root 0x%x", signedTaskResponse.BatchMerkleRoot)
113-
*reply = 1
114-
return nil
59+
agg.logger.Warn("Task not found in the internal map, might have been missed. Trying to fetch it from logs")
60+
batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot)
61+
if err != nil {
62+
agg.logger.Warn("Pending task with merkle root 0x%x not found in logs")
63+
*reply = 1
64+
return nil
65+
}
66+
agg.logger.Info("Task was found in the logs, adding it to the internal map")
67+
agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock)
68+
taskIndex, err = agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash)
69+
if err != nil {
70+
// This shouldn't happen, since we just added the task
71+
agg.logger.Error("Unexpected error trying to get taskIndex from internal map")
72+
*reply = 1
73+
return nil
74+
}
11575
}
11676

11777
agg.telemetry.LogOperatorResponse(signedTaskResponse.BatchMerkleRoot, signedTaskResponse.OperatorId)
@@ -194,11 +154,11 @@ func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32
194154
func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) {
195155
getTaskIndex_func := func() (uint32, error) {
196156
agg.taskMutex.Lock()
197-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
157+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Get task index")
198158
taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash]
159+
agg.taskMutex.Unlock()
160+
agg.logger.Info("- Unlocked Resources: Get task index")
199161
if !ok {
200-
agg.taskMutex.Unlock()
201-
agg.logger.Info("- Unlocked Resources: Task not found in the internal map")
202162
return taskIndex, fmt.Errorf("Task not found in the internal map")
203163
} else {
204164
return taskIndex, nil

core/chainio/avs_reader.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -158,21 +158,18 @@ func (r *AvsReader) GetOldTaskHash(nBlocksOld uint64, interval uint64) (*[32]byt
158158
// Returns a pending batch from its merkle root or nil if it doesn't exist
159159
// Searches the last `BatchFetchBlocksRange` blocks at most
160160
func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
161-
latestBlock, err := r.AvsContractBindings.ethClient.BlockNumber(context.Background())
161+
latestBlock, err := r.BlockNumberRetryable(context.Background())
162162
if err != nil {
163-
latestBlock, err = r.AvsContractBindings.ethClientFallback.BlockNumber(context.Background())
164-
if err != nil {
165-
return nil, fmt.Errorf("failed to get latest block number: %w", err)
166-
}
163+
return nil, fmt.Errorf("Failed to get latest block number: %w", err)
167164
}
168165

169166
var fromBlock uint64 = 0
170167

171168
if latestBlock > BatchFetchBlocksRange {
172-
fromBlock = latestBlock - BatchFetchBlocksRange
169+
fromBlock = latestBlock - BatchFetchBlocksRange // TODO: Add this to config
173170
}
174171

175-
logs, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(&bind.FilterOpts{Start: fromBlock, End: &latestBlock, Context: context.Background()}, [][32]byte{merkleRoot})
172+
logs, err := r.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: &latestBlock, Context: context.Background()}, [][32]byte{merkleRoot})
176173
if err != nil {
177174
return nil, err
178175
}
@@ -181,17 +178,19 @@ func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte) (*service
181178
}
182179

183180
if !logs.Next() {
184-
return nil, nil //not an error, but no tasks found
181+
return nil, nil //Not an error, but no tasks found
185182
}
186183

187184
batch := logs.Event
188185

189186
batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...)
190187
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
191-
state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(nil, batchIdentifierHash)
188+
state, err := r.BatchesStateRetryable(nil, batchIdentifierHash)
189+
192190
if err != nil {
193191
return nil, err
194192
}
193+
195194
if state.Responded {
196195
return nil, nil
197196
}

core/chainio/retryable.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,70 @@ func SubscribeToNewTasksV3Retryable(
202202
}
203203
return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
204204
}
205+
206+
// |---AVS_READER---|
207+
208+
// TODO: These functions are being copied from AvsSubscriber and should be refactored
209+
// we don't actually need access to the AvsReader, AvsSubscriber or AvsWriter, but instead to the AvsContractBindings
210+
211+
// TODO: We should also add the fallback calls to the functions which are missing it
212+
213+
/*
214+
- All errors are considered Transient Errors
215+
- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
216+
*/
217+
func (r *AvsReader) BlockNumberRetryable(ctx context.Context) (uint64, error) {
218+
latestBlock_func := func() (uint64, error) {
219+
// Try with main connection
220+
latestBlock, err := r.AvsContractBindings.ethClient.BlockNumber(ctx)
221+
if err != nil {
222+
// If error try with fallback connection
223+
latestBlock, err = r.AvsContractBindings.ethClientFallback.BlockNumber(ctx)
224+
}
225+
return latestBlock, err
226+
}
227+
return retry.RetryWithData(latestBlock_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
228+
}
229+
230+
/*
231+
- All errors are considered Transient Errors
232+
- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
233+
*/
234+
func (r *AvsReader) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
235+
filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
236+
// Try with main connection
237+
batch, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot)
238+
if err != nil {
239+
// If error try with fallback connection
240+
batch, err = r.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV3(opts, batchMerkleRoot)
241+
}
242+
return batch, err
243+
}
244+
return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
245+
}
246+
247+
/*
248+
- All errors are considered Transient Errors
249+
- Retry times (3 retries): 12 sec (1 block), 24 sec (2 blocks), 48 sec (4 blocks)
250+
*/
251+
func (r *AvsReader) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct {
252+
TaskCreatedBlock uint32
253+
Responded bool
254+
RespondToTaskFeeLimit *big.Int
255+
}, error) {
256+
batchState_func := func() (struct {
257+
TaskCreatedBlock uint32
258+
Responded bool
259+
RespondToTaskFeeLimit *big.Int
260+
}, error) {
261+
// Try with main connection
262+
state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0)
263+
if err != nil {
264+
// If error try with fallback connection
265+
state, err = r.AvsContractBindings.ServiceManagerFallback.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0)
266+
}
267+
return state, err
268+
}
269+
270+
return retry.RetryWithData(batchState_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime)
271+
}

explorer/mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
"floki": {:hex, :floki, "0.36.2", "a7da0193538c93f937714a6704369711998a51a6164a222d710ebd54020aa7a3", [:mix], [], "hexpm", "a8766c0bc92f074e5cb36c4f9961982eda84c5d2b8e979ca67f5c268ec8ed580"},
2424
"gettext": {:hex, :gettext, "0.24.0", "6f4d90ac5f3111673cbefc4ebee96fe5f37a114861ab8c7b7d5b30a1108ce6d8", [:mix], [{:expo, "~> 0.5.1", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "bdf75cdfcbe9e4622dd18e034b227d77dd17f0f133853a1c73b97b3d6c770e8b"},
2525
"hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"},
26-
"heroicons": {:git, "https://github.com/tailwindlabs/heroicons.git", "88ab3a0d790e6a47404cba02800a6b25d2afae50", [tag: "v2.1.1", sparse: "optimized"]},
26+
"heroicons": {:git, "https://github.com/tailwindlabs/heroicons.git", "88ab3a0d790e6a47404cba02800a6b25d2afae50", [tag: "v2.1.1", sparse: "optimized", depth: 1]},
2727
"hpax": {:hex, :hpax, "0.2.0", "5a58219adcb75977b2edce5eb22051de9362f08236220c9e859a47111c194ff5", [:mix], [], "hexpm", "bea06558cdae85bed075e6c036993d43cd54d447f76d8190a8db0dc5893fa2f1"},
2828
"httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"},
2929
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},

0 commit comments

Comments
 (0)