Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions telemetry_api/lib/telemetry_api/periodically.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@ defmodule TelemetryApi.Periodically do
EthereumMetrics.new_gas_price(gas_price)

{:error, error} ->
IO.inspect("Error fetching gas price: #{error}")
Logger.error("Error fetching gas price: #{inspect(error)}")
end
{:noreply, %{}}

{:noreply, %{}}
end

defp fetch_operators_info() do
case Operators.fetch_all_operators() do
:ok -> :ok
{:error, message} -> IO.inspect("Couldn't fetch operators: #{IO.inspect(message)}")
{:error, message} -> Logger.error("Couldn't fetch operators: #{inspect(message)}")
end
end

Expand All @@ -67,9 +69,10 @@ defmodule TelemetryApi.Periodically do
Operators.update_operator(op, %{status: string_status(status)})

error ->
Logger.error("Error when updating status: #{error}")
Logger.error("Error when updating status: #{inspect(error)}")
end
end)

:ok
end

Expand Down
214 changes: 79 additions & 135 deletions telemetry_api/lib/telemetry_api/traces.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,31 @@ defmodule TelemetryApi.Traces do
@moduledoc """
The Traces context.
"""
require OpenTelemetry.Tracer
alias TelemetryApi.Traces.Trace
alias TelemetryApi.Operators
alias TelemetryApi.ContractManagers.StakeRegistry

require OpenTelemetry.Tracer
require OpenTelemetry.Ctx
alias OpenTelemetry.Tracer, as: Tracer
alias OpenTelemetry.Ctx, as: Ctx
alias OpenTelemetry.Tracer
alias OpenTelemetry.Ctx

@doc """
Send the trace to OpenTelemetry

This function is responsible for creating a new span and storing the context in the Agent.
Registers an aggregator new task initialization.

## Examples

iex> merkle_root = "0x1234567890abcdef"
iex> create_task_trace(merkle_root)
iex> aggregator_init_task(merkle_root)
:ok
"""
def create_task_trace(merkle_root) do
with {:ok, trace} <- set_current_trace(merkle_root) do
with {:ok, total_stake} <- StakeRegistry.get_current_total_stake() do
aggregator_subspan_ctx =
Tracer.start_span(
"Aggregator",
%{
attributes: [
{:merkle_root, merkle_root},
{:total_stake, total_stake}
]
}
)

Tracer.set_current_span(aggregator_subspan_ctx)
Tracer.add_event("New task event received", [])

TraceStore.store_trace(merkle_root, %{
trace
| subspans: Map.put(trace.subspans, :aggregator, aggregator_subspan_ctx)
})

:ok
def aggregator_init_task(merkle_root) do
with {:ok, _trace} <- set_current_trace(merkle_root),
{:ok, total_stake} <- StakeRegistry.get_current_total_stake() do
Tracer.with_span "Aggregator - New task event received" do
Tracer.set_attributes(%{total_stake: total_stake})
end

:ok
end
end

Expand All @@ -62,25 +43,25 @@ defmodule TelemetryApi.Traces do
def register_operator_response(merkle_root, operator_id) do
with {:ok, operator} <- Operators.get_operator(%{id: operator_id}),
:ok <- validate_operator_registration(operator),
{:ok, trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do
{:ok, trace} <- set_current_trace(merkle_root) do
operator_stake = Decimal.new(operator.stake)
new_stake = Decimal.add(trace.current_stake, operator_stake)
new_stake_fraction = Decimal.div(new_stake, trace.total_stake)
operator_stake_fraction = Decimal.div(operator_stake, trace.total_stake)

Tracer.add_event(
"Operator Response: " <> operator.name,
[
{:merkle_root, merkle_root},
{:operator_id, operator_id},
{:name, operator.name},
{:address, operator.address},
{:operator_stake, Decimal.to_string(operator_stake)},
{:current_stake, Decimal.to_string(new_stake)},
{:current_stake_fraction, Decimal.to_string(new_stake_fraction)},
{:operator_stake_fraction, Decimal.to_string(operator_stake_fraction)}
]
)
Tracer.with_span "Aggregator - Operator Response: " <>
operator.name do
Tracer.set_attributes(%{
merkle_root: merkle_root,
operator_id: operator_id,
name: operator.name,
address: operator.address,
operator_stake: Decimal.to_string(operator_stake),
current_stake: Decimal.to_string(new_stake),
current_stake_fraction: Decimal.to_string(new_stake_fraction),
operator_stake_fraction: Decimal.to_string(operator_stake_fraction)
})
end

responses = trace.responses ++ [operator_id]

Expand All @@ -91,7 +72,7 @@ defmodule TelemetryApi.Traces do
})

