Skip to content

Commit 18f64e9

Browse files
JuArceavilagaston9JulianVenturaJulian VenturaOppen
authored
feat(telemetry): implement telemetry for aggregator (#1077)
Co-authored-by: Avila Gastón <[email protected]> Co-authored-by: Julian Ventura <[email protected]> Co-authored-by: Julian Ventura <[email protected]> Co-authored-by: avilagaston9 <[email protected]> Co-authored-by: Mario Rugiero <[email protected]>
1 parent 477de1d commit 18f64e9

28 files changed

+713
-64
lines changed

aggregator/internal/pkg/aggregator.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/prometheus/client_golang/prometheus"
1313
"github.com/yetanotherco/aligned_layer/metrics"
1414

15-
"github.com/Layr-Labs/eigensdk-go/chainio/clients"
1615
sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients"
1716
"github.com/Layr-Labs/eigensdk-go/logging"
1817
"github.com/Layr-Labs/eigensdk-go/services/avsregistry"
@@ -80,8 +79,12 @@ type Aggregator struct {
8079

8180
logger logging.Logger
8281

82+
// Metrics
8383
metricsReg *prometheus.Registry
8484
metrics *metrics.Metrics
85+
86+
// Telemetry
87+
telemetry *Telemetry
8588
}
8689

8790
func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error) {
@@ -119,7 +122,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
119122
aggregatorPrivateKey := aggregatorConfig.EcdsaConfig.PrivateKey
120123

121124
logger := aggregatorConfig.BaseConfig.Logger
122-
clients, err := clients.BuildAll(chainioConfig, aggregatorPrivateKey, logger)
125+
clients, err := sdkclients.BuildAll(chainioConfig, aggregatorPrivateKey, logger)
123126
if err != nil {
124127
logger.Errorf("Cannot create sdk clients", "err", err)
125128
return nil, err
@@ -148,6 +151,9 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
148151
reg := prometheus.NewRegistry()
149152
aggregatorMetrics := metrics.NewMetrics(aggregatorConfig.Aggregator.MetricsIpPortAddress, reg, logger)
150153

154+
// Telemetry
155+
aggregatorTelemetry := NewTelemetry(aggregatorConfig.Aggregator.TelemetryIpPortAddress, logger)
156+
151157
nextBatchIndex := uint32(0)
152158

153159
aggregator := Aggregator{
@@ -169,6 +175,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
169175
logger: logger,
170176
metricsReg: reg,
171177
metrics: aggregatorMetrics,
178+
telemetry: aggregatorTelemetry,
172179
}
173180

174181
return &aggregator, nil
@@ -209,11 +216,20 @@ func (agg *Aggregator) Start(ctx context.Context) error {
209216
const MaxSentTxRetries = 5
210217

211218
func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
219+
agg.taskMutex.Lock()
220+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching task data")
221+
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
222+
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
223+
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
224+
agg.taskMutex.Unlock()
225+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching task data")
226+
227+
// Finish task trace once the task is processed (either successfully or not)
228+
defer agg.telemetry.FinishTrace(batchData.BatchMerkleRoot)
229+
212230
if blsAggServiceResp.Err != nil {
213-
agg.taskMutex.Lock()
214-
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
231+
agg.telemetry.LogTaskError(batchData.BatchMerkleRoot, blsAggServiceResp.Err)
215232
agg.logger.Error("BlsAggregationServiceResponse contains an error", "err", blsAggServiceResp.Err, "batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))
216-
agg.taskMutex.Unlock()
217233
return
218234
}
219235
nonSignerPubkeys := []servicemanager.BN254G1Point{}
@@ -236,13 +252,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
236252
NonSignerStakeIndices: blsAggServiceResp.NonSignerStakeIndices,
237253
}
238254

239-
agg.taskMutex.Lock()
240-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching merkle root")
241-
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
242-
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
243-
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
244-
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching merkle root")
245-
agg.taskMutex.Unlock()
255+
agg.telemetry.LogQuorumReached(batchData.BatchMerkleRoot)
246256

247257
agg.logger.Info("Threshold reached", "taskIndex", blsAggServiceResp.TaskIndex,
248258
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
@@ -278,6 +288,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
278288
"merkleRoot", "0x"+hex.EncodeToString(batchData.BatchMerkleRoot[:]),
279289
"senderAddress", "0x"+hex.EncodeToString(batchData.SenderAddress[:]),
280290
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
291+
agg.telemetry.LogTaskError(batchData.BatchMerkleRoot, err)
281292
}
282293

283294
// / Sends response to contract and waits for transaction receipt
@@ -294,6 +305,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
294305
if err != nil {
295306
agg.walletMutex.Unlock()
296307
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
308+
agg.telemetry.LogTaskError(batchMerkleRoot, err)
297309
return nil, err
298310
}
299311

@@ -303,6 +315,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
303315
receipt, err := utils.WaitForTransactionReceipt(
304316
agg.AggregatorConfig.BaseConfig.EthRpcClient, context.Background(), *txHash)
305317
if err != nil {
318+
agg.telemetry.LogTaskError(batchMerkleRoot, err)
306319
return nil, err
307320
}
308321

@@ -312,6 +325,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
312325
}
313326

314327
func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]byte, taskCreatedBlock uint32) {
328+
agg.telemetry.InitNewTrace(batchMerkleRoot)
315329
batchIdentifier := append(batchMerkleRoot[:], senderAddress[:]...)
316330
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))
317331

