diff --git a/config/config.exs b/config/config.exs index d109b7f05..9478220ac 100644 --- a/config/config.exs +++ b/config/config.exs @@ -52,3 +52,13 @@ if System.get_env("RUSTLER_SKIP_COMPILE") do config :lambda_ethereum_consensus, Snappy, skip_compilation?: true config :lambda_ethereum_consensus, Ssz, skip_compilation?: true end + +config :sse, + keep_alive: {:system, "SSE_KEEP_ALIVE_IN_MS", 55000} + +config :event_bus, + topics: [:finalized_checkpoint, :block] + +config :mime, :types, %{ + "text/event-stream" => ["sse"] +} diff --git a/config/runtime.exs b/config/runtime.exs index 66852bc25..58e242c5e 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -150,7 +150,9 @@ config :lambda_ethereum_consensus, EngineApi, # Beacon API config :lambda_ethereum_consensus, BeaconApi.Endpoint, server: enable_beacon_api, - http: [port: beacon_api_port || 4000], + # We use an infinit idle timeout to avoid closing sse connections, if needed we can + # create a separate endpoint for them. + http: [port: beacon_api_port || 4000, protocol_options: [idle_timeout: :infinity]], url: [host: "localhost"], render_errors: [ formats: [json: BeaconApi.ErrorJSON], diff --git a/lib/beacon_api/controllers/v1/events_controller.ex b/lib/beacon_api/controllers/v1/events_controller.ex new file mode 100644 index 000000000..c52ea5154 --- /dev/null +++ b/lib/beacon_api/controllers/v1/events_controller.ex @@ -0,0 +1,55 @@ +defmodule BeaconApi.V1.EventsController do + use BeaconApi, :controller + + alias BeaconApi.ApiSpec + alias BeaconApi.EventPubSub + + require Logger + + def open_api_operation(:subscribe), + do: ApiSpec.spec().paths["/eth/v1/events"].get + + @spec subscribe(Plug.Conn.t(), any) :: Plug.Conn.t() + def subscribe(conn, %{"topics" => topics}) do + case parse_topics(topics) do + {:ok, topics} -> + EventPubSub.sse_subscribe(conn, topics) + + {:error, error} -> + send_chunked_error(conn, error) + end + end + + def subscribe(conn, _params) do + error = + Jason.encode!(%{ + code: 400, + message: "Missing field topics" + }) + + send_chunked_error(conn, error) + end + + defp parse_topics(topics_string) do + # topics is a string list in the form of: "finalized_checkpoint, block" we need to split it + topics = topics_string |> String.split(",") |> Enum.map(&String.trim/1) + not_implemented_topics = Enum.reject(topics, &EventPubSub.implemented_topic?/1) + + if Enum.empty?(not_implemented_topics) do + {:ok, topics} + else + {:error, + "Invalid topic/s #{inspect(not_implemented_topics)}. For now, only #{inspect(EventPubSub.implemented_topics())} are supported."} + end + end + + defp send_chunked_error(conn, error) do + conn + |> Plug.Conn.send_chunked(400) + |> Plug.Conn.chunk(error) + |> case do + {:ok, conn} -> Plug.Conn.halt(conn) + {:error, _reason} -> Plug.Conn.halt(conn) + end + end +end diff --git a/lib/beacon_api/event_pubsub.ex b/lib/beacon_api/event_pubsub.ex new file mode 100644 index 000000000..8f26c1875 --- /dev/null +++ b/lib/beacon_api/event_pubsub.ex @@ -0,0 +1,84 @@ +defmodule BeaconApi.EventPubSub do + @moduledoc """ + Event listener for aggregating and sending events for SSE subscribers. + + TODO: (#1368) This depends on `event_bus` and `sse`, but it could be easily switched later: + - `event_bus` we could move to phoenix pubsub + - `sse` we could just implement it ourselves using Plug.Conn.chunk and Plug.Conn.send_chunked + + The idea is to have a single place to publish events, and then a method for a connection to subscribe to them. + """ + + require Logger + alias EventBus.Model.Event + alias LambdaEthereumConsensus.Store + alias SSE.Chunk + alias Types.StateInfo + + @type topic() :: String.t() | atom() + @type topics() :: list(topic()) + @type event_data() :: any() + + # This is also dependant on the already needed event_bus compile time config, + # we maintain them as strings for convienience + @implemented_topics Application.compile_env!(:event_bus, :topics) |> Enum.map(&Atom.to_string/1) + + @spec implemented_topics() :: topics() + def implemented_topics(), do: @implemented_topics + + @spec implemented_topic?(topic()) :: boolean() + def implemented_topic?(topic) when is_atom(topic), do: implemented_topic?(Atom.to_string(topic)) + def implemented_topic?(topic) when is_binary(topic), do: topic in @implemented_topics + + @doc """ + Publish an event to the event bus. + + TODO: We might want a noop if there are no subscribers for a topic. + """ + @spec publish(topic(), event_data()) :: :ok | {:error, atom()} + def publish(:finalized_checkpoint = topic, %{root: block_root, epoch: epoch}) do + case Store.BlockStates.get_state_info(block_root) do + %StateInfo{root: state_root} -> + data = %{ + block: BeaconApi.Utils.hex_encode(block_root), + state: BeaconApi.Utils.hex_encode(state_root), + epoch: Integer.to_string(epoch), + # TODO: this is a placeholder, we need to get if the execution is optimistic or not + execution_optimistic: false + } + + chunk = %Chunk{event: topic, data: [Jason.encode!(data)]} + event = %Event{id: UUID.uuid4(), data: chunk, topic: topic} + + EventBus.notify(event) + + nil -> + Logger.error("State not available for block", root: block_root) + + {:error, :state_not_available} + end + end + + def publish(:block = topic, %{root: block_root, slot: slot}) do + data = %{ + block: BeaconApi.Utils.hex_encode(block_root), + slot: Integer.to_string(slot), + # TODO: this is a placeholder, we need to get if the execution is optimistic or not + execution_optimistic: false + } + + chunk = %Chunk{event: topic, data: [Jason.encode!(data)]} + event = %Event{id: UUID.uuid4(), data: chunk, topic: topic} + + EventBus.notify(event) + end + + def publish(_topic, _event_data), do: {:error, :unsupported_topic} + + @doc """ + Subscribe to a topic for stream events in an sse connection. + """ + @spec sse_subscribe(Plug.Conn.t(), topics()) :: Plug.Conn.t() + def sse_subscribe(conn, topics) when is_list(topics), + do: SSE.stream(conn, {topics, %Chunk{data: []}}) +end diff --git a/lib/beacon_api/router.ex b/lib/beacon_api/router.ex index df36dda97..526c0d7da 100644 --- a/lib/beacon_api/router.ex +++ b/lib/beacon_api/router.ex @@ -2,7 +2,7 @@ defmodule BeaconApi.Router do use BeaconApi, :router pipeline :api do - plug(:accepts, ["json"]) + plug(:accepts, ["json", "sse"]) plug(OpenApiSpex.Plug.PutApiSpec, module: BeaconApi.ApiSpec) end @@ -22,6 +22,10 @@ defmodule BeaconApi.Router do get("/identity", NodeController, :identity) get("/version", NodeController, :version) end + + scope "/events" do + get("/", EventsController, :subscribe) + end end # Ethereum API Version 2 diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 02384db26..d473b37b6 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do """ require Logger + alias BeaconApi.EventPubSub alias LambdaEthereumConsensus.Execution.ExecutionChain alias LambdaEthereumConsensus.ForkChoice.Handlers alias LambdaEthereumConsensus.ForkChoice.Head @@ -65,6 +66,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do |> tap(fn store -> StoreDb.persist_store(store) Logger.info("[Fork choice] Added new block", slot: slot, root: block_root) + EventPubSub.publish(:block, %{root: block_root, slot: slot}) Logger.info("[Fork choice] Recomputed head", slot: store.head_slot, diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index 6384ed48a..df12d606f 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do """ require Logger + alias BeaconApi.EventPubSub alias LambdaEthereumConsensus.Execution.ExecutionClient alias LambdaEthereumConsensus.ForkChoice alias LambdaEthereumConsensus.StateTransition @@ -281,7 +282,11 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do |> if_then_update( finalized_checkpoint.epoch > store.finalized_checkpoint.epoch, # Update finalized checkpoint - &%Store{&1 | finalized_checkpoint: finalized_checkpoint} + fn store -> + EventPubSub.publish(:finalized_checkpoint, finalized_checkpoint) + + %Store{store | finalized_checkpoint: finalized_checkpoint} + end ) end diff --git a/mix.exs b/mix.exs index d993bbed9..30efc3fbf 100644 --- a/mix.exs +++ b/mix.exs @@ -64,7 +64,10 @@ defmodule LambdaEthereumConsensus.MixProject do {:sentry, "~> 10.8.0"}, {:prom_ex, "~> 1.11.0"}, {:flama, git: "https://github.com/lambdaclass/ht1223_tracer"}, - {:uuid, "~> 1.1"} + {:uuid, "~> 1.1"}, + # TODO: (#1368) We might want to use phoenix_pubsub instead and do our implementation of SSE. + {:sse, "~> 0.4"}, + {:event_bus, ">= 1.6.0"} ] end diff --git a/mix.lock b/mix.lock index de3f807d8..403fa727f 100644 --- a/mix.lock +++ b/mix.lock @@ -19,6 +19,7 @@ "elixir_make": {:hex, :elixir_make, "0.8.4", "4960a03ce79081dee8fe119d80ad372c4e7badb84c493cc75983f9d3bc8bde0f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "6e7f1d619b5f61dfabd0a20aa268e575572b542ac31723293a4c1a567d5ef040"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "escape": {:hex, :escape, "0.1.0", "548edab75e6e6938b1e199ef59cb8e504bcfd3bcf83471d4ae9a3c7a7a3c7d45", [:mix], [], "hexpm", "a5d8e92db4677155df54bc1306d401b5233875d570d474201db03cb3047491cd"}, + "event_bus": {:hex, :event_bus, "1.7.0", "29a36fc09e8c4463c82206b6a300fa1d61cf4baf9a7b4e7cf0c3efb99c73998e", [:mix], [], "hexpm", "e556470f49f53060a0696c4bad81341252685011afc69eda25032c8a3a86eb2e"}, "ex2ms": {:hex, :ex2ms, "1.7.0", "45b9f523d0b777667ded60070d82d871a37e294f0b6c5b8eca86771f00f82ee1", [:mix], [], "hexpm", "2589eee51f81f1b1caa6d08c990b1ad409215fe6f64c73f73c67d36ed10be827"}, "exleveldb": {:hex, :exleveldb, "0.14.0", "8e9353bbce38482d6971d254c6b98ceb50f3f179c94732b5d17db1be426fca18", [:mix], [{:eleveldb, "~> 2.2.20", [hex: :eleveldb, repo: "hexpm", optional: false]}], "hexpm", "803cd3b4c826a1e17e7e28f6afe224837a743b475e1a48336f186af3dd8636ad"}, "expo": {:hex, :expo, "0.5.2", "beba786aab8e3c5431813d7a44b828e7b922bfa431d6bfbada0904535342efe2", [:mix], [], "hexpm", "8c9bfa06ca017c9cb4020fabe980bc7fdb1aaec059fd004c2ab3bff03b1c599c"}, @@ -68,6 +69,7 @@ "sentry": {:hex, :sentry, "10.8.1", "aa45309785e1521416225adb16e0b4d8b957578804527f3c7babb6fefbc5e456", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_ownership, "~> 0.3.0 or ~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.20 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "495b3cdadad90ba72eef973aa3dec39b3b8b2a362fe87e2f4ef32133ac3b4097"}, "snappyer": {:hex, :snappyer, "1.2.10", "023e9ae00e969b0997208b5de7d3b12bb46ec6bc5411e8dc53e7b3f435b8f0fd", [:rebar3], [], "hexpm", "f55bd9ed147e7163cb3acd1e431a7ff2c9e31ceacbb8308786094fb64551c284"}, "sourceror": {:hex, :sourceror, "1.5.0", "3e65d5fbb1a8e2864ad6411262c8018fee73474f5789dda12285c82999253d5d", [:mix], [], "hexpm", "4a32b5d189d8453f73278c15712f8731b89e9211e50726b798214b303b51bfc7"}, + "sse": {:hex, :sse, "0.4.0", "f17affacbc4618bac07590eec7bff849aa27d1f71bb3d41da3fd3cb255d16910", [:mix], [{:event_bus, ">= 1.6.0", [hex: :event_bus, repo: "hexpm", optional: false]}, {:plug, ">= 1.4.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "2dfb9923725b9d5292763c3de9b7798713f5771522823e961a250204917d7efb"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, "stream_data": {:hex, :stream_data, "1.1.2", "05499eaec0443349ff877aaabc6e194e82bda6799b9ce6aaa1aadac15a9fdb4d", [:mix], [], "hexpm", "129558d2c77cbc1eb2f4747acbbea79e181a5da51108457000020a906813a1a9"}, diff --git a/network_params.yaml b/network_params.yaml index 76f0325e1..13618b8b9 100644 --- a/network_params.yaml +++ b/network_params.yaml @@ -1,9 +1,12 @@ participants: - el_type: geth + el_image: ethereum/client-go:v1.14.12 cl_type: lighthouse + cl_image: sigp/lighthouse:v5.3.0 count: 2 validator_count: 32 - el_type: geth + el_image: ethereum/client-go:v1.14.12 cl_type: lambda cl_image: lambda_ethereum_consensus:latest use_separate_vc: false diff --git a/test/unit/beacon_api/beacon_api_v1_test.exs b/test/unit/beacon_api/beacon_api_v1_test.exs index 3a9fbfe50..75e4323e5 100644 --- a/test/unit/beacon_api/beacon_api_v1_test.exs +++ b/test/unit/beacon_api/beacon_api_v1_test.exs @@ -158,6 +158,8 @@ defmodule Unit.BeaconApiTest.V1 do test "node identity" do alias LambdaEthereumConsensus.Libp2pPort alias LambdaEthereumConsensus.P2P.Metadata + + patch(BeaconApi.EventPubSub, :publish, fn _, _ -> :ok end) patch(ForkChoice, :get_fork_version, fn -> ChainSpec.get("DENEB_FORK_VERSION") end) start_link_supervised!({Libp2pPort, genesis_time: :os.system_time(:second), store: %Store{}}) diff --git a/test/unit/fork_choice/handlers_test.exs b/test/unit/fork_choice/handlers_test.exs index 19de64872..c1dfdc68b 100644 --- a/test/unit/fork_choice/handlers_test.exs +++ b/test/unit/fork_choice/handlers_test.exs @@ -1,6 +1,8 @@ defmodule Unit.ForkChoice.HandlersTest do use ExUnit.Case + use Patch + alias LambdaEthereumConsensus.ForkChoice.Handlers alias LambdaEthereumConsensus.Utils.Diff alias Types.Store @@ -44,6 +46,8 @@ defmodule Unit.ForkChoice.HandlersTest do end test "upgrades unrealized checkpoints" do + patch(BeaconApi.EventPubSub, :publish, fn _, _ -> :ok end) + start_time = 0 end_time = start_time + ChainSpec.get("SECONDS_PER_SLOT") * ChainSpec.get("SLOTS_PER_EPOCH")