IO.inspect(
"Operator response included. merkle_root: #{IO.inspect(merkle_root)} operator_id: #{IO.inspect(operator_id)}"
"Operator response included. merkle_root: #{inspect(merkle_root)} operator_id: #{inspect(operator_id)}"
)

:ok
Expand All @@ -109,32 +90,17 @@ defmodule TelemetryApi.Traces do
:ok
"""
def batcher_task_creation_failed(merkle_root, error) do
with {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do
Tracer.add_event(
"Batcher Task Creation Failed",
[
{:error, error}
]
)

Tracer.end_span()

TraceStore.store_trace(merkle_root, %{
trace
| subspans: Map.delete(trace.subspans, :batcher)
})

with {:ok, _trace} <- set_current_trace(merkle_root) do
Tracer.end_span()
TraceStore.delete_trace(merkle_root)
with {:ok, _trace} <- set_current_trace(merkle_root) do
Tracer.with_span "Batcher - Task Creation Failed" do
Tracer.set_attributes(%{error: error})
end

:ok
end
end

@doc """
Create a new task trace for the batcher and starts the subspan for the batcher.
Create a new task trace from the batcher.

## Examples

Expand All @@ -161,34 +127,14 @@ defmodule TelemetryApi.Traces do
context: ctx,
total_stake: total_stake,
current_stake: 0,
responses: [],
subspans: %{}
responses: []
})

with {:ok, trace} <- set_current_trace(merkle_root) do
# This span ends inmediately after it's created just to set the correct title to the final task.
Tracer.with_span "Task: #{merkle_root}" do
Tracer.set_attributes(%{merkle_root: merkle_root})
end

batcher_subspan_ctx =
Tracer.start_span(
"Batcher",
%{
attributes: [
{:merkle_root, merkle_root}
]
}
)

Tracer.set_current_span(batcher_subspan_ctx)
Tracer.add_event("New batch", [{:merkle_root, merkle_root}])

TraceStore.store_trace(merkle_root, %{
trace
| subspans: Map.put(trace.subspans, :batcher, batcher_subspan_ctx)
})
Tracer.set_current_span(root_span_ctx)

# This span ends inmediately after it's created just to set the correct title to the final task.
Tracer.with_span "Task: #{merkle_root}" do
Tracer.set_attributes(%{merkle_root: merkle_root})
:ok
end
end
Expand All @@ -203,12 +149,14 @@ defmodule TelemetryApi.Traces do
:ok
"""
def batcher_task_uploaded_to_s3(merkle_root) do
with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do
Tracer.add_event("Batcher Task Uploaded to S3", [])
with {:ok, _trace} <- set_current_trace(merkle_root) do
Tracer.with_span "Batcher - Task Uploaded to S3" do
Tracer.set_attributes(%{merkle_root: merkle_root})
end

:ok
end
end


@doc """
Registers the sending of a batcher task to Ethereum in the task trace.
Expand All @@ -221,15 +169,10 @@ defmodule TelemetryApi.Traces do
:ok
"""
def batcher_task_sent(merkle_root, tx_hash) do
with {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do
Tracer.add_event("Batcher Task Sent to Ethereum", [{"tx_hash", tx_hash}])

Tracer.end_span()

TraceStore.store_trace(merkle_root, %{
trace
| subspans: Map.delete(trace.subspans, :batcher)
})
with {:ok, _trace} <- set_current_trace(merkle_root) do
Tracer.with_span "Batcher - Task Sent to Ethereum" do
Tracer.set_attribute("tx_hash", tx_hash)
end

:ok
end
Expand All @@ -245,13 +188,12 @@ defmodule TelemetryApi.Traces do
:ok
"""
def batcher_task_started(merkle_root, fee_per_proof, total_proofs) do
with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :batcher) do
with {:ok, _trace} <- set_current_trace(merkle_root) do
IO.inspect("fee_per_proof: #{fee_per_proof}")

Tracer.add_event("Batcher Task being created",
fee_per_proof: fee_per_proof,
total_proofs: total_proofs
)
Tracer.with_span "Batcher - Task being created" do
Tracer.set_attributes(%{fee_per_proof: fee_per_proof, total_proofs: total_proofs})
end

