Skip to content

Commit a4e8e31

Browse files
JulianVenturaJulian Ventura
andauthored
feat: add aggregator quorum reached and task responded latency gauges (#1565)
Co-authored-by: Julian Ventura <[email protected]>
1 parent 75c34a8 commit a4e8e31

File tree

3 files changed

+261
-6
lines changed

3 files changed

+261
-6
lines changed

aggregator/pkg/aggregator.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ type Aggregator struct {
6767
// Stores the TaskResponse for each batch by batchIdentifierHash
6868
batchDataByIdentifierHash map[[32]byte]BatchData
6969

70+
// Stores the start time for each batch of the aggregator by task index
71+
batchStartTimeByIdx map[uint32]time.Time
72+
7073
// This task index is to communicate with the local BLS
7174
// Service.
7275
// Note: In case of a reboot it can start from 0 again
@@ -78,6 +81,7 @@ type Aggregator struct {
7881
// - batchCreatedBlockByIdx
7982
// - batchDataByIdentifierHash
8083
// - nextBatchIndex
84+
// - batchStartTimeByIdx
8185
taskMutex *sync.Mutex
8286

8387
// Mutex to protect ethereum wallet
@@ -124,6 +128,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
124128
batchesIdxByIdentifierHash := make(map[[32]byte]uint32)
125129
batchDataByIdentifierHash := make(map[[32]byte]BatchData)
126130
batchCreatedBlockByIdx := make(map[uint32]uint64)
131+
batchStartTimeByIdx := make(map[uint32]time.Time)
127132

128133
chainioConfig := sdkclients.BuildAllConfig{
129134
EthHttpUrl: aggregatorConfig.BaseConfig.EthRpcUrl,
@@ -172,6 +177,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
172177
batchesIdxByIdentifierHash: batchesIdxByIdentifierHash,
173178
batchDataByIdentifierHash: batchDataByIdentifierHash,
174179
batchCreatedBlockByIdx: batchCreatedBlockByIdx,
180+
batchStartTimeByIdx: batchStartTimeByIdx,
175181
nextBatchIndex: nextBatchIndex,
176182
taskMutex: &sync.Mutex{},
177183
walletMutex: &sync.Mutex{},
@@ -233,6 +239,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
233239
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
234240
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
235241
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
242+
taskCreatedAt := agg.batchStartTimeByIdx[blsAggServiceResp.TaskIndex]
236243
agg.taskMutex.Unlock()
237244
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching task data")
238245

@@ -266,6 +273,9 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
266273

267274
agg.telemetry.LogQuorumReached(batchData.BatchMerkleRoot)
268275

276+
// Only observe quorum reached if successful
277+
agg.metrics.ObserveTaskQuorumReached(time.Since(taskCreatedAt))
278+
269279
agg.logger.Info("Threshold reached", "taskIndex", blsAggServiceResp.TaskIndex,
270280
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
271281

@@ -320,6 +330,8 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
320330
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
321331
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
322332
}
333+
334+
startTime := time.Now()
323335
receipt, err := agg.avsWriter.SendAggregatedResponse(
324336
batchIdentifierHash,
325337
batchMerkleRoot,
@@ -338,6 +350,9 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
338350
return nil, err
339351
}
340352

353+
// We only send the latency metric if the response is successul
354+
agg.metrics.ObserveLatencyForRespondToTask(time.Since(startTime))
355+
341356
agg.walletMutex.Unlock()
342357
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchIdentifierHash[:]))
343358

@@ -383,6 +398,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
383398
BatchMerkleRoot: batchMerkleRoot,
384399
SenderAddress: senderAddress,
385400
}
401+
agg.batchStartTimeByIdx[batchIndex] = time.Now()
386402
agg.logger.Info(
387403
"Task Info added in aggregator:",
388404
"Task", batchIndex,
@@ -447,6 +463,7 @@ func (agg *Aggregator) ClearTasksFromMaps() {
447463
delete(agg.batchCreatedBlockByIdx, i)
448464
delete(agg.batchesIdentifierHashByIdx, i)
449465
delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
466+
delete(agg.batchStartTimeByIdx, i)
450467
} else {
451468
agg.logger.Warn("Task not found in maps", "taskIndex", i)
452469
}

grafana/provisioning/dashboards/aligned/aggregator_batcher.json

Lines changed: 224 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"editable": true,
1919
"fiscalYearStartMonth": 0,
2020
"graphTooltip": 0,
21-
"id": 4,
21+
"id": 2,
2222
"links": [],
2323
"liveNow": false,
2424
"panels": [
@@ -153,7 +153,6 @@
153153
},
154154
{
155155
"datasource": {
156-
"default": true,
157156
"type": "prometheus",
158157
"uid": "prometheus"
159158
},
@@ -2451,7 +2450,32 @@
24512450
]
24522451
}
24532452
},
2454-
"overrides": []
2453+
"overrides": [
2454+
{
2455+
"__systemRef": "hideSeriesFrom",
2456+
"matcher": {
2457+
"id": "byNames",
2458+
"options": {
2459+
"mode": "exclude",
2460+
"names": [
2461+
"{bot=\"aggregator\", instance=\"host.docker.internal:9091\", job=\"aligned-aggregator\"}"
2462+
],
2463+
"prefix": "All except:",
2464+
"readOnly": true
2465+
}
2466+
},
2467+
"properties": [
2468+
{
2469+
"id": "custom.hideFrom",
2470+
"value": {
2471+
"legend": false,
2472+
"tooltip": false,
2473+
"viz": true
2474+
}
2475+
}
2476+
]
2477+
}
2478+
]
24552479
},
24562480
"gridPos": {
24572481
"h": 7,
@@ -2625,9 +2649,203 @@
26252649
}
26262650
],
26272651
"type": "timeseries"
2652+
},
2653+
{
2654+
"datasource": {
2655+
"type": "prometheus",
2656+
"uid": "prometheus"
2657+
},
2658+
"description": "",
2659+
"fieldConfig": {
2660+
"defaults": {
2661+
"color": {
2662+
"mode": "palette-classic"
2663+
},
2664+
"custom": {
2665+
"axisCenteredZero": false,
2666+
"axisColorMode": "text",
2667+
"axisLabel": "",
2668+
"axisPlacement": "auto",
2669+
"barAlignment": 0,
2670+
"drawStyle": "line",
2671+
"fillOpacity": 0,
2672+
"gradientMode": "none",
2673+
"hideFrom": {
2674+
"legend": false,
2675+
"tooltip": false,
2676+
"viz": false
2677+
},
2678+
"insertNulls": false,
2679+
"lineInterpolation": "linear",
2680+
"lineWidth": 1,
2681+
"pointSize": 5,
2682+
"scaleDistribution": {
2683+
"type": "linear"
2684+
},
2685+
"showPoints": "auto",
2686+
"spanNulls": false,
2687+
"stacking": {
2688+
"group": "A",
2689+
"mode": "none"
2690+
},
2691+
"thresholdsStyle": {
2692+
"mode": "off"
2693+
}
2694+
},
2695+
"mappings": [],
2696+
"thresholds": {
2697+
"mode": "absolute",
2698+
"steps": [
2699+
{
2700+
"color": "green",
2701+
"value": null
2702+
},
2703+
{
2704+
"color": "red",
2705+
"value": 80
2706+
}
2707+
]
2708+
},
2709+
"unit": "s"
2710+
},
2711+
"overrides": []
2712+
},
2713+
"gridPos": {
2714+
"h": 8,
2715+
"w": 12,
2716+
"x": 12,
2717+
"y": 61
2718+
},
2719+
"id": 43,
2720+
"interval": "1s",
2721+
"options": {
2722+
"legend": {
2723+
"calcs": [],
2724+
"displayMode": "list",
2725+
"placement": "right",
2726+
"showLegend": false
2727+
},
2728+
"tooltip": {
2729+
"mode": "single",
2730+
"sort": "none"
2731+
}
2732+
},
2733+
"targets": [
2734+
{
2735+
"datasource": {
2736+
"type": "prometheus",
2737+
"uid": "prometheus"
2738+
},
2739+
"editorMode": "code",
2740+
"expr": "aligned_aggregator_respond_to_task_latency{bot=\"aggregator\"}",
2741+
"hide": false,
2742+
"instant": false,
2743+
"legendFormat": "Latest latency",
2744+
"range": true,
2745+
"refId": "Latency"
2746+
}
2747+
],
2748+
"title": "Latest respond to task latency",
2749+
"type": "timeseries"
2750+
},
2751+
{
2752+
"datasource": {
2753+
"type": "prometheus",
2754+
"uid": "prometheus"
2755+
},
2756+
"fieldConfig": {
2757+
"defaults": {
2758+
"color": {
2759+
"mode": "palette-classic"
2760+
},
2761+
"custom": {
2762+
"axisCenteredZero": false,
2763+
"axisColorMode": "text",
2764+
"axisLabel": "",
2765+
"axisPlacement": "auto",
2766+
"barAlignment": 0,
2767+
"drawStyle": "line",
2768+
"fillOpacity": 0,
2769+
"gradientMode": "none",
2770+
"hideFrom": {
2771+
"legend": false,
2772+
"tooltip": false,
2773+
"viz": false
2774+
},
2775+
"insertNulls": false,
2776+
"lineInterpolation": "linear",
2777+
"lineWidth": 1,
2778+
"pointSize": 5,
2779+
"scaleDistribution": {
2780+
"type": "linear"
2781+
},
2782+
"showPoints": "auto",
2783+
"spanNulls": false,
2784+
"stacking": {
2785+
"group": "A",
2786+
"mode": "none"
2787+
},
2788+
"thresholdsStyle": {
2789+
"mode": "off"
2790+
}
2791+
},
2792+
"mappings": [],
2793+
"thresholds": {
2794+
"mode": "absolute",
2795+
"steps": [
2796+
{
2797+
"color": "green",
2798+
"value": null
2799+
},
2800+
{
2801+
"color": "red",
2802+
"value": 80
2803+
}
2804+
]
2805+
}
2806+
},
2807+
"overrides": []
2808+
},
2809+
"gridPos": {
2810+
"h": 8,
2811+
"w": 12,
2812+
"x": 12,
2813+
"y": 69
2814+
},
2815+
"id": 44,
2816+
"interval": "1s",
2817+
"options": {
2818+
"legend": {
2819+
"calcs": [],
2820+
"displayMode": "list",
2821+
"placement": "right",
2822+
"showLegend": false
2823+
},
2824+
"tooltip": {
2825+
"mode": "single",
2826+
"sort": "none"
2827+
}
2828+
},
2829+
"targets": [
2830+
{
2831+
"datasource": {
2832+
"type": "prometheus",
2833+
"uid": "prometheus"
2834+
},
2835+
"editorMode": "code",
2836+
"expr": "aligned_aggregator_task_quorum_reached_latency{bot=\"aggregator\"}",
2837+
"hide": false,
2838+
"instant": false,
2839+
"legendFormat": "Latest latency",
2840+
"range": true,
2841+
"refId": "A"
2842+
}
2843+
],
2844+
"title": "Latest quorum reached latency",
2845+
"type": "timeseries"
26282846
}
26292847
],
2630-
"refresh": "5s",
2848+
"refresh": "",
26312849
"schemaVersion": 38,
26322850
"style": "dark",
26332851
"tags": [],
@@ -2642,6 +2860,6 @@
26422860
"timezone": "browser",
26432861
"title": "System Data",
26442862
"uid": "aggregator",
2645-
"version": 9,
2863+
"version": 19,
26462864
"weekStart": ""
2647-
}
2865+
}

