Skip to content

Commit 9ca55fb

Browse files
JulianVenturaJulian Ventura
andauthored
feat: Improve jaeger traces on the aggregator (#1603)
Co-authored-by: Julian Ventura <[email protected]>
1 parent 15008ab commit 9ca55fb

File tree

6 files changed

+42
-37
lines changed

6 files changed

+42
-37
lines changed

aggregator/pkg/aggregator.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,12 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
295295
if err == nil {
296296
// In some cases, we may fail to retrieve the receipt for the transaction.
297297
txHash := "Unknown"
298+
effectiveGasPrice := "Unknown"
298299
if receipt != nil {
299300
txHash = receipt.TxHash.String()
301+
effectiveGasPrice = receipt.EffectiveGasPrice.String()
300302
}
301-
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash)
303+
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash, effectiveGasPrice)
302304
agg.logger.Info("Aggregator successfully responded to task",
303305
"taskIndex", blsAggServiceResp.TaskIndex,
304306
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
@@ -326,9 +328,8 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
326328
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))
327329

328330
// This function is a callback that is called when the gas price is bumped on the avsWriter.SendAggregatedResponse
329-
onGasPriceBumped := func(bumpedGasPrice *big.Int) {
330-
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
331-
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
331+
onSetGasPrice := func(gasPrice *big.Int) {
332+
agg.telemetry.TaskSetGasPrice(batchMerkleRoot, gasPrice.String())
332333
}
333334

334335
startTime := time.Now()
@@ -341,12 +342,12 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
341342
agg.AggregatorConfig.Aggregator.GasBumpIncrementalPercentage,
342343
agg.AggregatorConfig.Aggregator.GasBumpPercentageLimit,
343344
agg.AggregatorConfig.Aggregator.TimeToWaitBeforeBump,
344-
onGasPriceBumped,
345+
agg.metrics,
346+
onSetGasPrice,
345347
)
346348
if err != nil {
347349
agg.walletMutex.Unlock()
348350
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
349-
agg.telemetry.LogTaskError(batchMerkleRoot, err)
350351
return nil, err
351352
}
352353

aggregator/pkg/telemetry.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,15 @@ type TaskErrorMessage struct {
3030
TaskError string `json:"error"`
3131
}
3232

33-
type TaskGasPriceBumpMessage struct {
34-
MerkleRoot string `json:"merkle_root"`
35-
BumpedGasPrice string `json:"bumped_gas_price"`
33+
type TaskSetGasPriceMessage struct {
34+
MerkleRoot string `json:"merkle_root"`
35+
GasPrice string `json:"gas_price"`
3636
}
3737

3838
type TaskSentToEthereumMessage struct {
39-
MerkleRoot string `json:"merkle_root"`
40-
TxHash string `json:"tx_hash"`
39+
MerkleRoot string `json:"merkle_root"`
40+
TxHash string `json:"tx_hash"`
41+
EffectiveGasPrice string `json:"effective_gas_price"`
4142
}
4243

4344
type Telemetry struct {
@@ -101,20 +102,21 @@ func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) {
101102
}
102103
}
103104