:ok
end
Expand All @@ -267,8 +209,11 @@ defmodule TelemetryApi.Traces do
:ok
"""
def quorum_reached(merkle_root) do
with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do
Tracer.add_event("Quorum Reached", [])
with {:ok, _trace} <- set_current_trace(merkle_root) do
Tracer.with_span "Aggregator - Quorum Reached" do
Tracer.set_attributes(%{merkle_root: merkle_root})
end

IO.inspect("Reached quorum registered. merkle_root: #{merkle_root}")
:ok
end
Expand All @@ -285,20 +230,19 @@ defmodule TelemetryApi.Traces do
:ok
"""
def task_error(merkle_root, error) do
with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do
Tracer.add_event(
"Batch verification failed",
[
{:status, "error"},
{:error, error}
]
)
with {:ok, _trace} <- set_current_trace(merkle_root) do
Tracer.with_span "Aggregator - Batch verification failed" do
Tracer.set_attributes(%{
status: "error",
error: error
})
end

IO.inspect("Task error registered. merkle_root: #{IO.inspect(merkle_root)}")
IO.inspect("Task error registered. merkle_root: #{inspect(merkle_root)}")
:ok
end
end

@doc """
Registers a bump in the gas price when the aggregator tries to respond to a task in the task trace.

Expand All @@ -310,8 +254,11 @@ defmodule TelemetryApi.Traces do
:ok
"""
def aggregator_task_gas_price_bumped(merkle_root, bumped_gas_price) do
with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do
Tracer.add_event("Task gas price bumped", [{"bumped__gas_price", bumped_gas_price}])
with {:ok, _trace} <- set_current_trace(merkle_root) do
Tracer.with_span "Aggregator - Task gas price bumped" do
Tracer.set_attributes(%{bumped_gas_price: bumped_gas_price})
end

:ok
end
end
Expand All @@ -327,8 +274,11 @@ defmodule TelemetryApi.Traces do
:ok
"""
def aggregator_task_sent(merkle_root, tx_hash) do
with {:ok, _trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do
Tracer.add_event("Task Sent to Ethereum", [{"tx_hash", tx_hash}])
with {:ok, _trace} <- set_current_trace(merkle_root) do
Tracer.with_span "Aggregator - Task Sent to Ethereum" do
Tracer.set_attributes(%{tx_hash: tx_hash})
end

:ok
end
end
Expand All @@ -341,11 +291,11 @@ defmodule TelemetryApi.Traces do
## Examples

iex> merkle_root = "0x1234567890abcdef"
iex> finish_task_trace(merkle_root)
iex> aggregator_finish_task(merkle_root)
:ok
"""
def finish_task_trace(merkle_root) do
with {:ok, trace} <- set_current_trace_with_subspan(merkle_root, :aggregator) do
def aggregator_finish_task(merkle_root) do
with {:ok, trace} <- set_current_trace(merkle_root) do
missing_operators =
Operators.list_operators()
|> Enum.filter(fn o -> o.id not in trace.responses and Operators.is_registered?(o) end)
Expand All @@ -371,7 +321,9 @@ defmodule TelemetryApi.Traces do
missing_operators =
missing_operators |> Enum.map(fn o -> o.name end) |> Enum.join(";")

Tracer.add_event("Missing Operators", [{:operators, missing_operators}])
Tracer.with_span "Aggregator - Missing Operators" do
Tracer.set_attribute("operators", missing_operators)
end
end

defp set_current_trace(merkle_root) do
Expand All @@ -382,14 +334,6 @@ defmodule TelemetryApi.Traces do
end
end

defp set_current_trace_with_subspan(merkle_root, span_name) do
with {:ok, trace} <- TraceStore.get_trace(merkle_root) do
Ctx.attach(trace.context)
Tracer.set_current_span(trace.subspans[span_name])
{:ok, trace}
end
end

defp validate_operator_registration(operator) do
if Operators.is_registered?(operator) do
:ok
Expand Down
4 changes: 2 additions & 2 deletions telemetry_api/lib/telemetry_api/traces/trace.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule TelemetryApi.Traces.Trace do
@enforce_keys [:parent_span, :context, :total_stake, :current_stake, :responses, :subspans]
defstruct [:parent_span, :context, :total_stake, :current_stake, :responses, :subspans]
@enforce_keys [:parent_span, :context, :total_stake, :current_stake, :responses]
defstruct [:parent_span, :context, :total_stake, :current_stake, :responses]
end
Loading