Skip to content

Commit a0e9956

Browse files
avilagaston9JuArceJulianVenturaJulian VenturaOppen
authored
feat(telemetry): check operators status on fetching operators list (#1186)
Co-authored-by: JuArce <[email protected]> Co-authored-by: Julian Ventura <[email protected]> Co-authored-by: Julian Ventura <[email protected]> Co-authored-by: Mario Rugiero <[email protected]> Co-authored-by: Mariano Nicolini <[email protected]>
1 parent df0b8c0 commit a0e9956

File tree

7 files changed

+120
-15
lines changed

7 files changed

+120
-15
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
defmodule TelemetryApi.ContractManagers.RegistryCoordinatorManager do
2+
alias TelemetryApi.ContractManagers.RegistryCoordinatorManager
3+
4+
require Logger
5+
6+
@aligned_config_file System.get_env("ALIGNED_CONFIG_FILE")
7+
8+
config_file_path =
9+
case @aligned_config_file do
10+
nil -> raise("ALIGNED_CONFIG_FILE not set in .env")
11+
file -> file
12+
end
13+
14+
{status, config_json_string} = File.read(config_file_path)
15+
16+
case status do
17+
:ok ->
18+
Logger.debug("Aligned deployment file read successfully")
19+
20+
:error ->
21+
raise("Config file not read successfully")
22+
end
23+
24+
@registry_coordinator_address Jason.decode!(config_json_string)
25+
|> Map.get("addresses")
26+
|> Map.get("registryCoordinator")
27+
28+
use Ethers.Contract,
29+
abi_file: "priv/abi/IRegistryCoordinator.json",
30+
default_address: @registry_coordinator_address
31+
32+
def get_registry_coordinator_address() do
33+
@registry_coordinator_address
34+
end
35+
36+
def fetch_operator_status(operator_address) do
37+
RegistryCoordinatorManager.get_operator_status(operator_address)
38+
|> Ethers.call()
39+
end
40+
end

telemetry_api/lib/telemetry_api/operators.ex

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ defmodule TelemetryApi.Operators do
2828
2929
## Examples
3030
31-
iex> get_operator(%Operator{id: some_id})
31+
iex> get_operator(%{id: some_id})
3232
{:ok, %Operator{}}
3333
34-
iex> get_operator(%Operator{address: some_address})
34+
iex> get_operator(%{address: some_address})
3535
{:ok, %Operator{}}
3636
37-
iex> get_operator(%Operator{address: non_existent_address})
37+
iex> get_operator(%{address: non_existent_address})
3838
{:error, :not_found, "Operator not found for address: non_existent_address"}
3939
"""
4040
def get_operator(%{address: address}) do
@@ -188,4 +188,20 @@ defmodule TelemetryApi.Operators do
188188
def change_operator(%Operator{} = operator, attrs \\ %{}) do
189189
Operator.changeset(operator, attrs)
190190
end
191+
192+
@doc """
193+
Checks if an operator is registered.
194+
195+
## Examples
196+
197+
iex> is_registered?(%Operator{status: "REGISTERED"})
198+
true
199+
200+
iex> is_registered?(%Operator{status: "DEREGISTERED"})
201+
false
202+
203+
"""
204+
def is_registered?(operator) do
205+
operator.status == "REGISTERED"
206+
end
191207
end

telemetry_api/lib/telemetry_api/operators/operator.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ defmodule TelemetryApi.Operators.Operator do
88
field :stake, :string
99
field :name, :string
1010
field :version, :string
11+
field :status, :string
1112

1213
timestamps(type: :utc_datetime)
1314
end
1415

1516
@doc false
1617
def changeset(operator, attrs) do
1718
operator
18-
|> cast(attrs, [:address, :id, :stake, :name, :version])
19+
|> cast(attrs, [:address, :id, :stake, :name, :version, :status])
1920
|> validate_required([:address, :id, :name, :stake])
2021
end
2122
end

telemetry_api/lib/telemetry_api/periodic/operator_fetcher.ex

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
defmodule TelemetryApi.Periodic.OperatorFetcher do
22
use GenServer
33
alias TelemetryApi.Operators
4+
alias TelemetryApi.ContractManagers.RegistryCoordinatorManager
5+
require Logger
6+
7+
@never_registered 0
8+
@registered 1
9+
@deregistered 2
410

511
wait_time_str = System.get_env("OPERATOR_FETCHER_WAIT_TIME_MS") ||
612
raise """
@@ -24,14 +30,37 @@ defmodule TelemetryApi.Periodic.OperatorFetcher do
2430
end
2531

2632
def send_work() do
27-
:timer.send_interval(@wait_time_ms, :fetch_operators)
33+
:timer.send_interval(@wait_time_ms, :poll_service)
34+
end
35+
36+
def handle_info(:poll_service, state) do
37+
fetch_operators_info()
38+
fetch_operators_status()
39+
{:noreply, state}
40+
end
41+
42+
defp fetch_operators_info() do
43+
case Operators.fetch_all_operators() do
44+
{:ok, _} -> :ok
45+
{:error, message} -> IO.inspect("Couldn't fetch operators: #{IO.inspect(message)}")
46+
end
2847
end
2948

30-
def handle_info(:fetch_operators, _state) do
31-
case Operators.fetch_all_operators() do
32-
{:ok, _} -> :ok
33-
{:error, message} -> IO.inspect "Couldn't fetch operators: #{IO.inspect message}"
34-
end
35-
{:noreply, %{}}
49+
defp fetch_operators_status() do
50+
Operators.list_operators()
51+
|> Enum.map(fn op ->
52+
case RegistryCoordinatorManager.fetch_operator_status(op.address) do
53+
{:ok, status} ->
54+
Operators.update_operator(op, %{status: string_status(status)})
55+
56+
error ->
57+
Logger.error("Error when updating status: #{error}")
58+
end
59+
end)
60+
:ok
3661
end
62+
63+
defp string_status(@never_registered), do: "NEVER_REGISTERED"
64+
defp string_status(@registered), do: "REGISTERED"
65+
defp string_status(@deregistered), do: "DEREGISTERED"
3766
end

telemetry_api/lib/telemetry_api/traces.ex

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ defmodule TelemetryApi.Traces do
6363
"""
6464
def register_operator_response(merkle_root, operator_id) do
6565
with {:ok, operator} <- Operators.get_operator(%{id: operator_id}),
66+
:ok <- validate_operator_registration(operator),
6667
{:ok, trace} <- set_current_trace(merkle_root) do
6768
operator_stake = Decimal.new(operator.stake)
6869
new_stake = Decimal.add(trace.current_stake, operator_stake)
@@ -111,7 +112,7 @@ defmodule TelemetryApi.Traces do
111112
def quorum_reached(merkle_root) do
112113
with {:ok, _trace} <- set_current_trace(merkle_root) do
113114
Tracer.add_event("Quorum Reached", [])
114-
IO.inspect("Reached quorum registered. merkle_root: #{IO.inspect(merkle_root)}")
115+
IO.inspect("Reached quorum registered. merkle_root: #{merkle_root}")
115116
:ok
116117
end
117118
end
@@ -155,7 +156,7 @@ defmodule TelemetryApi.Traces do
155156
def finish_task_trace(merkle_root) do
156157
with {:ok, trace} <- set_current_trace(merkle_root) do
157158
missing_operators =
158-
Operators.list_operators() |> Enum.filter(fn o -> o.id not in trace.responses end)
159+
Operators.list_operators() |> Enum.filter(fn o -> o.id not in trace.responses and Operators.is_registered?(o) end)
159160

160161
add_missing_operators(missing_operators)
161162

@@ -165,7 +166,7 @@ defmodule TelemetryApi.Traces do
165166

166167
# Clean up the context from the Agent
167168
TraceStore.delete_trace(merkle_root)
168-
IO.inspect("Finished task trace with merkle_root: #{IO.inspect(merkle_root)}.")
169+
IO.inspect("Finished task trace with merkle_root: #{merkle_root}.")
169170
:ok
170171
end
171172
end
@@ -186,4 +187,12 @@ defmodule TelemetryApi.Traces do
186187
{:ok, trace}
187188
end
188189
end
190+
191+
defp validate_operator_registration(operator) do
192+
if Operators.is_registered?(operator) do
193+
:ok
194+
else
195+
{:error, :bad_request, "Operator not registered"}
196+
end
197+
end
189198
end

telemetry_api/lib/telemetry_api_web/controllers/operator_json.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ defmodule TelemetryApiWeb.OperatorJSON do
2121
id: operator.id,
2222
stake: operator.stake,
2323
name: operator.name,
24-
version: operator.version
24+
version: operator.version,
25+
status: operator.status
2526
}
2627
end
2728
end
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
defmodule TelemetryApi.Repo.Migrations.AddOperatorStatus do
2+
use Ecto.Migration
3+
4+
def change do
5+
alter table(:operators) do
6+
add :status, :string
7+
end
8+
end
9+
end

0 commit comments

Comments
 (0)