@@ -74,7 +74,12 @@ type Aggregator struct {
7474 // Note: In case of a reboot it can start from 0 again
7575 nextBatchIndex uint32
7676
77- // Mutex to protect batchesIdentifierHashByIdx, batchesIdxByIdentifierHash and nextBatchIndex
77+ // Mutex to protect:
78+ // - batchesIdentifierHashByIdx
79+ // - batchesIdxByIdentifierHash
80+ // - batchCreatedBlockByIdx
81+ // - batchDataByIdentifierHash
82+ // - nextBatchIndex
7883 taskMutex * sync.Mutex
7984
8085 // Mutex to protect ethereum wallet
@@ -93,6 +98,15 @@ type Aggregator struct {
9398func NewAggregator (aggregatorConfig config.AggregatorConfig ) (* Aggregator , error ) {
9499 newBatchChan := make (chan * servicemanager.ContractAlignedLayerServiceManagerNewBatchV3 )
95100
101+ logger := aggregatorConfig .BaseConfig .Logger
102+
103+ // Metrics
104+ reg := prometheus .NewRegistry ()
105+ aggregatorMetrics := metrics .NewMetrics (aggregatorConfig .Aggregator .MetricsIpPortAddress , reg , logger )
106+
107+ // Telemetry
108+ aggregatorTelemetry := NewTelemetry (aggregatorConfig .Aggregator .TelemetryIpPortAddress , logger )
109+
96110 avsReader , err := chainio .NewAvsReaderFromConfig (aggregatorConfig .BaseConfig , aggregatorConfig .EcdsaConfig )
97111 if err != nil {
98112 return nil , err
@@ -103,7 +117,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
103117 return nil , err
104118 }
105119
106- avsWriter , err := chainio .NewAvsWriterFromConfig (aggregatorConfig .BaseConfig , aggregatorConfig .EcdsaConfig )
120+ avsWriter , err := chainio .NewAvsWriterFromConfig (aggregatorConfig .BaseConfig , aggregatorConfig .EcdsaConfig , aggregatorMetrics )
107121 if err != nil {
108122 return nil , err
109123 }
@@ -124,7 +138,6 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
124138
125139 aggregatorPrivateKey := aggregatorConfig .EcdsaConfig .PrivateKey
126140
127- logger := aggregatorConfig .BaseConfig .Logger
128141 clients , err := sdkclients .BuildAll (chainioConfig , aggregatorPrivateKey , logger )
129142 if err != nil {
130143 logger .Errorf ("Cannot create sdk clients" , "err" , err )
@@ -150,13 +163,6 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
150163 avsRegistryService := avsregistry .NewAvsRegistryServiceChainCaller (avsReader .ChainReader , operatorPubkeysService , logger )
151164 blsAggregationService := blsagg .NewBlsAggregatorService (avsRegistryService , hashFunction , logger )
152165
153- // Metrics
154- reg := prometheus .NewRegistry ()
155- aggregatorMetrics := metrics .NewMetrics (aggregatorConfig .Aggregator .MetricsIpPortAddress , reg , logger )
156-
157- // Telemetry
158- aggregatorTelemetry := NewTelemetry (aggregatorConfig .Aggregator .TelemetryIpPortAddress , logger )
159-
160166 nextBatchIndex := uint32 (0 )
161167
162168 aggregator := Aggregator {
@@ -219,6 +225,13 @@ func (agg *Aggregator) Start(ctx context.Context) error {
219225const MaxSentTxRetries = 5
220226
221227func (agg * Aggregator ) handleBlsAggServiceResponse (blsAggServiceResp blsagg.BlsAggregationServiceResponse ) {
228+ defer func () {
229+ err := recover () //stops panics
230+ if err != nil {
231+ agg .logger .Error ("handleBlsAggServiceResponse recovered from panic" , "err" , err )
232+ }
233+ }()
234+
222235 agg .taskMutex .Lock ()
223236 agg .AggregatorConfig .BaseConfig .Logger .Info ("- Locked Resources: Fetching task data" )
224237 batchIdentifierHash := agg .batchesIdentifierHashByIdx [blsAggServiceResp .TaskIndex ]
@@ -271,10 +284,15 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
271284 }
272285
273286 agg .logger .Info ("Sending aggregated response onchain" , "taskIndex" , blsAggServiceResp .TaskIndex ,
274- "batchIdentifierHash" , "0x" + hex .EncodeToString (batchIdentifierHash [:]))
287+ "batchIdentifierHash" , "0x" + hex .EncodeToString (batchIdentifierHash [:]), "merkleRoot" , "0x" + hex . EncodeToString ( batchData . BatchMerkleRoot [:]) )
275288 receipt , err := agg .sendAggregatedResponse (batchIdentifierHash , batchData .BatchMerkleRoot , batchData .SenderAddress , nonSignerStakesAndSignature )
276289 if err == nil {
277- agg .telemetry .TaskSentToEthereum (batchData .BatchMerkleRoot , receipt .TxHash .String ())
290+ // In some cases, we may fail to retrieve the receipt for the transaction.
291+ txHash := "Unknown"
292+ if receipt != nil {
293+ txHash = receipt .TxHash .String ()
294+ }
295+ agg .telemetry .TaskSentToEthereum (batchData .BatchMerkleRoot , txHash )
278296 agg .logger .Info ("Aggregator successfully responded to task" ,
279297 "taskIndex" , blsAggServiceResp .TaskIndex ,
280298 "batchIdentifierHash" , "0x" + hex .EncodeToString (batchIdentifierHash [:]))
@@ -378,8 +396,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
378396 quorumNums := eigentypes.QuorumNums {eigentypes .QuorumNum (QUORUM_NUMBER )}
379397 quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages {eigentypes .QuorumThresholdPercentage (QUORUM_THRESHOLD )}
380398
381- err := agg .blsAggregationService .InitializeNewTask (batchIndex , taskCreatedBlock , quorumNums , quorumThresholdPercentages , agg .AggregatorConfig .Aggregator .BlsServiceTaskTimeout )
382- // FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
399+ err := agg .InitializeNewTaskRetryable (batchIndex , taskCreatedBlock , quorumNums , quorumThresholdPercentages , agg .AggregatorConfig .Aggregator .BlsServiceTaskTimeout )
383400 if err != nil {
384401 agg .logger .Fatalf ("BLS aggregation service error when initializing new task: %s" , err )
385402 }
@@ -393,15 +410,17 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
393410// |---RETRYABLE---|
394411
395412/*
413+ InitializeNewTaskRetryable
414+ Initialize a new task in the BLS Aggregation service
396415 - Errors:
397416 Permanent:
398417 - TaskAlreadyInitializedError (Permanent): Task is already initialized in the BLS Aggregation service (https://github.com/Layr-Labs/eigensdk-go/blob/dev/services/bls_aggregation/blsagg.go#L27).
399418 Transient:
400419 - All others.
401- - Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks)
420+ - Retry times (3 retries): 1 sec, 2 sec, 4 sec
402421*/
403- func (agg * Aggregator ) InitializeNewTask (batchIndex uint32 , taskCreatedBlock uint32 , quorumNums eigentypes.QuorumNums , quorumThresholdPercentages eigentypes.QuorumThresholdPercentages , timeToExpiry time.Duration ) error {
404- initilizeNewTask_func := func () error {
422+ func (agg * Aggregator ) InitializeNewTaskRetryable (batchIndex uint32 , taskCreatedBlock uint32 , quorumNums eigentypes.QuorumNums , quorumThresholdPercentages eigentypes.QuorumThresholdPercentages , timeToExpiry time.Duration ) error {
423+ initializeNewTask_func := func () error {
405424 err := agg .blsAggregationService .InitializeNewTask (batchIndex , taskCreatedBlock , quorumNums , quorumThresholdPercentages , timeToExpiry )
406425 if err != nil {
407426 // Task is already initialized
@@ -411,7 +430,7 @@ func (agg *Aggregator) InitializeNewTask(batchIndex uint32, taskCreatedBlock uin
411430 }
412431 return err
413432 }
414- return retry .Retry (initilizeNewTask_func , retry .MinDelayChain , retry .RetryFactor , retry .NumRetries , retry .MaxIntervalChain , retry .MaxElapsedTime )
433+ return retry .Retry (initializeNewTask_func , retry .MinDelay , retry .RetryFactor , retry .NumRetries , retry .MaxInterval , retry .MaxElapsedTime )
415434}
416435
417436// Long-lived goroutine that periodically checks and removes old Tasks from stored Maps
@@ -421,7 +440,7 @@ func (agg *Aggregator) ClearTasksFromMaps() {
421440 defer func () {
422441 err := recover () //stops panics
423442 if err != nil {
424- agg .logger .Error ("Recovered from panic" , "err" , err )
443+ agg .logger .Error ("ClearTasksFromMaps Recovered from panic" , "err" , err )
425444 }
426445 }()
427446
@@ -441,6 +460,8 @@ func (agg *Aggregator) ClearTasksFromMaps() {
441460 agg .logger .Warn ("No old tasks found" )
442461 continue // Retry in the next iteration
443462 }
463+ agg .taskMutex .Lock ()
464+ agg .AggregatorConfig .BaseConfig .Logger .Info ("- Locked Resources: Cleaning finalized tasks" )
444465
445466 taskIdxToDelete := agg .batchesIdxByIdentifierHash [* oldTaskIdHash ]
446467 agg .logger .Info ("Old task found" , "taskIndex" , taskIdxToDelete )
@@ -458,6 +479,8 @@ func (agg *Aggregator) ClearTasksFromMaps() {
458479 }
459480 }
460481 lastIdxDeleted = taskIdxToDelete
482+ agg .taskMutex .Unlock ()
483+ agg .AggregatorConfig .BaseConfig .Logger .Info ("- Unlocked Resources: Cleaning finalized tasks" )
461484 agg .AggregatorConfig .BaseConfig .Logger .Info ("Done cleaning finalized tasks from maps" )
462485 }
463486}
0 commit comments