aggregator/internal/pkg/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
4949
"SenderAddress", "0x"+hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
5050
"BatchIdentifierHash", "0x"+hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]),
5151
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
52-
5352
taskIndex := uint32(0)
5453
ok := false
5554

@@ -71,6 +70,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
7170
*reply = 1
7271
return nil
7372
}
73+
agg.telemetry.LogOperatorResponse(signedTaskResponse.BatchMerkleRoot, signedTaskResponse.OperatorId)
7474

7575
// Don't wait infinitely if it can't answer
7676
// Create a context with a timeout of 5 seconds
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package pkg
2+
3+
import (
4+
"bytes"
5+
"encoding/hex"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
"net/url"
11+
"time"
12+
13+
"github.com/Layr-Labs/eigensdk-go/logging"
14+
)
15+
16+
type TraceMessage struct {
17+
MerkleRoot string `json:"merkle_root"`
18+
}
19+
20+
type OperatorResponseMessage struct {
21+
MerkleRoot string `json:"merkle_root"`
22+
OperatorId string `json:"operator_id"`
23+
}
24+
type QuorumReachedMessage struct {
25+
MerkleRoot string `json:"merkle_root"`
26+
}
27+
28+
type TaskErrorMessage struct {
29+
MerkleRoot string `json:"merkle_root"`
30+
TaskError string `json:"error"`
31+
}
32+
33+
type Telemetry struct {
34+
client http.Client
35+
baseURL url.URL
36+
logger logging.Logger
37+
}
38+
39+
func NewTelemetry(serverAddress string, logger logging.Logger) *Telemetry {
40+
client := http.Client{}
41+
42+
baseURL := url.URL{
43+
Scheme: "http",
44+
Host: serverAddress,
45+
}
46+
logger.Info("[Telemetry] Starting Telemetry client.", "server_address",
47+
serverAddress)
48+
49+
return &Telemetry{
50+
client: client,
51+
baseURL: baseURL,
52+
logger: logger,
53+
}
54+
}
55+
56+
func (t *Telemetry) InitNewTrace(batchMerkleRoot [32]byte) {
57+
body := TraceMessage{
58+
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
59+
}
60+
if err := t.sendTelemetryMessage("/api/initTaskTrace", body); err != nil {
61+
t.logger.Error("[Telemetry] Error in InitNewTrace", "error", err)
62+
}
63+
}
64+
65+
func (t *Telemetry) LogOperatorResponse(batchMerkleRoot [32]byte, operatorId [32]byte) {
66+
body := OperatorResponseMessage{
67+
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
68+
OperatorId: fmt.Sprintf("0x%s", hex.EncodeToString(operatorId[:])),
69+
}
70+
if err := t.sendTelemetryMessage("/api/operatorResponse", body); err != nil {
71+
t.logger.Error("[Telemetry] Error in LogOperatorResponse", "error", err)
72+
}
73+
}
74+
75+
func (t *Telemetry) LogQuorumReached(batchMerkleRoot [32]byte) {
76+
body := QuorumReachedMessage{
77+
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
78+
}
79+
if err := t.sendTelemetryMessage("/api/quorumReached", body); err != nil {
80+
t.logger.Error("[Telemetry] Error in LogQuorumReached", "error", err)
81+
}
82+
}
83+
84+
func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) {
85+
body := TaskErrorMessage{
86+
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
87+
TaskError: taskError.Error(),
88+
}
89+
if err := t.sendTelemetryMessage("/api/taskError", body); err != nil {
90+
t.logger.Error("[Telemetry] Error in LogTaskError", "error", err)
91+
}
92+
}
93+
94+
func (t *Telemetry) FinishTrace(batchMerkleRoot [32]byte) {
95+
// In order to wait for all operator responses, even if the quorum is reached, this function has a delayed execution
96+
go func() {
97+
time.Sleep(10 * time.Second)
98+
body := TraceMessage{
99+
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
100+
}
101+
if err := t.sendTelemetryMessage("/api/finishTaskTrace", body); err != nil {
102+
t.logger.Error("[Telemetry] Error in FinishTrace", "error", err)
103+
}
104+
}()
105+
}
106+
107+
func (t *Telemetry) sendTelemetryMessage(endpoint string, message interface{}) error {
108+
encodedBody, err := json.Marshal(message)
109+
if err != nil {
110+
t.logger.Error("[Telemetry] Error marshalling JSON", "error", err)
111+
return fmt.Errorf("error marshalling JSON: %w", err)
112+
}
113+
114+
t.logger.Info("[Telemetry] Sending message.", "endpoint", endpoint, "message", message)
115+
116+
fullURL := t.baseURL.ResolveReference(&url.URL{Path: endpoint})
117+
118+
resp, err := t.client.Post(fullURL.String(), "application/json", bytes.NewBuffer(encodedBody))
119+
if err != nil {
120+
t.logger.Error("[Telemetry] Error sending POST request", "error", err)
121+
return fmt.Errorf("error making POST request: %w", err)
122+
}
123+
defer resp.Body.Close()
124+
125+
respBody, err := io.ReadAll(resp.Body)
126+
if err != nil {
127+
t.logger.Error("[Telemetry] Error reading response body", "error", err)
128+
return fmt.Errorf("error reading response body: %w", err)
129+
}
130+
131+
t.logger.Info("[Telemetry] Response received", "status", resp.Status, "response_body", string(respBody))
132+
133+
return nil
134+
}