104-
func (t *Telemetry) BumpedTaskGasPrice(batchMerkleRoot [32]byte, bumpedGasPrice string) {
105-
body := TaskGasPriceBumpMessage{
106-
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
107-
BumpedGasPrice: bumpedGasPrice,
105+
func (t *Telemetry) TaskSetGasPrice(batchMerkleRoot [32]byte, gasPrice string) {
106+
body := TaskSetGasPriceMessage{
107+
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
108+
GasPrice: gasPrice,
108109
}
109-
if err := t.sendTelemetryMessage("/api/aggregatorTaskGasPriceBump", body); err != nil {
110+
if err := t.sendTelemetryMessage("/api/aggregatorTaskSetGasPrice", body); err != nil {
110111
t.logger.Warn("[Telemetry] Error in LogOperatorResponse", "error", err)
111112
}
112113
}
113114

114-
func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string) {
115+
func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string, effectiveGasPrice string) {
115116
body := TaskSentToEthereumMessage{
116-
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
117-
TxHash: txHash,
117+
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
118+
TxHash: txHash,
119+
EffectiveGasPrice: effectiveGasPrice,
118120
}
119121
if err := t.sendTelemetryMessage("/api/aggregatorTaskSent", body); err != nil {
120122
t.logger.Warn("[Telemetry] Error in TaskSentToEthereum", "error", err)

core/chainio/avs_writer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
8989
// - If no receipt is found, but the batch state indicates the response has already been processed, it exits
9090
// without an error (returning `nil, nil`).
9191
// - An error if the process encounters a fatal issue (e.g., permanent failure in verifying balances or state).
92-
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, gasBumpPercentageLimit uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) {
92+
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, gasBumpPercentageLimit uint, timeToWaitBeforeBump time.Duration, metrics *metrics.Metrics, onSetGasPrice func(*big.Int)) (*types.Receipt, error) {
9393
txOpts := *w.Signer.GetTxOpts()
9494
txOpts.NoSend = true // simulate the transaction
9595
simTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, retry.SendToChainRetryParams())
@@ -141,6 +141,8 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
141141
txOpts.GasPrice = minimumGasPriceBump
142142
}
143143

144+
onSetGasPrice(txOpts.GasPrice)
145+
144146
if i > 0 {
145147
w.logger.Infof("Trying to get old sent transaction receipt before sending a new transaction", "merkle root", batchMerkleRootHashString)
146148
for _, tx := range sentTxs {
@@ -161,7 +163,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
161163
}
162164
w.logger.Infof("Batch state has not been responded yet, will send a new tx", "merkle root", batchMerkleRootHashString)
163165

164-
onGasPriceBumped(txOpts.GasPrice)
166+
metrics.IncBumpedGasPriceForAggregatedResponse()
165167
}
166168

167169
// We compare both Aggregator funds and Batcher balance in Aligned against respondToTaskFeeLimit

telemetry_api/lib/telemetry_api/traces.ex

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -300,18 +300,18 @@ defmodule TelemetryApi.Traces do
300300
end
301301

302302
@doc """
303-
Registers a bump in the gas price when the aggregator tries to respond to a task in the task trace.
303+
Registers a set gas price when the aggregator tries to respond to a task in the task trace.
304304
305305
## Examples
306306
307307
iex> merkle_root
308-
iex> bumped_gas_price
309-
iex> aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price)
308+
iex> gas_price
309+
iex> aggregator_task_set_gas_price(merkle_root, gas_price)
310310
:ok
311311
"""
312-
def aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) do
312+
def aggregator_task_set_gas_price(merkle_root, gas_price) do
313313
with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do
314-
Tracer.add_event("Task gas price bumped", [{"bumped__gas_price", bumped_gas_price}])
314+
Tracer.add_event("Gas price set", [{"gas_price", gas_price}])
315315
:ok
316316
end
317317
end
@@ -323,12 +323,12 @@ defmodule TelemetryApi.Traces do
323323
324324
iex> merkle_root
325325
iex> tx_hash
326-
iex> aggregator_task_sent(merkle_root, tx_hash)
326+
iex> aggregator_task_sent(merkle_root, tx_hash, effective_gas_price)
327327
:ok
328328
"""
329-
def aggregator_task_sent(merkle_root, tx_hash) do
329+
def aggregator_task_sent(merkle_root, tx_hash, effective_gas_price) do
330330
with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do
331-
Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}])
331+
Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}, {"effective_gas_price", effective_gas_price}])
332332
:ok
333333
end
334334
end

telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,14 @@ defmodule TelemetryApiWeb.TraceController do
123123
end
124124

125125
@doc """
126-
Registers a gas price bump in the trace of the given merkle_root
127-
Method: POST aggregatorTaskGasPriceBump
126+
Registers a gas price in the trace of the given merkle_root
127+
Method: POST aggregatorTaskSetGasPrice
128128
"""
129-
def aggregator_task_gas_price_bumped(conn, %{
129+
def aggregator_task_set_gas_price(conn, %{
130130
"merkle_root" => merkle_root,
131-
"bumped_gas_price" => bumped_gas_price
131+
"gas_price" => gas_price
132132
}) do
133-
with :ok <- Traces.aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) do
133+
with :ok <- Traces.aggregator_task_set_gas_price(merkle_root, gas_price) do
134134
conn
135135
|> put_status(:ok)
136136
|> render(:show_merkle, merkle_root: merkle_root)
@@ -141,8 +141,8 @@ defmodule TelemetryApiWeb.TraceController do
141141
Register a task sent, from the aggregator, to Ethereum in the trace of the given merkle_root
142142
Method: POST aggregatorTaskSent
143143
"""
144-
def aggregator_task_sent(conn, %{"merkle_root" => merkle_root, "tx_hash" => tx_hash}) do
145-
with :ok <- Traces.aggregator_task_sent(merkle_root, tx_hash) do
144+
def aggregator_task_sent(conn, %{"merkle_root" => merkle_root, "tx_hash" => tx_hash, "effective_gas_price" => effective_gas_price}) do
145+
with :ok <- Traces.aggregator_task_sent(merkle_root, tx_hash, effective_gas_price) do
146146
conn
147147
|> put_status(:ok)
148148
|> render(:show_merkle, merkle_root: merkle_root)

telemetry_api/lib/telemetry_api_web/router.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ defmodule TelemetryApiWeb.Router do
1515
post "/operatorResponse", TraceController, :register_operator_response
1616
post "/quorumReached", TraceController, :quorum_reached
1717
post "/taskError", TraceController, :task_error
18-
post "/aggregatorTaskGasPriceBump", TraceController, :aggregator_task_gas_price_bumped
18+
post "/aggregatorTaskSetGasPrice", TraceController, :aggregator_task_set_gas_price
1919
post "/aggregatorTaskSent", TraceController, :aggregator_task_sent
2020
post "/finishTaskTrace", TraceController, :finish_task_trace
2121

0 commit comments

Comments
 (0)