diff --git a/.tool-versions b/.tool-versions index ec2ffeec7..73dd5a9c6 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,5 +1,5 @@ erlang 26.2 elixir 1.16.2-otp-26 -golang 1.21.3 +golang 1.22 rust 1.81.0 protoc 24.3 diff --git a/Dockerfile b/Dockerfile index 1efd25f54..603376fd5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # libp2p port -FROM golang:1.21.3 AS libp2p_builder +FROM golang:1.22 AS libp2p_builder LABEL stage=builder # Install dependencies diff --git a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex index a39c85a38..bd5ba0e15 100644 --- a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex @@ -10,7 +10,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do alias LambdaEthereumConsensus.Metrics alias LambdaEthereumConsensus.P2P.BlobDownloader alias LambdaEthereumConsensus.P2P.BlockDownloader - alias LambdaEthereumConsensus.Store.BlobDb + alias LambdaEthereumConsensus.Store.Blobs alias LambdaEthereumConsensus.Store.Blocks alias LambdaEthereumConsensus.Utils alias Types.BlockInfo @@ -46,7 +46,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do # If the block is new or was to be downloaded, we store it. if is_nil(loaded_block) or loaded_block.status == :download do - missing_blobs = missing_blobs(block_info) + missing_blobs = Blobs.missing_for_block(block_info) if Enum.empty?(missing_blobs) do Logger.debug("[PendingBlocks] No missing blobs for block, process it", log_md) @@ -54,7 +54,12 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do process_block_and_check_children(store, block_info) else Logger.debug("[PendingBlocks] Missing blobs for block, scheduling download", log_md) - BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/2, @download_retries) + + BlobDownloader.request_blobs_by_root( + missing_blobs, + &process_blobs/2, + @download_retries + ) block_info |> BlockInfo.change_status(:download_blobs) @@ -72,6 +77,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do module after receiving a new block, but there are some other cases like at node startup, as there may be pending blocks from prior executions. """ + @spec process_blocks(Store.t()) :: Store.t() def process_blocks(store) do case Blocks.get_blocks_with_status(:pending) do {:ok, blocks} -> @@ -92,6 +98,34 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do end end + @doc """ + Process incoming blobs if the block can be processed does so immediately. + """ + @spec process_blobs(Store.t(), {:ok, [Types.BlobSidecar.t()]}) :: {:ok, Store.t()} + def process_blobs(store, {:ok, blobs}) do + blobs + |> Blobs.add_blobs() + |> Enum.reduce(store, fn root, store -> + with %BlockInfo{status: :download_blobs} = block_info <- Blocks.get_block_info(root), + [] <- Blobs.missing_for_block(block_info) do + block_info + |> Blocks.change_status(:pending) + |> then(&process_block_and_check_children(store, &1)) + + {:ok, store} + else + _ -> {:ok, store} + end + end) + end + + @spec process_blobs(Store.t(), {:error, any()}) :: {:ok, Store.t()} + def process_blobs(store, {:error, reason}) do + # We might want to declare a block invalid here. + Logger.error("[PendingBlocks] Error downloading blobs: #{inspect(reason)}") + {:ok, store} + end + ########################## ### Private Functions ########################## @@ -184,51 +218,4 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do Logger.error("[PendingBlocks] Error downloading block: #{inspect(reason)}") {:ok, store} end - - def process_blobs(store, {:ok, blobs}), do: {:ok, add_blobs(store, blobs)} - - def process_blobs(store, {:error, reason}) do - # We might want to declare a block invalid here. - Logger.error("[PendingBlocks] Error downloading blobs: #{inspect(reason)}") - {:ok, store} - end - - def add_blob(store, blob), do: add_blobs(store, [blob]) - - # To be used when a series of blobs are downloaded. Stores each blob. - # If there are blocks that can be processed, does so immediately. - defp add_blobs(store, blobs) do - blobs - |> Enum.map(&BlobDb.store_blob/1) - |> Enum.uniq() - |> Enum.reduce(store, fn root, store -> - with %BlockInfo{status: :download_blobs} = block_info <- Blocks.get_block_info(root), - [] <- missing_blobs(block_info) do - block_info - |> Blocks.change_status(:pending) - |> then(&process_block_and_check_children(store, &1)) - else - _ -> - store - end - end) - end - - @spec missing_blobs(BlockInfo.t()) :: [Types.BlobIdentifier.t()] - def missing_blobs(%BlockInfo{root: root, signed_block: signed_block}) do - signed_block.message.body.blob_kzg_commitments - |> Stream.with_index() - |> Enum.filter(&blob_needs_download?(&1, root)) - |> Enum.map(&%Types.BlobIdentifier{block_root: root, index: elem(&1, 1)}) - end - - defp blob_needs_download?({commitment, index}, block_root) do - case BlobDb.get_blob_sidecar(block_root, index) do - {:ok, %{kzg_commitment: ^commitment}} -> - false - - _ -> - true - end - end end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex b/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex index eed5da8c8..bfb3100c9 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex @@ -18,7 +18,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do Ssz.from_ssz(uncompressed, Types.BlobSidecar) do Logger.debug("[Gossip] Blob sidecar received, with index #{blob_index}") Libp2pPort.validate_message(msg_id, :accept) - PendingBlocks.add_blob(store, blob) + # TODO: (#1406) Enhance the API to reduce unnecessary wrappers (:ok + list) + PendingBlocks.process_blobs(store, {:ok, [blob]}) else {:error, reason} -> Logger.warning("[Gossip] Blob rejected, reason: #{inspect(reason)}") diff --git a/lib/lambda_ethereum_consensus/store/blobs.ex b/lib/lambda_ethereum_consensus/store/blobs.ex new file mode 100644 index 000000000..be5043b6c --- /dev/null +++ b/lib/lambda_ethereum_consensus/store/blobs.ex @@ -0,0 +1,38 @@ +defmodule LambdaEthereumConsensus.Store.Blobs do + @moduledoc """ + Interface to `Store.Blobs`. + """ + require Logger + + alias LambdaEthereumConsensus.Store.BlobDb + alias Types.BlobSidecar + alias Types.BlockInfo + + @doc """ + To be used when a series of blobs are downloaded. Stores each blob. + """ + @spec add_blobs([BlobSidecar.t()]) :: [Types.root()] + def add_blobs(blobs) do + blobs + |> Enum.map(&BlobDb.store_blob/1) + |> Enum.uniq() + end + + @spec missing_for_block(BlockInfo.t()) :: [Types.BlobIdentifier.t()] + def missing_for_block(%BlockInfo{root: root, signed_block: signed_block}) do + signed_block.message.body.blob_kzg_commitments + |> Stream.with_index() + |> Enum.filter(&present?(&1, root)) + |> Enum.map(&%Types.BlobIdentifier{block_root: root, index: elem(&1, 1)}) + end + + defp present?({commitment, index}, block_root) do + case BlobDb.get_blob_sidecar(block_root, index) do + {:ok, %{kzg_commitment: ^commitment}} -> + false + + _ -> + true + end + end +end diff --git a/test/fixtures/blobs/blob_sidecar.ssz_snappy b/test/fixtures/blobs/blob_sidecar.ssz_snappy new file mode 100644 index 000000000..0fdc9cb03 Binary files /dev/null and b/test/fixtures/blobs/blob_sidecar.ssz_snappy differ diff --git a/test/unit/blobs_test.exs b/test/unit/blobs_test.exs new file mode 100644 index 000000000..288d34673 --- /dev/null +++ b/test/unit/blobs_test.exs @@ -0,0 +1,74 @@ +defmodule Unit.BlobsTest do + use ExUnit.Case + alias Fixtures.Block + alias LambdaEthereumConsensus.Store.BlobDb + alias LambdaEthereumConsensus.Store.Blobs + alias SpecTestUtils + alias Types.BlobSidecar + alias Types.BlockInfo + + setup %{tmp_dir: tmp_dir} do + start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: tmp_dir}) + start_link_supervised!(LambdaEthereumConsensus.Store.Blocks) + + Application.fetch_env!(:lambda_ethereum_consensus, ChainSpec) + |> Keyword.put(:config, MainnetConfig) + |> then(&Application.put_env(:lambda_ethereum_consensus, ChainSpec, &1)) + + # Blob sidecar from spec test + blob_sidecar = + SpecTestUtils.read_ssz_from_file!( + "test/fixtures/blobs/blob_sidecar.ssz_snappy", + BlobSidecar + ) + + {:ok, blob_sidecar: blob_sidecar} + end + + defp new_block_info() do + Block.signed_beacon_block() |> BlockInfo.from_block() + end + + describe "Blobs unit tests" do + @tag :tmp_dir + test "Basic blobs saving and loading", %{blob_sidecar: blob_sidecar} do + Blobs.add_blobs([blob_sidecar]) + block_root = Ssz.hash_tree_root!(blob_sidecar.signed_block_header.message) + index = blob_sidecar.index + {:ok, recovered_blob} = BlobDb.get_blob_sidecar(block_root, index) + + assert(blob_sidecar == recovered_blob) + end + + @tag :tmp_dir + test "One missing blob from block, then add, then no missing blobs", %{ + blob_sidecar: blob_sidecar + } do + blob_sidecar = %BlobSidecar{blob_sidecar | index: 0} + + # Create random block info + block_info = new_block_info() + # add blob_sidecar kzg_commitment to the block_info + block_info = + put_in( + block_info.signed_block.message.body.blob_kzg_commitments, + [blob_sidecar.kzg_commitment] + ) + + # change block root to the one from the blob + block_info = %BlockInfo{ + block_info + | root: Ssz.hash_tree_root!(blob_sidecar.signed_block_header.message) + } + + # check that the blob is detetected as missing + missing = Blobs.missing_for_block(block_info) + assert(length(missing) == 1) + # add blob to db + Blobs.add_blobs([blob_sidecar]) + # check that the blob is not missing + missing = Blobs.missing_for_block(block_info) + assert(Enum.empty?(missing)) + end + end +end