metrics/metrics.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ type Metrics struct {
2121
aggregatorGasCostPaidForBatcherTotal prometheus.Gauge
2222
aggregatorNumTimesPaidForBatcher prometheus.Counter
2323
numBumpedGasPriceForAggregatedResponse prometheus.Counter
24+
aggregatorRespondToTaskLatency prometheus.Gauge
25+
aggregatorTaskQuorumReachedLatency prometheus.Gauge
2426
}
2527

2628
const alignedNamespace = "aligned"
@@ -59,6 +61,16 @@ func NewMetrics(ipPortAddress string, reg prometheus.Registerer, logger logging.
5961
Name: "respond_to_task_gas_price_bumped_count",
6062
Help: "Number of times gas price was bumped while sending aggregated response",
6163
}),
64+
aggregatorRespondToTaskLatency: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
65+
Namespace: alignedNamespace,
66+
Name: "aggregator_respond_to_task_latency",
67+
Help: "Latency of last call to respondToTask on Aligned Service Manager",
68+
}),
69+
aggregatorTaskQuorumReachedLatency: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
70+
Namespace: alignedNamespace,
71+
Name: "aggregator_task_quorum_reached_latency",
72+
Help: "Time it takes for a task to reach quorum",
73+
}),
6274
}
6375
}
6476

@@ -116,3 +128,11 @@ func (m *Metrics) AddAggregatorGasPaidForBatcher(value float64) {
116128
func (m *Metrics) IncBumpedGasPriceForAggregatedResponse() {
117129
m.numBumpedGasPriceForAggregatedResponse.Inc()
118130
}
131+
132+
func (m *Metrics) ObserveLatencyForRespondToTask(elapsed time.Duration) {
133+
m.aggregatorRespondToTaskLatency.Set(elapsed.Seconds())
134+
}
135+
136+
func (m *Metrics) ObserveTaskQuorumReached(elapsed time.Duration) {
137+
m.aggregatorTaskQuorumReachedLatency.Set(elapsed.Seconds())
138+
}

0 commit comments

Comments
 (0)