@@ -74,7 +74,12 @@ type Aggregator struct {
7474 // Note: after a reboot, nextBatchIndex starts from 0 again
7575 nextBatchIndex uint32
7676
77- // Mutex to protect batchesIdentifierHashByIdx, batchesIdxByIdentifierHash and nextBatchIndex
77+ // Mutex to protect:
78+ // - batchesIdentifierHashByIdx
79+ // - batchesIdxByIdentifierHash
80+ // - batchCreatedBlockByIdx
81+ // - batchDataByIdentifierHash
82+ // - nextBatchIndex
7883 taskMutex * sync.Mutex
7984
8085 // Mutex to protect ethereum wallet
@@ -93,6 +98,15 @@ type Aggregator struct {
9398func NewAggregator (aggregatorConfig config.AggregatorConfig ) (* Aggregator , error ) {
9499 newBatchChan := make (chan * servicemanager.ContractAlignedLayerServiceManagerNewBatchV3 )
95100
101+ logger := aggregatorConfig .BaseConfig .Logger
102+
103+ // Metrics
104+ reg := prometheus .NewRegistry ()
105+ aggregatorMetrics := metrics .NewMetrics (aggregatorConfig .Aggregator .MetricsIpPortAddress , reg , logger )
106+
107+ // Telemetry
108+ aggregatorTelemetry := NewTelemetry (aggregatorConfig .Aggregator .TelemetryIpPortAddress , logger )
109+
96110 avsReader , err := chainio .NewAvsReaderFromConfig (aggregatorConfig .BaseConfig , aggregatorConfig .EcdsaConfig )
97111 if err != nil {
98112 return nil , err
@@ -103,7 +117,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
103117 return nil , err
104118 }
105119
106- avsWriter , err := chainio .NewAvsWriterFromConfig (aggregatorConfig .BaseConfig , aggregatorConfig .EcdsaConfig )
120+ avsWriter , err := chainio .NewAvsWriterFromConfig (aggregatorConfig .BaseConfig , aggregatorConfig .EcdsaConfig , aggregatorMetrics )
107121 if err != nil {
108122 return nil , err
109123 }
@@ -124,7 +138,6 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
124138
125139 aggregatorPrivateKey := aggregatorConfig .EcdsaConfig .PrivateKey
126140
127- logger := aggregatorConfig .BaseConfig .Logger
128141 clients , err := sdkclients .BuildAll (chainioConfig , aggregatorPrivateKey , logger )
129142 if err != nil {
130143 logger .Errorf ("Cannot create sdk clients" , "err" , err )
@@ -150,13 +163,6 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
150163 avsRegistryService := avsregistry .NewAvsRegistryServiceChainCaller (avsReader .ChainReader , operatorPubkeysService , logger )
151164 blsAggregationService := blsagg .NewBlsAggregatorService (avsRegistryService , hashFunction , logger )
152165
153- // Metrics
154- reg := prometheus .NewRegistry ()
155- aggregatorMetrics := metrics .NewMetrics (aggregatorConfig .Aggregator .MetricsIpPortAddress , reg , logger )
156-
157- // Telemetry
158- aggregatorTelemetry := NewTelemetry (aggregatorConfig .Aggregator .TelemetryIpPortAddress , logger )
159-
160166 nextBatchIndex := uint32 (0 )
161167
162168 aggregator := Aggregator {
@@ -378,8 +384,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
378384 quorumNums := eigentypes.QuorumNums {eigentypes .QuorumNum (QUORUM_NUMBER )}
379385 quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages {eigentypes .QuorumThresholdPercentage (QUORUM_THRESHOLD )}
380386
381- err := agg .blsAggregationService .InitializeNewTask (batchIndex , taskCreatedBlock , quorumNums , quorumThresholdPercentages , agg .AggregatorConfig .Aggregator .BlsServiceTaskTimeout )
382- // FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
387+ err := agg .InitializeNewTaskRetryable (batchIndex , taskCreatedBlock , quorumNums , quorumThresholdPercentages , agg .AggregatorConfig .Aggregator .BlsServiceTaskTimeout )
383388 if err != nil {
384389 agg .logger .Fatalf ("BLS aggregation service error when initializing new task: %s" , err )
385390 }
@@ -393,7 +398,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
393398// |---RETRYABLE---|
394399
395400/*
396- InitializeNewTask
401+ InitializeNewTaskRetryable
397402Initialize a new task in the BLS Aggregation service
398403 - Errors:
399404 Permanent:
@@ -402,7 +407,7 @@ Initialize a new task in the BLS Aggregation service
402407 - All others.
403408 - Retry times (3 retries): 1 sec, 2 sec, 4 sec
404409*/
405- func (agg * Aggregator ) InitializeNewTask (batchIndex uint32 , taskCreatedBlock uint32 , quorumNums eigentypes.QuorumNums , quorumThresholdPercentages eigentypes.QuorumThresholdPercentages , timeToExpiry time.Duration ) error {
410+ func (agg * Aggregator ) InitializeNewTaskRetryable (batchIndex uint32 , taskCreatedBlock uint32 , quorumNums eigentypes.QuorumNums , quorumThresholdPercentages eigentypes.QuorumThresholdPercentages , timeToExpiry time.Duration ) error {
406411 initializeNewTask_func := func () error {
407412 err := agg .blsAggregationService .InitializeNewTask (batchIndex , taskCreatedBlock , quorumNums , quorumThresholdPercentages , timeToExpiry )
408413 if err != nil {
@@ -443,6 +448,8 @@ func (agg *Aggregator) ClearTasksFromMaps() {
443448 agg .logger .Warn ("No old tasks found" )
444449 continue // Retry in the next iteration
445450 }
451+ agg .taskMutex .Lock ()
452+ agg .AggregatorConfig .BaseConfig .Logger .Info ("- Locked Resources: Cleaning finalized tasks" )
446453
447454 taskIdxToDelete := agg .batchesIdxByIdentifierHash [* oldTaskIdHash ]
448455 agg .logger .Info ("Old task found" , "taskIndex" , taskIdxToDelete )
@@ -460,6 +467,8 @@ func (agg *Aggregator) ClearTasksFromMaps() {
460467 }
461468 }
462469 lastIdxDeleted = taskIdxToDelete
470+ agg .taskMutex .Unlock ()
471+ agg .AggregatorConfig .BaseConfig .Logger .Info ("- Unlocked Resources: Cleaning finalized tasks" )
463472 agg .AggregatorConfig .BaseConfig .Logger .Info ("Done cleaning finalized tasks from maps" )
464473 }
465474}
0 commit comments