Merged
2 changes: 1 addition & 1 deletion .tool-versions
@@ -1,5 +1,5 @@
erlang 26.2
elixir 1.16.2-otp-26
golang 1.21.3
golang 1.22
rust 1.81.0
protoc 24.3
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# libp2p port
FROM golang:1.21.3 AS libp2p_builder
FROM golang:1.22 AS libp2p_builder
LABEL stage=builder

# Install dependencies
@@ -120,4 +120,4 @@
# TODO: This could be an issue regarding OS signals; we should use JSONArgs, but shell form is the
# only way to pass args to ENTRYPOINT, especially important because of the cookie. Best
# solution would be to move to releases and avoid starting the node manually through iex.
ENTRYPOINT iex $IEX_ARGS_VALUE -S mix run -- $0 $@

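The TODO above already names the preferred fix: move to Mix releases so the image can use an exec-form ENTRYPOINT instead of starting the node through iex in shell form. A minimal sketch of the release side follows; the release name, version, entrypoint path, and cookie handling are illustrative assumptions, not the project's actual configuration:

# mix.exs (sketch): defining a release would let the Dockerfile switch to the
# exec form, e.g. ENTRYPOINT ["/app/bin/lambda_ethereum_consensus", "start"],
# so OS signals reach the BEAM directly and no iex flags are needed.
def project do
  [
    app: :lambda_ethereum_consensus,
    version: "0.1.0",
    releases: [
      lambda_ethereum_consensus: [
        include_executables_for: [:unix],
        # The distribution cookie can be supplied at runtime via the standard
        # RELEASE_COOKIE environment variable instead of an iex argument.
        cookie: System.get_env("RELEASE_COOKIE", "dev-only-cookie")
      ]
    ]
  ]
end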
87 changes: 37 additions & 50 deletions lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
@@ -10,7 +10,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
alias LambdaEthereumConsensus.Metrics
alias LambdaEthereumConsensus.P2P.BlobDownloader
alias LambdaEthereumConsensus.P2P.BlockDownloader
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.Blobs
alias LambdaEthereumConsensus.Store.Blocks
alias LambdaEthereumConsensus.Utils
alias Types.BlockInfo
@@ -46,15 +46,20 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do

# If the block is new or was to be downloaded, we store it.
if is_nil(loaded_block) or loaded_block.status == :download do
missing_blobs = missing_blobs(block_info)
missing_blobs = Blobs.missing_for_block(block_info)

if Enum.empty?(missing_blobs) do
Logger.debug("[PendingBlocks] No missing blobs for block, process it", log_md)
Blocks.new_block_info(block_info)
process_block_and_check_children(store, block_info)
else
Logger.debug("[PendingBlocks] Missing blobs for block, scheduling download", log_md)
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/2, @download_retries)

BlobDownloader.request_blobs_by_root(
missing_blobs,
&process_blobs/2,
@download_retries
)

block_info
|> BlockInfo.change_status(:download_blobs)
@@ -72,6 +77,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
module after receiving a new block, but there are some other cases like at node startup, as there
may be pending blocks from prior executions.
"""
@spec process_blocks(Store.t()) :: Store.t()
def process_blocks(store) do
case Blocks.get_blocks_with_status(:pending) do
{:ok, blocks} ->
@@ -92,6 +98,34 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
end
end

@doc """
Processes incoming blobs; if this completes the blobs for a block, the block is processed immediately.
"""
@spec process_blobs(Store.t(), {:ok, [Types.BlobSidecar.t()]}) :: {:ok, Store.t()}
def process_blobs(store, {:ok, blobs}) do
blobs
|> Blobs.add_blobs()
|> Enum.reduce(store, fn root, store ->
with %BlockInfo{status: :download_blobs} = block_info <- Blocks.get_block_info(root),
[] <- Blobs.missing_for_block(block_info) do
block_info
|> Blocks.change_status(:pending)
|> then(&process_block_and_check_children(store, &1))

{:ok, store}
else
_ -> {:ok, store}
end
end)
end

@spec process_blobs(Store.t(), {:error, any()}) :: {:ok, Store.t()}
def process_blobs(store, {:error, reason}) do
# We might want to declare a block invalid here.
Logger.error("[PendingBlocks] Error downloading blobs: #{inspect(reason)}")
{:ok, store}
end

##########################
### Private Functions
##########################
@@ -184,51 +218,4 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
Logger.error("[PendingBlocks] Error downloading block: #{inspect(reason)}")
{:ok, store}
end

def process_blobs(store, {:ok, blobs}), do: {:ok, add_blobs(store, blobs)}

def process_blobs(store, {:error, reason}) do
# We might want to declare a block invalid here.
Logger.error("[PendingBlocks] Error downloading blobs: #{inspect(reason)}")
{:ok, store}
end

def add_blob(store, blob), do: add_blobs(store, [blob])

# To be used when a series of blobs are downloaded. Stores each blob.
# If there are blocks that can be processed, does so immediately.
defp add_blobs(store, blobs) do
blobs
|> Enum.map(&BlobDb.store_blob/1)
|> Enum.uniq()
|> Enum.reduce(store, fn root, store ->
with %BlockInfo{status: :download_blobs} = block_info <- Blocks.get_block_info(root),
[] <- missing_blobs(block_info) do
block_info
|> Blocks.change_status(:pending)
|> then(&process_block_and_check_children(store, &1))
else
_ ->
store
end
end)
end

@spec missing_blobs(BlockInfo.t()) :: [Types.BlobIdentifier.t()]
def missing_blobs(%BlockInfo{root: root, signed_block: signed_block}) do
signed_block.message.body.blob_kzg_commitments
|> Stream.with_index()
|> Enum.filter(&blob_needs_download?(&1, root))
|> Enum.map(&%Types.BlobIdentifier{block_root: root, index: elem(&1, 1)})
end

defp blob_needs_download?({commitment, index}, block_root) do
case BlobDb.get_blob_sidecar(block_root, index) do
{:ok, %{kzg_commitment: ^commitment}} ->
false

_ ->
true
end
end
end
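For reference, the new process_blobs/2 contract can be exercised directly. A minimal sketch, where store is an existing Types.Store, sidecars is a list of verified Types.BlobSidecar structs, and :download_timeout is an illustrative error reason (all three are assumptions for the example):

# Success path: the sidecars are persisted through Blobs.add_blobs/1 and any
# block that was only waiting on them is processed right away.
{:ok, store} = PendingBlocks.process_blobs(store, {:ok, sidecars})

# Failure path: the reason is logged and the store is returned unchanged.
{:ok, store} = PendingBlocks.process_blobs(store, {:error, :download_timeout})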
3 changes: 2 additions & 1 deletion lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex
@@ -18,7 +18,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do
Ssz.from_ssz(uncompressed, Types.BlobSidecar) do
Logger.debug("[Gossip] Blob sidecar received, with index #{blob_index}")
Libp2pPort.validate_message(msg_id, :accept)
PendingBlocks.add_blob(store, blob)
# TODO: (#1406) Enhance the API to reduce unnecessary wrappers (:ok + list)
PendingBlocks.process_blobs(store, {:ok, [blob]})
else
{:error, reason} ->
Logger.warning("[Gossip] Blob rejected, reason: #{inspect(reason)}")
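One possible shape for the API cleanup referenced in TODO #1406 is a single-blob convenience clause that hides the {:ok, list} wrapper from gossip handlers; process_blob/2 below is purely hypothetical and does not exist in the codebase:

# Hypothetical convenience wrapper (illustration only): would let the gossip
# handler hand over one verified sidecar without building {:ok, [blob]} itself.
def process_blob(store, %Types.BlobSidecar{} = blob),
  do: process_blobs(store, {:ok, [blob]})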
38 changes: 38 additions & 0 deletions lib/lambda_ethereum_consensus/store/blobs.ex
@@ -0,0 +1,38 @@
defmodule LambdaEthereumConsensus.Store.Blobs do
@moduledoc """
Interface to `Store.Blobs`.
"""
require Logger

alias LambdaEthereumConsensus.Store.BlobDb
alias Types.BlobSidecar
alias Types.BlockInfo

@doc """
To be used when a series of blobs are downloaded. Stores each blob.
"""
@spec add_blobs([BlobSidecar.t()]) :: [Types.root()]
def add_blobs(blobs) do
blobs
|> Enum.map(&BlobDb.store_blob/1)
|> Enum.uniq()
end

@spec missing_for_block(BlockInfo.t()) :: [Types.BlobIdentifier.t()]
def missing_for_block(%BlockInfo{root: root, signed_block: signed_block}) do
signed_block.message.body.blob_kzg_commitments
|> Stream.with_index()
|> Enum.filter(&present?(&1, root))
|> Enum.map(&%Types.BlobIdentifier{block_root: root, index: elem(&1, 1)})
end

defp present?({commitment, index}, block_root) do
case BlobDb.get_blob_sidecar(block_root, index) do
{:ok, %{kzg_commitment: ^commitment}} ->
false

_ ->
true
end
end
end
Binary file added test/fixtures/blobs/blob_sidecar.ssz_snappy
74 changes: 74 additions & 0 deletions test/unit/blobs_test.exs
@@ -0,0 +1,74 @@
defmodule Unit.BlobsTest do
use ExUnit.Case
alias Fixtures.Block
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.Blobs
alias SpecTestUtils
alias Types.BlobSidecar
alias Types.BlockInfo

setup %{tmp_dir: tmp_dir} do
start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: tmp_dir})
start_link_supervised!(LambdaEthereumConsensus.Store.Blocks)

Application.fetch_env!(:lambda_ethereum_consensus, ChainSpec)
|> Keyword.put(:config, MainnetConfig)
|> then(&Application.put_env(:lambda_ethereum_consensus, ChainSpec, &1))
Comment on lines +14 to +16 (Collaborator): Good catch!


# Blob sidecar from spec test
blob_sidecar =
SpecTestUtils.read_ssz_from_file!(
"test/fixtures/blobs/blob_sidecar.ssz_snappy",
BlobSidecar
)

{:ok, blob_sidecar: blob_sidecar}
end

defp new_block_info() do
Block.signed_beacon_block() |> BlockInfo.from_block()
end

describe "Blobs unit tests" do
@tag :tmp_dir
test "Basic blobs saving and loading", %{blob_sidecar: blob_sidecar} do
Blobs.add_blobs([blob_sidecar])
block_root = Ssz.hash_tree_root!(blob_sidecar.signed_block_header.message)
index = blob_sidecar.index
{:ok, recovered_blob} = BlobDb.get_blob_sidecar(block_root, index)

assert(blob_sidecar == recovered_blob)
end

@tag :tmp_dir
test "One missing blob from block, then add, then no missing blobs", %{
blob_sidecar: blob_sidecar
} do
blob_sidecar = %BlobSidecar{blob_sidecar | index: 0}

# Create random block info
block_info = new_block_info()
# add blob_sidecar kzg_commitment to the block_info
block_info =
put_in(
block_info.signed_block.message.body.blob_kzg_commitments,
[blob_sidecar.kzg_commitment]
)

# change block root to the one from the blob
block_info = %BlockInfo{
block_info
| root: Ssz.hash_tree_root!(blob_sidecar.signed_block_header.message)
}

# check that the blob is detected as missing
missing = Blobs.missing_for_block(block_info)
assert(length(missing) == 1)
# add blob to db
Blobs.add_blobs([blob_sidecar])
# check that the blob is not missing
missing = Blobs.missing_for_block(block_info)
assert(Enum.empty?(missing))
end
end
end