diff --git a/telemetry_api/lib/telemetry_api/periodically.ex b/telemetry_api/lib/telemetry_api/periodically.ex index 6aae119823..57f4388f84 100644 --- a/telemetry_api/lib/telemetry_api/periodically.ex +++ b/telemetry_api/lib/telemetry_api/periodically.ex @@ -48,14 +48,16 @@ defmodule TelemetryApi.Periodically do EthereumMetrics.new_gas_price(gas_price) {:error, error} -> - IO.inspect("Error fetching gas price: #{error}") + Logger.error("Error fetching gas price: #{inspect(error)}") end - {:noreply, %{}} + + {:noreply, %{}} end + defp fetch_operators_info() do case Operators.fetch_all_operators() do :ok -> :ok - {:error, message} -> IO.inspect("Couldn't fetch operators: #{IO.inspect(message)}") + {:error, message} -> Logger.error("Couldn't fetch operators: #{inspect(message)}") end end @@ -67,9 +69,10 @@ defmodule TelemetryApi.Periodically do Operators.update_operator(op, %{status: string_status(status)}) error -> - Logger.error("Error when updating status: #{error}") + Logger.error("Error when updating status: #{inspect(error)}") end end) + :ok end diff --git a/telemetry_api/lib/telemetry_api/traces.ex b/telemetry_api/lib/telemetry_api/traces.ex index e611f386a0..eb693dcdc1 100644 --- a/telemetry_api/lib/telemetry_api/traces.ex +++ b/telemetry_api/lib/telemetry_api/traces.ex @@ -2,50 +2,31 @@ defmodule TelemetryApi.Traces do @moduledoc """ The Traces context. """ + require OpenTelemetry.Tracer alias TelemetryApi.Traces.Trace alias TelemetryApi.Operators alias TelemetryApi.ContractManagers.StakeRegistry - require OpenTelemetry.Tracer - require OpenTelemetry.Ctx - alias OpenTelemetry.Tracer, as: Tracer - alias OpenTelemetry.Ctx, as: Ctx + alias OpenTelemetry.Tracer + alias OpenTelemetry.Ctx @doc """ - Send the trace to OpenTelemetry - - This function is responsible for creating a new span and storing the context in the Agent. + Registers an aggregator new task initialization. ## Examples iex> merkle_root = "0x1234567890abcdef" - iex> create_task_trace(merkle_root) + iex> aggregator_init_task(merkle_root) :ok """ - def create_task_trace(merkle_root) do - with {:ok, trace} <- set_current_trace(merkle_root) do - with {:ok, total_stake} <- StakeRegistry.get_current_total_stake() do - aggregator_subspan_ctx = - Tracer.start_span( - "Aggregator", - %{ - attributes: [ - {:merkle_root, merkle_root}, - {:total_stake, total_stake} - ] - } - ) - - Tracer.set_current_span(aggregator_subspan_ctx) - Tracer.add_event("New task event received", []) - - TraceStore.store_trace(merkle_root, %{ - trace - | subspans: Map.put(trace.subspans, :aggregator, aggregator_subspan_ctx) - }) - - :ok + def aggregator_init_task(merkle_root) do + with {:ok, _trace} <- set_current_trace(merkle_root), + {:ok, total_stake} <- StakeRegistry.get_current_total_stake() do + Tracer.with_span "Aggregator - New task event received" do + Tracer.set_attributes(%{total_stake: total_stake}) end + + :ok end end @@ -62,25 +43,25 @@ defmodule TelemetryApi.Traces do def register_operator_response(merkle_root, operator_id) do with {:ok, operator} <- Operators.get_operator(%{id: operator_id}), :ok <- validate_operator_registration(operator), - {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do + {:ok, trace} <- set_current_trace(merkle_root) do operator_stake = Decimal.new(operator.stake) new_stake = Decimal.add(trace.current_stake, operator_stake) new_stake_fraction = Decimal.div(new_stake, trace.total_stake) operator_stake_fraction = Decimal.div(operator_stake, trace.total_stake) - Tracer.add_event( - "Operator Response: " <> operator.name, - [ - {:merkle_root, merkle_root}, - {:operator_id, operator_id}, - {:name, operator.name}, - {:address, operator.address}, - {:operator_stake, Decimal.to_string(operator_stake)}, - {:current_stake, Decimal.to_string(new_stake)}, - {:current_stake_fraction, Decimal.to_string(new_stake_fraction)}, - {:operator_stake_fraction, Decimal.to_string(operator_stake_fraction)} - ] - ) + Tracer.with_span "Aggregator - Operator Response: " <> + operator.name do + Tracer.set_attributes(%{ + merkle_root: merkle_root, + operator_id: operator_id, + name: operator.name, + address: operator.address, + operator_stake: Decimal.to_string(operator_stake), + current_stake: Decimal.to_string(new_stake), + current_stake_fraction: Decimal.to_string(new_stake_fraction), + operator_stake_fraction: Decimal.to_string(operator_stake_fraction) + }) + end responses = trace.responses ++ [operator_id] @@ -91,7 +72,7 @@ defmodule TelemetryApi.Traces do }) IO.inspect( - "Operator response included. merkle_root: #{IO.inspect(merkle_root)} operator_id: #{IO.inspect(operator_id)}" + "Operator response included. merkle_root: #{inspect(merkle_root)} operator_id: #{inspect(operator_id)}" ) :ok @@ -109,24 +90,9 @@ defmodule TelemetryApi.Traces do :ok """ def batcher_task_creation_failed(merkle_root, error) do - with {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do - Tracer.add_event( - "Batcher Task Creation Failed", - [ - {:error, error} - ] - ) - - Tracer.end_span() - - TraceStore.store_trace(merkle_root, %{ - trace - | subspans: Map.delete(trace.subspans, :batcher) - }) - - with {:ok, _trace} <- set_current_trace(merkle_root) do - Tracer.end_span() - TraceStore.delete_trace(merkle_root) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Batcher - Task Creation Failed" do + Tracer.set_attributes(%{error: error}) end :ok @@ -134,7 +100,7 @@ defmodule TelemetryApi.Traces do end @doc """ - Create a new task trace for the batcher and starts the subspan for the batcher. + Create a new task trace from the batcher. ## Examples @@ -161,34 +127,14 @@ defmodule TelemetryApi.Traces do context: ctx, total_stake: total_stake, current_stake: 0, - responses: [], - subspans: %{} + responses: [] }) - with {:ok, trace} <- set_current_trace(merkle_root) do - # This span ends inmediately after it's created just to set the correct title to the final task. - Tracer.with_span "Task: #{merkle_root}" do - Tracer.set_attributes(%{merkle_root: merkle_root}) - end - - batcher_subspan_ctx = - Tracer.start_span( - "Batcher", - %{ - attributes: [ - {:merkle_root, merkle_root} - ] - } - ) - - Tracer.set_current_span(batcher_subspan_ctx) - Tracer.add_event("New batch", [{:merkle_root, merkle_root}]) - - TraceStore.store_trace(merkle_root, %{ - trace - | subspans: Map.put(trace.subspans, :batcher, batcher_subspan_ctx) - }) + Tracer.set_current_span(root_span_ctx) + # This span ends inmediately after it's created just to set the correct title to the final task. + Tracer.with_span "Task: #{merkle_root}" do + Tracer.set_attributes(%{merkle_root: merkle_root}) :ok end end @@ -203,12 +149,14 @@ defmodule TelemetryApi.Traces do :ok """ def batcher_task_uploaded_to_s3(merkle_root) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do - Tracer.add_event("Batcher Task Uploaded to S3", []) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Batcher - Task Uploaded to S3" do + Tracer.set_attributes(%{merkle_root: merkle_root}) + end + :ok end end - @doc """ Registers the sending of a batcher task to Ethereum in the task trace. @@ -221,15 +169,10 @@ defmodule TelemetryApi.Traces do :ok """ def batcher_task_sent(merkle_root, tx_hash) do - with {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do - Tracer.add_event("Batcher Task Sent to Ethereum", [{"tx_hash", tx_hash}]) - - Tracer.end_span() - - TraceStore.store_trace(merkle_root, %{ - trace - | subspans: Map.delete(trace.subspans, :batcher) - }) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Batcher - Task Sent to Ethereum" do + Tracer.set_attribute("tx_hash", tx_hash) + end :ok end @@ -245,13 +188,12 @@ defmodule TelemetryApi.Traces do :ok """ def batcher_task_started(merkle_root, fee_per_proof, total_proofs) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do + with {:ok, _trace} <- set_current_trace(merkle_root) do IO.inspect("fee_per_proof: #{fee_per_proof}") - Tracer.add_event("Batcher Task being created", - fee_per_proof: fee_per_proof, - total_proofs: total_proofs - ) + Tracer.with_span "Batcher - Task being created" do + Tracer.set_attributes(%{fee_per_proof: fee_per_proof, total_proofs: total_proofs}) + end :ok end @@ -267,8 +209,11 @@ defmodule TelemetryApi.Traces do :ok """ def quorum_reached(merkle_root) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event("Quorum Reached", []) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Aggregator - Quorum Reached" do + Tracer.set_attributes(%{merkle_root: merkle_root}) + end + IO.inspect("Reached quorum registered. merkle_root: #{merkle_root}") :ok end @@ -285,20 +230,19 @@ defmodule TelemetryApi.Traces do :ok """ def task_error(merkle_root, error) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event( - "Batch verification failed", - [ - {:status, "error"}, - {:error, error} - ] - ) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Aggregator - Batch verification failed" do + Tracer.set_attributes(%{ + status: "error", + error: error + }) + end - IO.inspect("Task error registered. merkle_root: #{IO.inspect(merkle_root)}") + IO.inspect("Task error registered. merkle_root: #{inspect(merkle_root)}") :ok end end - + @doc """ Registers a bump in the gas price when the aggregator tries to respond to a task in the task trace. @@ -310,8 +254,11 @@ defmodule TelemetryApi.Traces do :ok """ def aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event("Task gas price bumped", [{"bumped__gas_price", bumped_gas_price}]) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Aggregator - Task gas price bumped" do + Tracer.set_attributes(%{bumped_gas_price: bumped_gas_price}) + end + :ok end end @@ -327,8 +274,11 @@ defmodule TelemetryApi.Traces do :ok """ def aggregator_task_sent(merkle_root, tx_hash) do - with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do - Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}]) + with {:ok, _trace} <- set_current_trace(merkle_root) do + Tracer.with_span "Aggregator - Task Sent to Ethereum" do + Tracer.set_attributes(%{tx_hash: tx_hash}) + end + :ok end end @@ -341,11 +291,11 @@ defmodule TelemetryApi.Traces do ## Examples iex> merkle_root = "0x1234567890abcdef" - iex> finish_task_trace(merkle_root) + iex> aggregator_finish_task(merkle_root) :ok """ - def finish_task_trace(merkle_root) do - with {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do + def aggregator_finish_task(merkle_root) do + with {:ok, trace} <- set_current_trace(merkle_root) do missing_operators = Operators.list_operators() |> Enum.filter(fn o -> o.id not in trace.responses and Operators.is_registered?(o) end) @@ -371,7 +321,9 @@ defmodule TelemetryApi.Traces do missing_operators = missing_operators |> Enum.map(fn o -> o.name end) |> Enum.join(";") - Tracer.add_event("Missing Operators", [{:operators, missing_operators}]) + Tracer.with_span "Aggregator - Missing Operators" do + Tracer.set_attribute("operators", missing_operators) + end end defp set_current_trace(merkle_root) do @@ -382,14 +334,6 @@ defmodule TelemetryApi.Traces do end end - defp set_current_trace_with_subspan(merkle_root, span_name) do - with {:ok, trace} <- TraceStore.get_trace(merkle_root) do - Ctx.attach(trace.context) - Tracer.set_current_span(trace.subspans[span_name]) - {:ok, trace} - end - end - defp validate_operator_registration(operator) do if Operators.is_registered?(operator) do :ok diff --git a/telemetry_api/lib/telemetry_api/traces/trace.ex b/telemetry_api/lib/telemetry_api/traces/trace.ex index 9e074d2893..cabad1b41d 100644 --- a/telemetry_api/lib/telemetry_api/traces/trace.ex +++ b/telemetry_api/lib/telemetry_api/traces/trace.ex @@ -1,4 +1,4 @@ defmodule TelemetryApi.Traces.Trace do - @enforce_keys [:parent_span, :context, :total_stake, :current_stake, :responses, :subspans] - defstruct [:parent_span, :context, :total_stake, :current_stake, :responses, :subspans] + @enforce_keys [:parent_span, :context, :total_stake, :current_stake, :responses] + defstruct [:parent_span, :context, :total_stake, :current_stake, :responses] end diff --git a/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex b/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex index b9e334652a..82dae6c2a4 100644 --- a/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex +++ b/telemetry_api/lib/telemetry_api_web/controllers/trace_controller.ex @@ -6,11 +6,11 @@ defmodule TelemetryApiWeb.TraceController do action_fallback(TelemetryApiWeb.FallbackController) @doc """ - Create a trace for a NewTask with the given merkle_root + Register an aggregator init task with the given merkle_root Method: POST initTaskTrace """ - def create_task_trace(conn, %{"merkle_root" => merkle_root}) do - with :ok <- Traces.create_task_trace(merkle_root) do + def aggregator_init_task(conn, %{"merkle_root" => merkle_root}) do + with :ok <- Traces.aggregator_init_task(merkle_root) do conn |> put_status(:ok) |> render(:show_merkle, merkle_root: merkle_root) @@ -153,8 +153,8 @@ defmodule TelemetryApiWeb.TraceController do Finish a trace for the given merkle_root Method: POST finishTaskTrace """ - def finish_task_trace(conn, %{"merkle_root" => merkle_root}) do - with :ok <- Traces.finish_task_trace(merkle_root) do + def aggregator_finish_task(conn, %{"merkle_root" => merkle_root}) do + with :ok <- Traces.aggregator_finish_task(merkle_root) do conn |> put_status(:ok) |> render(:show_merkle, merkle_root: merkle_root) diff --git a/telemetry_api/lib/telemetry_api_web/router.ex b/telemetry_api/lib/telemetry_api_web/router.ex index 200e6c94f5..320453526e 100644 --- a/telemetry_api/lib/telemetry_api_web/router.ex +++ b/telemetry_api/lib/telemetry_api_web/router.ex @@ -11,13 +11,13 @@ defmodule TelemetryApiWeb.Router do get "/operators", OperatorController, :index get "/operators/:id", OperatorController, :show post "/operators", OperatorController, :create_or_update - post "/initTaskTrace", TraceController, :create_task_trace + post "/initTaskTrace", TraceController, :aggregator_init_task post "/operatorResponse", TraceController, :register_operator_response post "/quorumReached", TraceController, :quorum_reached post "/taskError", TraceController, :task_error post "/aggregatorTaskGasPriceBump", TraceController, :aggregator_task_gas_price_bumped post "/aggregatorTaskSent", TraceController, :aggregator_task_sent - post "/finishTaskTrace", TraceController, :finish_task_trace + post "/finishTaskTrace", TraceController, :aggregator_finish_task post "/initBatcherTaskTrace", TraceController, :create_batcher_task_trace post "/batcherTaskUploadedToS3", TraceController, :batcher_task_uploaded_to_s3