config-files/config-aggregator.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ aggregator:
3434
avs_service_manager_address: 0xc3e53F4d16Ae77Db1c982e75a937B9f60FE63690
3535
enable_metrics: true
3636
metrics_ip_port_address: localhost:9091
37+
telemetry_ip_port_address: localhost:4001
3738
## Operator Configurations
3839
# operator:
3940
# aggregator_rpc_server_ip_port_address: localhost:8090

core/config/aggregator.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package config
22

33
import (
44
"errors"
5-
sdkutils "github.com/Layr-Labs/eigensdk-go/utils"
6-
"github.com/ethereum/go-ethereum/common"
75
"log"
86
"os"
7+
8+
sdkutils "github.com/Layr-Labs/eigensdk-go/utils"
9+
"github.com/ethereum/go-ethereum/common"
910
)
1011

1112
type AggregatorConfig struct {
@@ -18,6 +19,7 @@ type AggregatorConfig struct {
1819
AvsServiceManagerAddress common.Address
1920
EnableMetrics bool
2021
MetricsIpPortAddress string
22+
TelemetryIpPortAddress string
2123
}
2224
}
2325

@@ -28,6 +30,7 @@ type AggregatorConfigFromYaml struct {
2830
AvsServiceManagerAddress common.Address `yaml:"avs_service_manager_address"`
2931
EnableMetrics bool `yaml:"enable_metrics"`
3032
MetricsIpPortAddress string `yaml:"metrics_ip_port_address"`
33+
TelemetryIpPortAddress string `yaml:"telemetry_ip_port_address"`
3134
} `yaml:"aggregator"`
3235
}
3336

@@ -68,6 +71,7 @@ func NewAggregatorConfig(configFilePath string) *AggregatorConfig {
6871
AvsServiceManagerAddress common.Address
6972
EnableMetrics bool
7073
MetricsIpPortAddress string
74+
TelemetryIpPortAddress string
7175
}(aggregatorConfigFromYaml.Aggregator),
7276
}
7377
}

otel-collector.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
receivers:
2+
otlp:
3+
protocols:
4+
grpc:
5+
endpoint: 0.0.0.0:4317
6+
processors:
7+
extensions:
8+
health_check: {}
9+
exporters:
10+
otlp:
11+
endpoint: jaeger:4317
12+
tls:
13+
insecure: true
14+
debug:
15+
16+
service:
17+
extensions: [health_check]
18+
pipelines:
19+
traces:
20+
receivers: [otlp]
21+
processors: []
22+
exporters: [otlp]
23+
24+
metrics:
25+
receivers: [ otlp ]
26+
processors: [ ]
27+
exporters: [ debug ]

telemetry-docker-compose.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
version: "3"
2+
3+
services:
4+
otel-collector:
5+
image: otel/opentelemetry-collector-contrib:0.107.0
6+
command: ["--config=/etc/otel-collector.yaml"]
7+
volumes:
8+
- ./otel-collector.yaml:/etc/otel-collector.yaml
9+
ports:
10+
- "4317:4317"
11+
12+
jaeger:
13+
image: jaegertracing/all-in-one:1.60
14+
ports:
15+
- "16686:16686"

telemetry_api/.env.dev

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
ALIGNED_CONFIG_FILE="../contracts/script/output/devnet/alignedlayer_deployment_output.json"
2+
OPERATOR_FETCHER_WAIT_TIME_MS=5000
23
ENVIRONMENT=devnet

telemetry_api/config/config.exs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ config :logger, :console,
3030
# Use Jason for JSON parsing in Phoenix
3131
config :phoenix, :json_library, Jason
3232

33+
# https://opentelemetry.io/docs/languages/erlang/exporters/#setting-up-the-collector
34+
config :opentelemetry_exporter,
35+
otlp_protocol: :grpc,
36+
otlp_endpoint: "http://localhost:4317"
37+
38+
config :opentelemetry,
39+
resource: %{service: %{name: "telemetry_api"}},
40+
span_processor: :batch,
41+
traces_exporter: :otlp
42+
3343
# Import environment specific config. This must remain at the bottom
3444
# of this file so it overrides the configuration defined above.
3545
import_config "#{config_env()}.exs"

telemetry_api/config/dev.exs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,6 @@ config :phoenix, :plug_init_mode, :runtime
6565

6666
# Configure ethereumex url
6767
config :ethereumex, url: "http://localhost:8545"
68+
69+
# For development, we use the stdout exporter to ensure everything is working properly
70+
# config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}

0 commit comments

Comments
 (0)