Skip to content

Commit 9c964bb

Browse files
author
Julian Ventura
committed
Refactor with retries
1 parent 27bf9c1 commit 9c964bb

File tree

3 files changed

+102
-76
lines changed

3 files changed

+102
-76
lines changed

aggregator/pkg/server.go

Lines changed: 27 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@ import (
44
"context"
55
"encoding/hex"
66
"fmt"
7-
"net/http"
8-
"net/rpc"
9-
"strings"
10-
"time"
11-
127
"github.com/Layr-Labs/eigensdk-go/crypto/bls"
138
eigentypes "github.com/Layr-Labs/eigensdk-go/types"
149
retry "github.com/yetanotherco/aligned_layer/core"
1510
"github.com/yetanotherco/aligned_layer/core/types"
11+
"net/http"
12+
"net/rpc"
13+
"strings"
14+
"time"
1615
)
1716

1817
func (agg *Aggregator) ServeOperators() error {
@@ -37,53 +36,13 @@ func (agg *Aggregator) ServeOperators() error {
3736
return err
3837
}
3938

40-
// Waits for the arrival of task associated with signedTaskResponse and returns true on success or false on failure
41-
// If the task is not present in the internal map, it will try to fetch it from logs and retry.
42-
// The number of retries is specified by `waitForEventRetries`, and the waiting time between each by `waitForEventSleepSeconds`
43-
func (agg *Aggregator) waitForTaskAndFetchIfLost(signedTaskResponse *types.SignedTaskResponse) bool {
44-
for i := 0; i < waitForEventRetries; i++ {
45-
// Lock
46-
agg.taskMutex.Lock()
47-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Check if task is present")
48-
_, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash]
49-
// Unlock
50-
agg.logger.Info("- Unlocked Resources: Check if task is present")
51-
agg.taskMutex.Unlock()
52-
if ok {
53-
return true
54-
}
55-
56-
// Task was not found in internal map, let's try to fetch it from logs
57-
agg.logger.Info("Trying to fetch missed task from logs...")
58-
batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot)
59-
60-
if err == nil && batch != nil {
61-
agg.logger.Info("Found missed task in logs with merkle root 0x%e", batch.BatchMerkleRoot)
62-
// Adding new task will fail only if it already exists
63-
agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock)
64-
return true
65-
}
66-
67-
if err != nil {
68-
agg.logger.Warn("Error fetching task from logs: %v", err)
69-
}
70-
71-
if batch == nil {
72-
agg.logger.Info("Task not found in logs")
73-
}
74-
75-
// Task was not found, wait and retry
76-
time.Sleep(waitForEventSleepSeconds)
77-
}
78-
79-
return false
80-
}
81-
82-
// Aggregator Methods
39+
// ~~ AGGREGATOR METHODS ~~
8340
// This is the list of methods that the Aggregator exposes to the Operator
8441
// The Operator can call these methods to interact with the Aggregator
8542
// These methods are automatically registered by the RPC server
86-
// This takes a response an adds it to the internal. If reaching the quorum, it sends the aggregated signatures to ethereum
43+
44+
// Takes a response from an operator and processes it. After processing the response, the associated task may reach quorum, triggering a BLS service response.
45+
// If the task related to the response is not known to the aggregator (not stored in internal map), it will try to fetch it from the logs.
8746
// Returns:
8847
// - 0: Success
8948
// - 1: Error
@@ -97,21 +56,22 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
9756
taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash)
9857

9958
if err != nil {
100-
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
101-
*reply = 1
102-
return nil
103-
}
104-
105-
agg.taskMutex.Lock()
106-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Get task taskIndex")
107-
taskIndex, ok := agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchIdentifierHash]
108-
// Unlock
109-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Get task taskIndex")
110-
agg.taskMutex.Unlock()
111-
if !ok {
112-
agg.logger.Errorf("Unexpected error fetching for task with merkle root 0x%x", signedTaskResponse.BatchMerkleRoot)
113-
*reply = 1
114-
return nil
59+
agg.logger.Warn("Task not found in the internal map, might have been missed. Trying to fetch it from logs")
60+
batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot)
61+
if err != nil {
62+
agg.logger.Warn("Pending task with merkle root 0x%x not found in logs")
63+
*reply = 1
64+
return nil
65+
}
66+
agg.logger.Info("Task was found in the logs, adding it to the internal map")
67+
agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock)
68+
taskIndex, err = agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash)
69+
if err != nil {
70+
// This shouldn't happen, since we just added the task
71+
agg.logger.Error("Unexpected error trying to get taskIndex from internal map")
72+
*reply = 1
73+
return nil
74+
}
11575
}
11676

