@@ -12,7 +12,6 @@ import (
1212 "github.com/prometheus/client_golang/prometheus"
1313 "github.com/yetanotherco/aligned_layer/metrics"
1414
15- "github.com/Layr-Labs/eigensdk-go/chainio/clients"
1615 sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients"
1716 "github.com/Layr-Labs/eigensdk-go/logging"
1817 "github.com/Layr-Labs/eigensdk-go/services/avsregistry"
@@ -80,8 +79,12 @@ type Aggregator struct {
8079
8180 logger logging.Logger
8281
82+ // Metrics
8383 metricsReg * prometheus.Registry
8484 metrics * metrics.Metrics
85+
86+ // Telemetry
87+ telemetry * Telemetry
8588}
8689
8790func NewAggregator (aggregatorConfig config.AggregatorConfig ) (* Aggregator , error ) {
@@ -119,7 +122,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
119122 aggregatorPrivateKey := aggregatorConfig .EcdsaConfig .PrivateKey
120123
121124 logger := aggregatorConfig .BaseConfig .Logger
122- clients , err := clients .BuildAll (chainioConfig , aggregatorPrivateKey , logger )
125+ clients , err := sdkclients .BuildAll (chainioConfig , aggregatorPrivateKey , logger )
123126 if err != nil {
124127 logger .Errorf ("Cannot create sdk clients" , "err" , err )
125128 return nil , err
@@ -148,6 +151,9 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
148151 reg := prometheus .NewRegistry ()
149152 aggregatorMetrics := metrics .NewMetrics (aggregatorConfig .Aggregator .MetricsIpPortAddress , reg , logger )
150153
154+ // Telemetry
155+ aggregatorTelemetry := NewTelemetry (aggregatorConfig .Aggregator .TelemetryIpPortAddress , logger )
156+
151157 nextBatchIndex := uint32 (0 )
152158
153159 aggregator := Aggregator {
@@ -169,6 +175,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
169175 logger : logger ,
170176 metricsReg : reg ,
171177 metrics : aggregatorMetrics ,
178+ telemetry : aggregatorTelemetry ,
172179 }
173180
174181 return & aggregator , nil
@@ -209,11 +216,20 @@ func (agg *Aggregator) Start(ctx context.Context) error {
209216const MaxSentTxRetries = 5
210217
211218func (agg * Aggregator ) handleBlsAggServiceResponse (blsAggServiceResp blsagg.BlsAggregationServiceResponse ) {
219+ agg .taskMutex .Lock ()
220+ agg .AggregatorConfig .BaseConfig .Logger .Info ("- Locked Resources: Fetching task data" )
221+ batchIdentifierHash := agg .batchesIdentifierHashByIdx [blsAggServiceResp .TaskIndex ]
222+ batchData := agg .batchDataByIdentifierHash [batchIdentifierHash ]
223+ taskCreatedBlock := agg .batchCreatedBlockByIdx [blsAggServiceResp .TaskIndex ]
224+ agg .taskMutex .Unlock ()
225+ agg .AggregatorConfig .BaseConfig .Logger .Info ("- Unlocked Resources: Fetching task data" )
226+
227+ // Finish task trace once the task is processed (either successfully or not)
228+ defer agg .telemetry .FinishTrace (batchData .BatchMerkleRoot )
229+
212230 if blsAggServiceResp .Err != nil {
213- agg .taskMutex .Lock ()
214- batchIdentifierHash := agg .batchesIdentifierHashByIdx [blsAggServiceResp .TaskIndex ]
231+ agg .telemetry .LogTaskError (batchData .BatchMerkleRoot , blsAggServiceResp .Err )
215232 agg .logger .Error ("BlsAggregationServiceResponse contains an error" , "err" , blsAggServiceResp .Err , "batchIdentifierHash" , hex .EncodeToString (batchIdentifierHash [:]))
216- agg .taskMutex .Unlock ()
217233 return
218234 }
219235 nonSignerPubkeys := []servicemanager.BN254G1Point {}
@@ -236,13 +252,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
236252 NonSignerStakeIndices : blsAggServiceResp .NonSignerStakeIndices ,
237253 }
238254
239- agg .taskMutex .Lock ()
240- agg .AggregatorConfig .BaseConfig .Logger .Info ("- Locked Resources: Fetching merkle root" )
241- batchIdentifierHash := agg .batchesIdentifierHashByIdx [blsAggServiceResp .TaskIndex ]
242- batchData := agg .batchDataByIdentifierHash [batchIdentifierHash ]
243- taskCreatedBlock := agg .batchCreatedBlockByIdx [blsAggServiceResp .TaskIndex ]
244- agg .AggregatorConfig .BaseConfig .Logger .Info ("- Unlocked Resources: Fetching merkle root" )
245- agg .taskMutex .Unlock ()
255+ agg .telemetry .LogQuorumReached (batchData .BatchMerkleRoot )
246256
247257 agg .logger .Info ("Threshold reached" , "taskIndex" , blsAggServiceResp .TaskIndex ,
248258 "batchIdentifierHash" , "0x" + hex .EncodeToString (batchIdentifierHash [:]))
@@ -278,6 +288,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
278288 "merkleRoot" , "0x" + hex .EncodeToString (batchData .BatchMerkleRoot [:]),
279289 "senderAddress" , "0x" + hex .EncodeToString (batchData .SenderAddress [:]),
280290 "batchIdentifierHash" , "0x" + hex .EncodeToString (batchIdentifierHash [:]))
291+ agg .telemetry .LogTaskError (batchData .BatchMerkleRoot , err )
281292}
282293
283294// / Sends response to contract and waits for transaction receipt
@@ -294,6 +305,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
294305 if err != nil {
295306 agg .walletMutex .Unlock ()
296307 agg .logger .Infof ("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s" , hex .EncodeToString (batchIdentifierHash [:]), err )
308+ agg .telemetry .LogTaskError (batchMerkleRoot , err )
297309 return nil , err
298310 }
299311
@@ -303,6 +315,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
303315 receipt , err := utils .WaitForTransactionReceipt (
304316 agg .AggregatorConfig .BaseConfig .EthRpcClient , context .Background (), * txHash )
305317 if err != nil {
318+ agg .telemetry .LogTaskError (batchMerkleRoot , err )
306319 return nil , err
307320 }
308321
@@ -312,6 +325,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
312325}
313326
314327func (agg * Aggregator ) AddNewTask (batchMerkleRoot [32 ]byte , senderAddress [20 ]byte , taskCreatedBlock uint32 ) {
328+ agg .telemetry .InitNewTrace (batchMerkleRoot )
315329 batchIdentifier := append (batchMerkleRoot [:], senderAddress [:]... )
316330 var batchIdentifierHash = * (* [32 ]byte )(crypto .Keccak256 (batchIdentifier ))
317331
@@ -358,6 +372,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
358372 agg .logger .Fatalf ("BLS aggregation service error when initializing new task: %s" , err )
359373 }
360374
375+ agg .metrics .IncAggregatorReceivedTasks ()
361376 agg .taskMutex .Unlock ()
362377 agg .AggregatorConfig .BaseConfig .Logger .Info ("- Unlocked Resources: Adding new task" )
363378 agg .logger .Info ("New task added" , "batchIndex" , batchIndex , "batchIdentifierHash" , "0x" + hex .EncodeToString (batchIdentifierHash [:]))
0 commit comments