From 8d787bc5baafba3bf68af8b4094e1b9e5d99e27a Mon Sep 17 00:00:00 2001 From: avilagaston9 Date: Fri, 22 Nov 2024 10:04:38 -0300 Subject: [PATCH 1/5] fix: improve telemetry spans --- .../src/retry/batcher_retryables.rs | 12 + .../lib/telemetry_api/periodically.ex | 33 ++- telemetry_api/lib/telemetry_api/traces.ex | 256 +++++++++--------- .../lib/telemetry_api/traces/trace.ex | 4 +- .../controllers/trace_controller.ex | 10 +- telemetry_api/lib/telemetry_api_web/router.ex | 4 +- 6 files changed, 166 insertions(+), 153 deletions(-) diff --git a/batcher/aligned-batcher/src/retry/batcher_retryables.rs b/batcher/aligned-batcher/src/retry/batcher_retryables.rs index ca7bdff018..c4917ebb76 100644 --- a/batcher/aligned-batcher/src/retry/batcher_retryables.rs +++ b/batcher/aligned-batcher/src/retry/batcher_retryables.rs @@ -162,6 +162,18 @@ pub async fn create_new_task_retryable( }; // timeout to prevent a deadlock while waiting for the transaction to be included in a block. + // _ = timeout(Duration::from_millis(transaction_wait_timeout), pending_tx) + // .await + // .map_err(|e| { + // warn!("Error while waiting for batch inclusion: {e}"); + // RetryError::Permanent(BatcherError::ReceiptNotFoundError) + // })? + // .map_err(|e| { + // warn!("Error while waiting for batch inclusion: {e}"); + // RetryError::Permanent(BatcherError::ReceiptNotFoundError) + // })? + // .ok_or(RetryError::Permanent(BatcherError::ReceiptNotFoundError)); + // return Err(RetryError::Permanent(BatcherError::ReceiptNotFoundError)); timeout(Duration::from_millis(transaction_wait_timeout), pending_tx) .await .map_err(|e| { diff --git a/telemetry_api/lib/telemetry_api/periodically.ex b/telemetry_api/lib/telemetry_api/periodically.ex index 6aae119823..a50a5b5b39 100644 --- a/telemetry_api/lib/telemetry_api/periodically.ex +++ b/telemetry_api/lib/telemetry_api/periodically.ex @@ -10,16 +10,19 @@ defmodule TelemetryApi.Periodically do @deregistered 2 @wait_time_str System.get_env("OPERATOR_FETCHER_WAIT_TIME_MS") || - raise """ - environment variable OPERATOR_FETCHER_WAIT_TIME_MS is missing. - """ - - @wait_time_ms ( - case Integer.parse(@wait_time_str) do - :error -> raise("OPERATOR_FETCHER_WAIT_TIME_MS is not a number, received: #{@wait_time_str}") - {num, _} -> num - end - ) + raise(""" + environment variable OPERATOR_FETCHER_WAIT_TIME_MS is missing. + """) + + @wait_time_ms (case Integer.parse(@wait_time_str) do + :error -> + raise( + "OPERATOR_FETCHER_WAIT_TIME_MS is not a number, received: #{@wait_time_str}" + ) + + {num, _} -> + num + end) def start_link(_) do GenServer.start_link(__MODULE__, %{}) @@ -32,7 +35,8 @@ defmodule TelemetryApi.Periodically do def send_work() do one_second = 1000 - :timer.send_interval(one_second * 10, :gas_price) # every 10 seconds, once per block + some margin + # every 10 seconds, once per block + some margin + :timer.send_interval(one_second * 10, :gas_price) :timer.send_interval(@wait_time_ms, :poll_service) end @@ -50,8 +54,10 @@ defmodule TelemetryApi.Periodically do {:error, error} -> IO.inspect("Error fetching gas price: #{error}") end - {:noreply, %{}} + + {:noreply, %{}} end + defp fetch_operators_info() do case Operators.fetch_all_operators() do :ok -> :ok @@ -67,9 +73,10 @@ defmodule TelemetryApi.Periodically do Operators.update_operator(op, %{status: string_status(status)}) error -> - Logger.error("Error when updating status: #{error}") + Logger.error("Error when updating status: #{inspect(error)}") end end) + :ok end diff --git a/telemetry_api/lib/telemetry_api/traces.ex b/telemetry_api/lib/telemetry_api/traces.ex index e611f386a0..7d9ce0499a 100644 --- a/telemetry_api/lib/telemetry_api/traces.ex +++ b/telemetry_api/lib/telemetry_api/traces.ex @@ -2,50 +2,40 @@ defmodule TelemetryApi.Traces do @moduledoc """ The Traces context. """ + require OpenTelemetry.Tracer alias TelemetryApi.Traces.Trace alias TelemetryApi.Operators alias TelemetryApi.ContractManagers.StakeRegistry - require OpenTelemetry.Tracer - require OpenTelemetry.Ctx - alias OpenTelemetry.Tracer, as: Tracer - alias OpenTelemetry.Ctx, as: Ctx + alias OpenTelemetry.Tracer + alias OpenTelemetry.Ctx @doc """ - Send the trace to OpenTelemetry - - This function is responsible for creating a new span and storing the context in the Agent. + Registers an aggregator new task initialization. ## Examples iex> merkle_root = "0x1234567890abcdef" - iex> create_task_trace(merkle_root) + iex> aggregator_init_task(merkle_root) :ok """ - def create_task_trace(merkle_root) do - with {:ok, trace} <- set_current_trace(merkle_root) do - with {:ok, total_stake} <- StakeRegistry.get_current_total_stake() do - aggregator_subspan_ctx = - Tracer.start_span( - "Aggregator", - %{ - attributes: [ - {:merkle_root, merkle_root}, - {:total_stake, total_stake} - ] - } - ) - - Tracer.set_current_span(aggregator_subspan_ctx) - Tracer.add_event("New task event received", []) - - TraceStore.store_trace(merkle_root, %{ - trace - | subspans: Map.put(trace.subspans, :aggregator, aggregator_subspan_ctx) + def aggregator_init_task(merkle_root) do + with {:ok, _trace} <- set_current_trace(merkle_root), + {:ok, total_stake} <- StakeRegistry.get_current_total_stake() do + Tracer.with_span "Aggregator - New task event received" do + Tracer.set_attributes(%{ + attributes: [ + {:total_stake, total_stake} + ] }) - - :ok end + + # TraceStore.store_trace(merkle_root, %{ + # trace + # | subspans: Map.put(trace.subspans, :aggregator, aggregator_subspan_ctx) + # }) + + :ok end end @@ -62,25 +52,27 @@ defmodule TelemetryApi.Traces do def register_operator_response(merkle_root, operator_id) do with {:ok, operator} <- Operators.get_operator(%{id: operator_id}), :ok <- validate_operator_registration(operator), - {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do + {:ok, trace} <- set_current_trace(merkle_root) do operator_stake = Decimal.new(operator.stake) new_stake = Decimal.add(trace.current_stake, operator_stake) new_stake_fraction = Decimal.div(new_stake, trace.total_stake) operator_stake_fraction = Decimal.div(operator_stake, trace.total_stake) - Tracer.add_event( - "Operator Response: " <> operator.name, - [ - {:merkle_root, merkle_root}, - {:operator_id, operator_id}, - {:name, operator.name}, - {:address, operator.address}, - {:operator_stake, Decimal.to_string(operator_stake)}, - {:current_stake, Decimal.to_string(new_stake)}, - {:current_stake_fraction, Decimal.to_string(new_stake_fraction)}, - {:operator_stake_fraction, Decimal.to_string(operator_stake_fraction)} - ] - ) + Tracer.with_span "Aggregator - Operator Response: " <> + operator.name do + Tracer.set_attributes(%{ + attributes: [ + {:merkle_root, merkle_root}, + {:operator_id, operator_id}, + {:name, operator.name}, + {:address, operator.address}, + {:operator_stake, Decimal.to_string(operator_stake)}, + {:current_stake, Decimal.to_string(new_stake)}, + {:current_stake_fraction, Decimal.to_string(new_stake_fraction)}, + {:operator_stake_fraction, Decimal.to_string(operator_stake_fraction)} + ] + }) + end responses = trace.responses ++ [operator_id] @@ -109,24 +101,13 @@ defmodule TelemetryApi.Traces do :ok """ def batcher_task_creation_failed(merkle_root, error) do - with {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do - Tracer.add_event( - "Batcher Task Creation Failed", - [ - {:error, error} - ] - ) - - Tracer.end_span() - - TraceStore.store_trace(merkle_root, %{ - trace - | subspans: Map.delete(trace.subspans, :batcher) - }) - - with {:ok, _trace} <- set_current_trace(merkle_root) do - Tracer.end_span() - TraceStore.delete_trace(merkle_root) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Batcher - Task Creation Failed" do + Tracer.set_attributes(%{ + attributes: [ + {:error, error} + ] + }) end :ok @@ -134,7 +115,7 @@ defmodule TelemetryApi.Traces do end @doc """ - Create a new task trace for the batcher and starts the subspan for the batcher. + Create a new task trace from the batcher. ## Examples @@ -161,33 +142,32 @@ defmodule TelemetryApi.Traces do context: ctx, total_stake: total_stake, current_stake: 0, - responses: [], - subspans: %{} + responses: [] }) - with {:ok, trace} <- set_current_trace(merkle_root) do - # This span ends inmediately after it's created just to set the correct title to the final task. - Tracer.with_span "Task: #{merkle_root}" do - Tracer.set_attributes(%{merkle_root: merkle_root}) - end - - batcher_subspan_ctx = - Tracer.start_span( - "Batcher", - %{ - attributes: [ - {:merkle_root, merkle_root} - ] - } - ) - - Tracer.set_current_span(batcher_subspan_ctx) - Tracer.add_event("New batch", [{:merkle_root, merkle_root}]) - - TraceStore.store_trace(merkle_root, %{ - trace - | subspans: Map.put(trace.subspans, :batcher, batcher_subspan_ctx) - }) + # with {:ok, trace} <- set_current_trace(merkle_root) do + # This span ends inmediately after it's created just to set the correct title to the final task. + Tracer.with_span "Task: #{merkle_root}" do + Tracer.set_attributes(%{merkle_root: merkle_root}) + # end + + # batcher_subspan_ctx = + # Tracer.start_span( + # "Batcher", + # %{ + # attributes: [ + # {:merkle_root, merkle_root} + # ] + # } + # ) + + # Tracer.set_current_span(batcher_subspan_ctx) + # Tracer.add_event("New batch", [{:merkle_root, merkle_root}]) + + # TraceStore.store_trace(merkle_root, %{ + # trace + # | subspans: Map.put(trace.subspans, :batcher, batcher_subspan_ctx) + # }) :ok end @@ -203,12 +183,16 @@ defmodule TelemetryApi.Traces do :ok """ def batcher_task_uploaded_to_s3(merkle_root) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do - Tracer.add_event("Batcher Task Uploaded to S3", []) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Batcher - Task Uploaded to S3" do + Tracer.set_attributes(%{ + attributes: [] + }) + end + :ok end end - @doc """ Registers the sending of a batcher task to Ethereum in the task trace. @@ -221,15 +205,10 @@ defmodule TelemetryApi.Traces do :ok """ def batcher_task_sent(merkle_root, tx_hash) do - with {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do - Tracer.add_event("Batcher Task Sent to Ethereum", [{"tx_hash", tx_hash}]) - - Tracer.end_span() - - TraceStore.store_trace(merkle_root, %{ - trace - | subspans: Map.delete(trace.subspans, :batcher) - }) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Batcher - Task Sent to Ethereum" do + Tracer.set_attribute("tx_hash", tx_hash) + end :ok end @@ -245,13 +224,14 @@ defmodule TelemetryApi.Traces do :ok """ def batcher_task_started(merkle_root, fee_per_proof, total_proofs) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do + with {:ok, _trace} <- set_current_trace(merkle_root) do IO.inspect("fee_per_proof: #{fee_per_proof}") - Tracer.add_event("Batcher Task being created", - fee_per_proof: fee_per_proof, - total_proofs: total_proofs - ) + Tracer.with_span "Batcher - Task being created" do + Tracer.set_attributes(%{ + attributes: [fee_per_proof: fee_per_proof, total_proofs: total_proofs] + }) + end :ok end @@ -267,8 +247,13 @@ defmodule TelemetryApi.Traces do :ok """ def quorum_reached(merkle_root) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event("Quorum Reached", []) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Aggregator - Quorum Reached" do + Tracer.set_attributes(%{ + attributes: [] + }) + end + IO.inspect("Reached quorum registered. merkle_root: #{merkle_root}") :ok end @@ -285,20 +270,21 @@ defmodule TelemetryApi.Traces do :ok """ def task_error(merkle_root, error) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event( - "Batch verification failed", - [ - {:status, "error"}, - {:error, error} - ] - ) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Batcher - Verification failed" do + Tracer.set_attributes(%{ + attributes: [ + {:status, "error"}, + {:error, error} + ] + }) + end IO.inspect("Task error registered. merkle_root: #{IO.inspect(merkle_root)}") :ok end end - + @doc """ Registers a bump in the gas price when the aggregator tries to respond to a task in the task trace. @@ -310,8 +296,11 @@ defmodule TelemetryApi.Traces do :ok """ def aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event("Task gas price bumped", [{"bumped__gas_price", bumped_gas_price}]) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Aggregator - Task gas price bumped" do + Tracer.set_attributes(%{attributes: [{"bumped__gas_price", bumped_gas_price}]}) + end + :ok end end @@ -327,8 +316,11 @@ defmodule TelemetryApi.Traces do :ok """ def aggregator_task_sent(merkle_root, tx_hash) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}]) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Aggregator - Task Sent to Ethereum" do + Tracer.set_attributes(%{attributes: [{"tx_hash", tx_hash}]}) + end + :ok end end @@ -341,11 +333,11 @@ defmodule TelemetryApi.Traces do ## Examples iex> merkle_root = "0x1234567890abcdef" - iex> finish_task_trace(merkle_root) + iex> aggregator_finish_task(merkle_root) :ok """ - def finish_task_trace(merkle_root) do - with {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do + def aggregator_finish_task(merkle_root) do + with {:ok, trace} <- set_current_trace(merkle_root) do missing_operators = Operators.list_operators() |> Enum.filter(fn o -> o.id not in trace.responses and Operators.is_registered?(o) end) @@ -371,7 +363,9 @@ defmodule TelemetryApi.Traces do missing_operators = missing_operators |> Enum.map(fn o -> o.name end) |> Enum.join(";") - Tracer.add_event("Missing Operators", [{:operators, missing_operators}]) + Tracer.with_span "Aggregator - Missing Operators" do + Tracer.set_attribute("operators", missing_operators) + end end defp set_current_trace(merkle_root) do @@ -382,13 +376,13 @@ defmodule TelemetryApi.Traces do end end - defp set_current_trace_with_subspan(merkle_root, span_name) do - with {:ok, trace} <- TraceStore.get_trace(merkle_root) do - Ctx.attach(trace.context) - Tracer.set_current_span(trace.subspans[span_name]) - {:ok, trace} - end - end + # defp set_current_trace_with_subspan(merkle_root, span_name) do + # with {:ok, trace} <- TraceStore.get_trace(merkle_root) do + # Ctx.attach(trace.context) + # Tracer.set_current_span(trace.subspans[span_name]) + # {:ok, trace} + # end + # end defp validate_operator_registration(operator) do if Operators.is_registered?(operator) do diff --git a/telemetry_api/lib/telemetry_api/traces/trace.ex b/telemetry_api/lib/telemetry_api/traces/trace.ex index 9e074d2893..cabad1b41d 100644 --- a/telemetry_api/lib/telemetry_api/traces/trace.ex +++ b/telemetry_api/lib/telemetry_api/traces/trace.ex @@ -1,4 +1,4 @@ defmodule TelemetryApi.Traces.Trace do - @enforce_keys [:parent_span, :context, :total_stake, :current_stake, :responses, :subspans] - defstruct [:parent_span, :context, :total_stake, :current_stake, :responses, :subspans] + @enforce_keys [:parent_span, :context, :total_stake, :current_stake, :responses] + defstruct [:parent_span, :context, :total_stake, :current_stake, :responses] end diff --git a/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex b/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex index b9e334652a..f42f4164c1 100644 --- a/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex +++ b/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex @@ -6,11 +6,11 @@ defmodule TelemetryApiWeb.TraceController do action_fallback(TelemetryApiWeb.FallbackController) @doc """ - Create a trace for a NewTask with the given merkle_root + Register an aggregator init task in the bls service. Method: POST initTaskTrace """ - def create_task_trace(conn, %{"merkle_root" => merkle_root}) do - with :ok <- Traces.create_task_trace(merkle_root) do + def aggregator_init_task(conn, %{"merkle_root" => merkle_root}) do + with :ok <- Traces.aggregator_init_task(merkle_root) do conn |> put_status(:ok) |> render(:show_merkle, merkle_root: merkle_root) @@ -153,8 +153,8 @@ defmodule TelemetryApiWeb.TraceController do Finish a trace for the given merkle_root Method: POST finishTaskTrace """ - def finish_task_trace(conn, %{"merkle_root" => merkle_root}) do - with :ok <- Traces.finish_task_trace(merkle_root) do + def aggregator_finish_task(conn, %{"merkle_root" => merkle_root}) do + with :ok <- Traces.aggregator_finish_task(merkle_root) do conn |> put_status(:ok) |> render(:show_merkle, merkle_root: merkle_root) diff --git a/telemetry_api/lib/telemetry_api_web/router.ex b/telemetry_api/lib/telemetry_api_web/router.ex index 200e6c94f5..320453526e 100644 --- a/telemetry_api/lib/telemetry_api_web/router.ex +++ b/telemetry_api/lib/telemetry_api_web/router.ex @@ -11,13 +11,13 @@ defmodule TelemetryApiWeb.Router do get "/operators", OperatorController, :index get "/operators/:id", OperatorController, :show post "/operators", OperatorController, :create_or_update - post "/initTaskTrace", TraceController, :create_task_trace + post "/initTaskTrace", TraceController, :aggregator_init_task post "/operatorResponse", TraceController, :register_operator_response post "/quorumReached", TraceController, :quorum_reached post "/taskError", TraceController, :task_error post "/aggregatorTaskGasPriceBump", TraceController, :aggregator_task_gas_price_bumped post "/aggregatorTaskSent", TraceController, :aggregator_task_sent - post "/finishTaskTrace", TraceController, :finish_task_trace + post "/finishTaskTrace", TraceController, :aggregator_finish_task post "/initBatcherTaskTrace", TraceController, :create_batcher_task_trace post "/batcherTaskUploadedToS3", TraceController, :batcher_task_uploaded_to_s3 From 2ec5c324c6525b5a5fe97b33f19132f358481fe7 Mon Sep 17 00:00:00 2001 From: avilagaston9 Date: Fri, 22 Nov 2024 10:39:41 -0300 Subject: [PATCH 2/5] fix: formatting --- .../lib/telemetry_api/periodically.ex | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/telemetry_api/lib/telemetry_api/periodically.ex b/telemetry_api/lib/telemetry_api/periodically.ex index a50a5b5b39..57f4388f84 100644 --- a/telemetry_api/lib/telemetry_api/periodically.ex +++ b/telemetry_api/lib/telemetry_api/periodically.ex @@ -10,19 +10,16 @@ defmodule TelemetryApi.Periodically do @deregistered 2 @wait_time_str System.get_env("OPERATOR_FETCHER_WAIT_TIME_MS") || - raise(""" - environment variable OPERATOR_FETCHER_WAIT_TIME_MS is missing. - """) - - @wait_time_ms (case Integer.parse(@wait_time_str) do - :error -> - raise( - "OPERATOR_FETCHER_WAIT_TIME_MS is not a number, received: #{@wait_time_str}" - ) - - {num, _} -> - num - end) + raise """ + environment variable OPERATOR_FETCHER_WAIT_TIME_MS is missing. + """ + + @wait_time_ms ( + case Integer.parse(@wait_time_str) do + :error -> raise("OPERATOR_FETCHER_WAIT_TIME_MS is not a number, received: #{@wait_time_str}") + {num, _} -> num + end + ) def start_link(_) do GenServer.start_link(__MODULE__, %{}) @@ -35,8 +32,7 @@ defmodule TelemetryApi.Periodically do def send_work() do one_second = 1000 - # every 10 seconds, once per block + some margin - :timer.send_interval(one_second * 10, :gas_price) + :timer.send_interval(one_second * 10, :gas_price) # every 10 seconds, once per block + some margin :timer.send_interval(@wait_time_ms, :poll_service) end @@ -52,7 +48,7 @@ defmodule TelemetryApi.Periodically do EthereumMetrics.new_gas_price(gas_price) {:error, error} -> - IO.inspect("Error fetching gas price: #{error}") + Logger.error("Error fetching gas price: #{inspect(error)}") end {:noreply, %{}} @@ -61,7 +57,7 @@ defmodule TelemetryApi.Periodically do defp fetch_operators_info() do case Operators.fetch_all_operators() do :ok -> :ok - {:error, message} -> IO.inspect("Couldn't fetch operators: #{IO.inspect(message)}") + {:error, message} -> Logger.error("Couldn't fetch operators: #{inspect(message)}") end end From cbef9d50d40583b72e657536c3bdad7619b5fafe Mon Sep 17 00:00:00 2001 From: avilagaston9 Date: Fri, 22 Nov 2024 10:59:40 -0300 Subject: [PATCH 3/5] fix: clean unused code --- telemetry_api/lib/telemetry_api/traces.ex | 36 +------------------ .../controllers/trace_controller.ex | 2 +- 2 files changed, 2 insertions(+), 36 deletions(-) diff --git a/telemetry_api/lib/telemetry_api/traces.ex b/telemetry_api/lib/telemetry_api/traces.ex index 7d9ce0499a..6633ee1996 100644 --- a/telemetry_api/lib/telemetry_api/traces.ex +++ b/telemetry_api/lib/telemetry_api/traces.ex @@ -30,11 +30,6 @@ defmodule TelemetryApi.Traces do }) end - # TraceStore.store_trace(merkle_root, %{ - # trace - # | subspans: Map.put(trace.subspans, :aggregator, aggregator_subspan_ctx) - # }) - :ok end end @@ -145,30 +140,9 @@ defmodule TelemetryApi.Traces do responses: [] }) - # with {:ok, trace} <- set_current_trace(merkle_root) do # This span ends inmediately after it's created just to set the correct title to the final task. Tracer.with_span "Task: #{merkle_root}" do Tracer.set_attributes(%{merkle_root: merkle_root}) - # end - - # batcher_subspan_ctx = - # Tracer.start_span( - # "Batcher", - # %{ - # attributes: [ - # {:merkle_root, merkle_root} - # ] - # } - # ) - - # Tracer.set_current_span(batcher_subspan_ctx) - # Tracer.add_event("New batch", [{:merkle_root, merkle_root}]) - - # TraceStore.store_trace(merkle_root, %{ - # trace - # | subspans: Map.put(trace.subspans, :batcher, batcher_subspan_ctx) - # }) - :ok end end @@ -271,7 +245,7 @@ defmodule TelemetryApi.Traces do """ def task_error(merkle_root, error) do with {:ok, _trace} <- set_current_trace(merkle_root) do - Tracer.with_span "Batcher - Verification failed" do + Tracer.with_span "Aggregator - Batch verification failed" do Tracer.set_attributes(%{ attributes: [ {:status, "error"}, @@ -376,14 +350,6 @@ defmodule TelemetryApi.Traces do end end - # defp set_current_trace_with_subspan(merkle_root, span_name) do - # with {:ok, trace} <- TraceStore.get_trace(merkle_root) do - # Ctx.attach(trace.context) - # Tracer.set_current_span(trace.subspans[span_name]) - # {:ok, trace} - # end - # end - defp validate_operator_registration(operator) do if Operators.is_registered?(operator) do :ok diff --git a/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex b/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex index f42f4164c1..82dae6c2a4 100644 --- a/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex +++ b/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex @@ -6,7 +6,7 @@ defmodule TelemetryApiWeb.TraceController do action_fallback(TelemetryApiWeb.FallbackController) @doc """ - Register an aggregator init task in the bls service. + Register an aggregator init task with the given merkle_root Method: POST initTaskTrace """ def aggregator_init_task(conn, %{"merkle_root" => merkle_root}) do From 291be6ae51973ec35681f4e4b2569bbb488dd0bb Mon Sep 17 00:00:00 2001 From: avilagaston9 Date: Fri, 22 Nov 2024 12:18:07 -0300 Subject: [PATCH 4/5] fix: attributes in spans --- .../src/retry/batcher_retryables.rs | 12 ---- telemetry_api/lib/telemetry_api/traces.ex | 58 +++++++------------ 2 files changed, 21 insertions(+), 49 deletions(-) diff --git a/batcher/aligned-batcher/src/retry/batcher_retryables.rs b/batcher/aligned-batcher/src/retry/batcher_retryables.rs index c4917ebb76..ca7bdff018 100644 --- a/batcher/aligned-batcher/src/retry/batcher_retryables.rs +++ b/batcher/aligned-batcher/src/retry/batcher_retryables.rs @@ -162,18 +162,6 @@ pub async fn create_new_task_retryable( }; // timeout to prevent a deadlock while waiting for the transaction to be included in a block. - // _ = timeout(Duration::from_millis(transaction_wait_timeout), pending_tx) - // .await - // .map_err(|e| { - // warn!("Error while waiting for batch inclusion: {e}"); - // RetryError::Permanent(BatcherError::ReceiptNotFoundError) - // })? - // .map_err(|e| { - // warn!("Error while waiting for batch inclusion: {e}"); - // RetryError::Permanent(BatcherError::ReceiptNotFoundError) - // })? - // .ok_or(RetryError::Permanent(BatcherError::ReceiptNotFoundError)); - // return Err(RetryError::Permanent(BatcherError::ReceiptNotFoundError)); timeout(Duration::from_millis(transaction_wait_timeout), pending_tx) .await .map_err(|e| { diff --git a/telemetry_api/lib/telemetry_api/traces.ex b/telemetry_api/lib/telemetry_api/traces.ex index 6633ee1996..eb693dcdc1 100644 --- a/telemetry_api/lib/telemetry_api/traces.ex +++ b/telemetry_api/lib/telemetry_api/traces.ex @@ -23,11 +23,7 @@ defmodule TelemetryApi.Traces do with {:ok, _trace} <- set_current_trace(merkle_root), {:ok, total_stake} <- StakeRegistry.get_current_total_stake() do Tracer.with_span "Aggregator - New task event received" do - Tracer.set_attributes(%{ - attributes: [ - {:total_stake, total_stake} - ] - }) + Tracer.set_attributes(%{total_stake: total_stake}) end :ok @@ -56,16 +52,14 @@ defmodule TelemetryApi.Traces do Tracer.with_span "Aggregator - Operator Response: " <> operator.name do Tracer.set_attributes(%{ - attributes: [ - {:merkle_root, merkle_root}, - {:operator_id, operator_id}, - {:name, operator.name}, - {:address, operator.address}, - {:operator_stake, Decimal.to_string(operator_stake)}, - {:current_stake, Decimal.to_string(new_stake)}, - {:current_stake_fraction, Decimal.to_string(new_stake_fraction)}, - {:operator_stake_fraction, Decimal.to_string(operator_stake_fraction)} - ] + merkle_root: merkle_root, + operator_id: operator_id, + name: operator.name, + address: operator.address, + operator_stake: Decimal.to_string(operator_stake), + current_stake: Decimal.to_string(new_stake), + current_stake_fraction: Decimal.to_string(new_stake_fraction), + operator_stake_fraction: Decimal.to_string(operator_stake_fraction) }) end @@ -78,7 +72,7 @@ defmodule TelemetryApi.Traces do }) IO.inspect( - "Operator response included. merkle_root: #{IO.inspect(merkle_root)} operator_id: #{IO.inspect(operator_id)}" + "Operator response included. merkle_root: #{inspect(merkle_root)} operator_id: #{inspect(operator_id)}" ) :ok @@ -98,11 +92,7 @@ defmodule TelemetryApi.Traces do def batcher_task_creation_failed(merkle_root, error) do with {:ok, _trace} <- set_current_trace(merkle_root) do Tracer.with_span "Batcher - Task Creation Failed" do - Tracer.set_attributes(%{ - attributes: [ - {:error, error} - ] - }) + Tracer.set_attributes(%{error: error}) end :ok @@ -140,6 +130,8 @@ defmodule TelemetryApi.Traces do responses: [] }) + Tracer.set_current_span(root_span_ctx) + # This span ends inmediately after it's created just to set the correct title to the final task. Tracer.with_span "Task: #{merkle_root}" do Tracer.set_attributes(%{merkle_root: merkle_root}) @@ -159,9 +151,7 @@ defmodule TelemetryApi.Traces do def batcher_task_uploaded_to_s3(merkle_root) do with {:ok, _trace} <- set_current_trace(merkle_root) do Tracer.with_span "Batcher - Task Uploaded to S3" do - Tracer.set_attributes(%{ - attributes: [] - }) + Tracer.set_attributes(%{merkle_root: merkle_root}) end :ok @@ -202,9 +192,7 @@ defmodule TelemetryApi.Traces do IO.inspect("fee_per_proof: #{fee_per_proof}") Tracer.with_span "Batcher - Task being created" do - Tracer.set_attributes(%{ - attributes: [fee_per_proof: fee_per_proof, total_proofs: total_proofs] - }) + Tracer.set_attributes(%{fee_per_proof: fee_per_proof, total_proofs: total_proofs}) end :ok @@ -223,9 +211,7 @@ defmodule TelemetryApi.Traces do def quorum_reached(merkle_root) do with {:ok, _trace} <- set_current_trace(merkle_root) do Tracer.with_span "Aggregator - Quorum Reached" do - Tracer.set_attributes(%{ - attributes: [] - }) + Tracer.set_attributes(%{merkle_root: merkle_root}) end IO.inspect("Reached quorum registered. merkle_root: #{merkle_root}") @@ -247,14 +233,12 @@ defmodule TelemetryApi.Traces do with {:ok, _trace} <- set_current_trace(merkle_root) do Tracer.with_span "Aggregator - Batch verification failed" do Tracer.set_attributes(%{ - attributes: [ - {:status, "error"}, - {:error, error} - ] + status: "error", + error: error }) end - IO.inspect("Task error registered. merkle_root: #{IO.inspect(merkle_root)}") + IO.inspect("Task error registered. merkle_root: #{inspect(merkle_root)}") :ok end end @@ -272,7 +256,7 @@ defmodule TelemetryApi.Traces do def aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) do with {:ok, _trace} <- set_current_trace(merkle_root) do Tracer.with_span "Aggregator - Task gas price bumped" do - Tracer.set_attributes(%{attributes: [{"bumped__gas_price", bumped_gas_price}]}) + Tracer.set_attributes(%{bumped_gas_price: bumped_gas_price}) end :ok @@ -292,7 +276,7 @@ defmodule TelemetryApi.Traces do def aggregator_task_sent(merkle_root, tx_hash) do with {:ok, _trace} <- set_current_trace(merkle_root) do Tracer.with_span "Aggregator - Task Sent to Ethereum" do - Tracer.set_attributes(%{attributes: [{"tx_hash", tx_hash}]}) + Tracer.set_attributes(%{tx_hash: tx_hash}) end :ok From 1d5d5aef6ad1c95c23b2d73ad04f570969a671fc Mon Sep 17 00:00:00 2001 From: avilagaston9 Date: Thu, 28 Nov 2024 14:01:42 -0300 Subject: [PATCH 5/5] fix: update eigen middlesare --- contracts/lib/eigenlayer-middleware | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contracts/lib/eigenlayer-middleware b/contracts/lib/eigenlayer-middleware index a1801f07e1..c0863c3789 160000 --- a/contracts/lib/eigenlayer-middleware +++ b/contracts/lib/eigenlayer-middleware @@ -1 +1 @@ -Subproject commit a1801f07e15192ef06db86dfec716eb2b6a4fe90 +Subproject commit c0863c378974f92e6cc5030c24aa0aecb5c5b7c5