11777
agg.telemetry.LogOperatorResponse(signedTaskResponse.BatchMerkleRoot, signedTaskResponse.OperatorId)
@@ -194,11 +154,11 @@ func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32
194154
func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) {
195155
getTaskIndex_func := func() (uint32, error) {
196156
agg.taskMutex.Lock()
197-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
157+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Get task index")
198158
taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash]
159+
agg.taskMutex.Unlock()
160+
agg.logger.Info("- Unlocked Resources: Get task index")
199161
if !ok {
200-
agg.taskMutex.Unlock()
201-
agg.logger.Info("- Unlocked Resources: Task not found in the internal map")
202162
return taskIndex, fmt.Errorf("Task not found in the internal map")
203163
} else {
204164
return taskIndex, nil

core/chainio/avs_reader.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -158,21 +158,18 @@ func (r *AvsReader) GetOldTaskHash(nBlocksOld uint64, interval uint64) (*[32]byt
158158
// Returns a pending batch from its merkle root or nil if it doesn't exist
159159
// Searches the last `BatchFetchBlocksRange` blocks at most
160160
func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
161-
latestBlock, err := r.AvsContractBindings.ethClient.BlockNumber(context.Background())
161+
latestBlock, err := r.BlockNumberRetryable(context.Background())
162162
if err != nil {
163-
latestBlock, err = r.AvsContractBindings.ethClientFallback.BlockNumber(context.Background())
164-
if err != nil {
165-
return nil, fmt.Errorf("failed to get latest block number: %w", err)
166-
}
163+
return nil, fmt.Errorf("Failed to get latest block number: %w", err)
167164
}
168165

169166
var fromBlock uint64 = 0
170167

171168
if latestBlock > BatchFetchBlocksRange {
172-
fromBlock = latestBlock - BatchFetchBlocksRange
169+
fromBlock = latestBlock - BatchFetchBlocksRange // TODO: Add this to config
173170
}
174171

175-
logs, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(&bind.FilterOpts{Start: fromBlock, End: &latestBlock, Context: context.Background()}, [][32]byte{merkleRoot})
172+
logs, err := r.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: &latestBlock, Context: context.Background()}, [][32]byte{merkleRoot})
176173
if err != nil {
177174
return nil, err
178175
}
@@ -181,17 +178,19 @@ func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte) (*service
181178
}
182179

183180
if !logs.Next() {
184-
return nil, nil //not an error, but no tasks found
181+
return nil, nil //Not an error, but no tasks found
185182
}
186183

187184
batch := logs.Event
188185

189186
batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...)
190187
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
191-
state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(nil, batchIdentifierHash)
188+
state, err := r.BatchesStateRetryable(nil, batchIdentifierHash)
189+
192190
if err != nil {
193191
return nil, err
194192
}
193+
195194
if state.Responded {
196195
return nil, nil
197196
}

core/chainio/retryable.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,70 @@ func SubscribeToNewTasksV3Retryable(
202202
}
203203
return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
204204
}
205+
206+
// |---AVS_READER---|
207+
208+
// TODO: These functions are being copied from AvsSubscriber and should be refactored
209+
// we don't actually need access to the AvsReader, AvsSubscriber or AvsWriter, but instead to the AvsContractBindings
210+
211+
// TODO: We should also add the fallback calls to the functions which are missing it
212+
213+
/*
214+
- All errors are considered Transient Errors
215+
- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
216+
*/
217+
func (r *AvsReader) BlockNumberRetryable(ctx context.Context) (uint64, error) {
218+
latestBlock_func := func() (uint64, error) {
219+
// Try with main connection
220+
latestBlock, err := r.AvsContractBindings.ethClient.BlockNumber(ctx)
221+
if err != nil {
222+
// If error try with fallback connection
223+
latestBlock, err = r.AvsContractBindings.ethClientFallback.BlockNumber(ctx)
224+
}
225+
return latestBlock, err
226+
}
227+
return retry.RetryWithData(latestBlock_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
228+
}
229+
230+
/*
231+
- All errors are considered Transient Errors
232+
- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
233+
*/
234+
func (r *AvsReader) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
235+
filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
236+
// Try with main connection
237+
batch, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot)
238+
if err != nil {
239+
// If error try with fallback connection
240+
batch, err = r.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV3(opts, batchMerkleRoot)
241+
}
242+
return batch, err
243+
}
244+
return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
245+
}
246+
247+
/*
248+
- All errors are considered Transient Errors
249+
- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks)
250+
*/
251+
func (r *AvsReader) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct {
252+
TaskCreatedBlock uint32
253+
Responded bool
254+
RespondToTaskFeeLimit *big.Int
255+
}, error) {
256+
batchState_func := func() (struct {
257+
TaskCreatedBlock uint32
258+
Responded bool
259+
RespondToTaskFeeLimit *big.Int
260+
}, error) {
261+
// Try with main connection
262+
state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0)
263+
if err != nil {
264+
// If error try with fallback connection
265+
state, err = r.AvsContractBindings.ServiceManagerFallback.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0)
266+
}
267+
return state, err
268+
}
269+
270+
return retry.RetryWithData(batchState_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime)
271+
}

0 commit comments

Comments
 (0)