diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go
index d1ebb1f41d..247902c0b3 100644
--- a/aggregator/pkg/server.go
+++ b/aggregator/pkg/server.go
@@ -4,12 +4,11 @@ import (
 	"context"
 	"encoding/hex"
 	"fmt"
+	retry "github.com/yetanotherco/aligned_layer/core"
+	"github.com/yetanotherco/aligned_layer/core/types"
 	"net/http"
 	"net/rpc"
 	"time"
-
-	retry "github.com/yetanotherco/aligned_layer/core"
-	"github.com/yetanotherco/aligned_layer/core/types"
 )
 
 func (agg *Aggregator) ServeOperators() error {
@@ -34,11 +33,13 @@ func (agg *Aggregator) ServeOperators() error {
 	return err
 }
 
-// Aggregator Methods
+// ~~ AGGREGATOR METHODS ~~
 // This is the list of methods that the Aggregator exposes to the Operator
 // The Operator can call these methods to interact with the Aggregator
 // This methods are automatically registered by the RPC server
-// This takes a response an adds it to the internal. If reaching the quorum, it sends the aggregated signatures to ethereum
+
+// Takes a response from an operator and processes it. After processing, the associated task may reach quorum, triggering a BLS service response.
+// If the task related to the response is not known to the aggregator (i.e. not stored in the internal map), it will try to fetch it from the contract's events.
 // Returns:
 // - 0: Success
 // - 1: Error
@@ -48,7 +49,6 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
 		"SenderAddress", "0x"+hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
 		"BatchIdentifierHash", "0x"+hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]),
 		"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
-	taskIndex := uint32(0)
 
 	// The Aggregator may receive the Task Identifier after the operators.
 	// If that's the case, we won't know about the task at this point
@@ -57,10 +57,24 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
 
 	taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash)
 
 	if err != nil {
-		agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
-		*reply = 1
-		return nil
+		agg.logger.Warn("Task not found in the internal map, might have been missed. Trying to fetch task data from Ethereum")
+		batch, err := agg.avsReader.GetPendingBatchFromMerkleRoot(signedTaskResponse.BatchMerkleRoot, agg.AggregatorConfig.Aggregator.PendingBatchFetchBlockRange)
+		if err != nil || batch == nil {
+			agg.logger.Warnf("Pending task with merkle root 0x%x not found in the contract", signedTaskResponse.BatchMerkleRoot)
+			*reply = 1
+			return nil // TODO (non-urgent, nice to have): return an error here, so the Operator knows its signature corresponded to a task that was not found
+		}
+		agg.logger.Info("Task was found in Ethereum, adding it to the internal map")
+		agg.AddNewTask(batch.BatchMerkleRoot, batch.SenderAddress, batch.TaskCreatedBlock)
+		taskIndex, err = agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash)
+		if err != nil {
+			// This shouldn't happen, since we just added the task
+			agg.logger.Error("Unexpected error trying to get taskIndex from internal map")
+			*reply = 1
+			return nil
+		}
 	}
+	agg.telemetry.LogOperatorResponse(signedTaskResponse.BatchMerkleRoot, signedTaskResponse.OperatorId)
 
 	// Don't wait infinitely if it can't answer
@@ -120,8 +134,10 @@ TODO: We should refactor the retry duration considering extending it to a larger
 func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint32, error) {
 	getTaskIndex_func := func() (uint32, error) {
 		agg.taskMutex.Lock()
+		agg.logger.Info("- Locked Resources: Get task index")
 		taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash]
 		agg.taskMutex.Unlock()
+		agg.logger.Info("- Unlocked Resources: Get task index")
 		if !ok {
 			return taskIndex, fmt.Errorf("Task not found in the internal map")
 		} else {
diff --git a/config-files/config-aggregator.yaml b/config-files/config-aggregator.yaml
index ab8c32ac2a..5dbe4e5f04 100644
--- a/config-files/config-aggregator.yaml
+++ b/config-files/config-aggregator.yaml
@@ -34,10 +34,11 @@ aggregator:
   enable_metrics: true
   metrics_ip_port_address: localhost:9091
   telemetry_ip_port_address: localhost:4001
-  garbage_collector_period: 2m #The period of the GC process. Suggested value for Prod: '168h' (7 days)
-  garbage_collector_tasks_age: 20 #The age of tasks that will be removed by the GC, in blocks. Suggested value for prod: '216000' (30 days)
-  garbage_collector_tasks_interval: 10 #The interval of queried blocks to get an old batch. Suggested value for prod: '900' (3 hours)
+  garbage_collector_period: 2m # The period of the GC process. Suggested value for prod: '168h' (7 days)
+  garbage_collector_tasks_age: 20 # The age of tasks that will be removed by the GC, in blocks. Suggested value for prod: '216000' (30 days)
+  garbage_collector_tasks_interval: 10 # The interval of queried blocks to get an old batch. Suggested value for prod: '900' (3 hours)
   bls_service_task_timeout: 168h # The timeout of bls aggregation service tasks. Suggested value for prod '168h' (7 days)
+  pending_batch_fetch_block_range: 1000 # The range of blocks queried (via logs) to find a pending batch. Suggested value for prod: '1000'
   gas_base_bump_percentage: 25 # Percentage to overestimate gas price when sending a task
   gas_bump_incremental_percentage: 20 # An extra percentage to overestimate in each bump of respond to task. This is additive between tries
   # Gas used formula = est_gas_by_node * (gas_base_bump_percentage + gas_bum_incremental_percentage * i) / 100, where i is the iteration number.
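Note on the gas bump settings above: the schedule can be hard to eyeball from the comment alone. The following standalone sketch is illustrative only; `estGasByNode` and the loop bound are hypothetical, and it reads the formula as the total overestimation applied on top of the node's estimate (i.e. `est * (100 + base + inc*i) / 100`), since a strictly literal reading would yield less gas than the node estimated.

```go
package main

import "fmt"

func main() {
	// Shipped defaults from config-aggregator.yaml
	const gasBaseBumpPercentage = 25
	const gasBumpIncrementalPercentage = 20

	estGasByNode := uint64(1_000_000) // hypothetical node estimate

	// i is the bump iteration, as in the yaml comment's formula
	for i := uint64(0); i < 3; i++ {
		bumpPct := gasBaseBumpPercentage + gasBumpIncrementalPercentage*i
		bumped := estGasByNode * (100 + bumpPct) / 100
		fmt.Printf("try %d: +%d%% -> %d gas\n", i, bumpPct, bumped)
	}
}
```

With the defaults this prints +25%, +45%, +65% across the first three tries, which matches the "additive between tries" behavior described in the comments.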
diff --git a/core/chainio/avs_reader.go b/core/chainio/avs_reader.go
index 69b8f281b1..2732872066 100644
--- a/core/chainio/avs_reader.go
+++ b/core/chainio/avs_reader.go
@@ -150,3 +150,46 @@ func (r *AvsReader) GetOldTaskHash(nBlocksOld uint64, interval uint64) (*[32]byt
 	batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
 	return &batchIdentifierHash, nil
 }
+
+// Returns a pending batch from its merkle root, or nil if it doesn't exist.
+// Searches at most the last `blockRange` blocks.
+func (r *AvsReader) GetPendingBatchFromMerkleRoot(merkleRoot [32]byte, blockRange uint64) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
+	latestBlock, err := r.BlockNumberRetryable(context.Background())
+	if err != nil {
+		return nil, fmt.Errorf("Failed to get latest block number: %w", err)
+	}
+
+	var fromBlock uint64 = 0
+	if latestBlock > blockRange {
+		fromBlock = latestBlock - blockRange
+	}
+
+	logs, err := r.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: &latestBlock, Context: context.Background()}, [][32]byte{merkleRoot})
+	if err != nil {
+		return nil, err
+	}
+	if err := logs.Error(); err != nil {
+		return nil, err
+	}
+
+	if !logs.Next() {
+		return nil, nil // Not an error, but no tasks found
+	}
+
+	batch := logs.Event
+
+	batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...)
+	batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
+	state, err := r.BatchesStateRetryable(nil, batchIdentifierHash)
+	if err != nil {
+		return nil, err
+	}
+
+	if state.Responded {
+		return nil, nil // Task found but already responded
+	}
+
+	return batch, nil
+}
diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go
index ad44a25093..721098f265 100644
--- a/core/chainio/retryable.go
+++ b/core/chainio/retryable.go
@@ -227,3 +227,70 @@ func SubscribeToNewTasksV3Retryable(
 	}
 	return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
 }
+
+// |---AVS_READER---|
+
+// TODO: These functions are copied from AvsSubscriber and should be refactored:
+// we don't actually need access to the AvsReader, AvsSubscriber or AvsWriter, but only to the AvsContractBindings.
+
+// TODO: We should also add the fallback calls to the functions which are missing them.
+
+/*
+- All errors are considered Transient Errors
+- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
+*/
+func (r *AvsReader) BlockNumberRetryable(ctx context.Context) (uint64, error) {
+	latestBlock_func := func() (uint64, error) {
+		// Try with the main connection
+		latestBlock, err := r.AvsContractBindings.ethClient.BlockNumber(ctx)
+		if err != nil {
+			// On error, retry with the fallback connection
+			latestBlock, err = r.AvsContractBindings.ethClientFallback.BlockNumber(ctx)
+		}
+		return latestBlock, err
+	}
+	return retry.RetryWithData(latestBlock_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+}
+
+/*
+- All errors are considered Transient Errors
+- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
+*/
+func (r *AvsReader) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
+	filterNewBatchV3_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
+		// Try with the main connection
+		batch, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot)
+		if err != nil {
+			// On error, retry with the fallback connection
+			batch, err = r.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV3(opts, batchMerkleRoot)
+		}
+		return batch, err
+	}
+	return retry.RetryWithData(filterNewBatchV3_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+}
+
+/*
+- All errors are considered Transient Errors
+- Retry times (3 retries): 12 sec (1 block), 24 sec (2 blocks), 48 sec (4 blocks)
+*/
+func (r *AvsReader) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct {
+	TaskCreatedBlock      uint32
+	Responded             bool
+	RespondToTaskFeeLimit *big.Int
+}, error) {
+	batchState_func := func() (struct {
+		TaskCreatedBlock      uint32
+		Responded             bool
+		RespondToTaskFeeLimit *big.Int
+	}, error) {
+		// Try with the main connection
+		state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0)
+		if err != nil {
+			// On error, retry with the fallback connection
+			state, err = r.AvsContractBindings.ServiceManagerFallback.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0)
+		}
+		return state, err
+	}
+
+	return retry.RetryWithData(batchState_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime)
+}
diff --git a/core/config/aggregator.go b/core/config/aggregator.go
index a55da81938..6e7631de24 100644
--- a/core/config/aggregator.go
+++ b/core/config/aggregator.go
@@ -25,6 +25,7 @@ type AggregatorConfig struct {
 	GarbageCollectorTasksAge      uint64
 	GarbageCollectorTasksInterval uint64
 	BlsServiceTaskTimeout         time.Duration
+	PendingBatchFetchBlockRange   uint64
 	GasBaseBumpPercentage         uint
 	GasBumpIncrementalPercentage  uint
 	TimeToWaitBeforeBump          time.Duration
@@ -43,6 +44,7 @@ type AggregatorConfigFromYaml struct {
 	GarbageCollectorTasksAge      uint64        `yaml:"garbage_collector_tasks_age"`
 	GarbageCollectorTasksInterval uint64        `yaml:"garbage_collector_tasks_interval"`
 	BlsServiceTaskTimeout         time.Duration `yaml:"bls_service_task_timeout"`
+	PendingBatchFetchBlockRange   uint64        `yaml:"pending_batch_fetch_block_range"`
 	GasBaseBumpPercentage         uint          `yaml:"gas_base_bump_percentage"`
 	GasBumpIncrementalPercentage  uint          `yaml:"gas_bump_incremental_percentage"`
 	TimeToWaitBeforeBump          time.Duration `yaml:"time_to_wait_before_bump"`
@@ -91,6 +93,7 @@ func NewAggregatorConfig(configFilePath string) *AggregatorConfig {
 	GarbageCollectorTasksAge      uint64
 	GarbageCollectorTasksInterval uint64
 	BlsServiceTaskTimeout         time.Duration
+	PendingBatchFetchBlockRange   uint64
 	GasBaseBumpPercentage         uint
 	GasBumpIncrementalPercentage  uint
 	TimeToWaitBeforeBump          time.Duration
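For reviewers tracing the new key end to end: below is a minimal, self-contained sketch of how `pending_batch_fetch_block_range` would decode from yaml into a `uint64`, in the spirit of `AggregatorConfigFromYaml` above. The trimmed struct and the use of `gopkg.in/yaml.v3` are assumptions for illustration; the repo's actual config loader may differ.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// Trimmed, hypothetical stand-in for AggregatorConfigFromYaml: only the new field.
type aggregatorYaml struct {
	PendingBatchFetchBlockRange uint64 `yaml:"pending_batch_fetch_block_range"`
}

func main() {
	raw := []byte("pending_batch_fetch_block_range: 1000\n")

	var cfg aggregatorYaml
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}

	// This decoded value is what bounds the FilterBatchV3 log query in
	// GetPendingBatchFromMerkleRoot: fromBlock = latestBlock - range (clamped at 0).
	fmt.Println(cfg.PendingBatchFetchBlockRange) // 1000
}
```

At roughly 12-second Ethereum blocks, the suggested prod value of 1000 means the fallback lookup in `ProcessOperatorSignedTaskResponseV2` searches about the last 3h20m of `NewBatchV3` logs before giving up on an unknown task.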