Skip to content

Commit 35d4c18

Browse files
LeanSerrarodrigo-o
andauthored
refactor: move blob operations to blobs module (#1404)
Co-authored-by: Rodrigo Oliveri <[email protected]>
1 parent de1dc93 commit 35d4c18

File tree

7 files changed

+153
-53
lines changed

7 files changed

+153
-53
lines changed

.tool-versions

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
erlang 26.2
22
elixir 1.16.2-otp-26
3-
golang 1.21.3
3+
golang 1.22
44
rust 1.81.0
55
protoc 24.3

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# libp2p port
2-
FROM golang:1.21.3 AS libp2p_builder
2+
FROM golang:1.22 AS libp2p_builder
33
LABEL stage=builder
44

55
# Install dependencies

lib/lambda_ethereum_consensus/beacon/pending_blocks.ex

Lines changed: 37 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
1010
alias LambdaEthereumConsensus.Metrics
1111
alias LambdaEthereumConsensus.P2P.BlobDownloader
1212
alias LambdaEthereumConsensus.P2P.BlockDownloader
13-
alias LambdaEthereumConsensus.Store.BlobDb
13+
alias LambdaEthereumConsensus.Store.Blobs
1414
alias LambdaEthereumConsensus.Store.Blocks
1515
alias LambdaEthereumConsensus.Utils
1616
alias Types.BlockInfo
@@ -46,15 +46,20 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
4646

4747
# If the block is new or was to be downloaded, we store it.
4848
if is_nil(loaded_block) or loaded_block.status == :download do
49-
missing_blobs = missing_blobs(block_info)
49+
missing_blobs = Blobs.missing_for_block(block_info)
5050

5151
if Enum.empty?(missing_blobs) do
5252
Logger.debug("[PendingBlocks] No missing blobs for block, process it", log_md)
5353
Blocks.new_block_info(block_info)
5454
process_block_and_check_children(store, block_info)
5555
else
5656
Logger.debug("[PendingBlocks] Missing blobs for block, scheduling download", log_md)
57-
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/2, @download_retries)
57+
58+
BlobDownloader.request_blobs_by_root(
59+
missing_blobs,
60+
&process_blobs/2,
61+
@download_retries
62+
)
5863

5964
block_info
6065
|> BlockInfo.change_status(:download_blobs)
@@ -72,6 +77,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
7277
module after receiving a new block, but there are some other cases like at node startup, as there
7378
may be pending blocks from prior executions.
7479
"""
80+
@spec process_blocks(Store.t()) :: Store.t()
7581
def process_blocks(store) do
7682
case Blocks.get_blocks_with_status(:pending) do
7783
{:ok, blocks} ->
@@ -92,6 +98,34 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
9298
end
9399
end
94100

101+
@doc """
102+
Process incoming blobs if the block can be processed does so immediately.
103+
"""
104+
@spec process_blobs(Store.t(), {:ok, [Types.BlobSidecar.t()]}) :: {:ok, Store.t()}
105+
def process_blobs(store, {:ok, blobs}) do
106+
blobs
107+
|> Blobs.add_blobs()
108+
|> Enum.reduce(store, fn root, store ->
109+
with %BlockInfo{status: :download_blobs} = block_info <- Blocks.get_block_info(root),
110+
[] <- Blobs.missing_for_block(block_info) do
111+
block_info
112+
|> Blocks.change_status(:pending)
113+
|> then(&process_block_and_check_children(store, &1))
114+
115+
{:ok, store}
116+
else
117+
_ -> {:ok, store}
118+
end
119+
end)
120+
end
121+
122+
@spec process_blobs(Store.t(), {:error, any()}) :: {:ok, Store.t()}
123+
def process_blobs(store, {:error, reason}) do
124+
# We might want to declare a block invalid here.
125+
Logger.error("[PendingBlocks] Error downloading blobs: #{inspect(reason)}")
126+
{:ok, store}
127+
end
128+
95129
##########################
96130
### Private Functions
97131
##########################
@@ -184,51 +218,4 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
184218
Logger.error("[PendingBlocks] Error downloading block: #{inspect(reason)}")
185219
{:ok, store}
186220
end
187-
188-
def process_blobs(store, {:ok, blobs}), do: {:ok, add_blobs(store, blobs)}
189-
190-
def process_blobs(store, {:error, reason}) do
191-
# We might want to declare a block invalid here.
192-
Logger.error("[PendingBlocks] Error downloading blobs: #{inspect(reason)}")
193-
{:ok, store}
194-
end
195-
196-
def add_blob(store, blob), do: add_blobs(store, [blob])
197-
198-
# To be used when a series of blobs are downloaded. Stores each blob.
199-
# If there are blocks that can be processed, does so immediately.
200-
defp add_blobs(store, blobs) do
201-
blobs
202-
|> Enum.map(&BlobDb.store_blob/1)
203-
|> Enum.uniq()
204-
|> Enum.reduce(store, fn root, store ->
205-
with %BlockInfo{status: :download_blobs} = block_info <- Blocks.get_block_info(root),
206-
[] <- missing_blobs(block_info) do
207-
block_info
208-
|> Blocks.change_status(:pending)
209-
|> then(&process_block_and_check_children(store, &1))
210-
else
211-
_ ->
212-
store
213-
end
214-
end)
215-
end
216-
217-
@spec missing_blobs(BlockInfo.t()) :: [Types.BlobIdentifier.t()]
218-
def missing_blobs(%BlockInfo{root: root, signed_block: signed_block}) do
219-
signed_block.message.body.blob_kzg_commitments
220-
|> Stream.with_index()
221-
|> Enum.filter(&blob_needs_download?(&1, root))
222-
|> Enum.map(&%Types.BlobIdentifier{block_root: root, index: elem(&1, 1)})
223-
end
224-
225-
defp blob_needs_download?({commitment, index}, block_root) do
226-
case BlobDb.get_blob_sidecar(block_root, index) do
227-
{:ok, %{kzg_commitment: ^commitment}} ->
228-
false
229-
230-
_ ->
231-
true
232-
end
233-
end
234221
end

lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do
1818
Ssz.from_ssz(uncompressed, Types.BlobSidecar) do
1919
Logger.debug("[Gossip] Blob sidecar received, with index #{blob_index}")
2020
Libp2pPort.validate_message(msg_id, :accept)
21-
PendingBlocks.add_blob(store, blob)
21+
# TODO: (#1406) Enhance the API to reduce unnecessary wrappers (:ok + list)
22+
PendingBlocks.process_blobs(store, {:ok, [blob]})
2223
else
2324
{:error, reason} ->
2425
Logger.warning("[Gossip] Blob rejected, reason: #{inspect(reason)}")
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
defmodule LambdaEthereumConsensus.Store.Blobs do
2+
@moduledoc """
3+
Interface to `Store.Blobs`.
4+
"""
5+
require Logger
6+
7+
alias LambdaEthereumConsensus.Store.BlobDb
8+
alias Types.BlobSidecar
9+
alias Types.BlockInfo
10+
11+
@doc """
12+
To be used when a series of blobs are downloaded. Stores each blob.
13+
"""
14+
@spec add_blobs([BlobSidecar.t()]) :: [Types.root()]
15+
def add_blobs(blobs) do
16+
blobs
17+
|> Enum.map(&BlobDb.store_blob/1)
18+
|> Enum.uniq()
19+
end
20+
21+
@spec missing_for_block(BlockInfo.t()) :: [Types.BlobIdentifier.t()]
22+
def missing_for_block(%BlockInfo{root: root, signed_block: signed_block}) do
23+
signed_block.message.body.blob_kzg_commitments
24+
|> Stream.with_index()
25+
|> Enum.filter(&present?(&1, root))
26+
|> Enum.map(&%Types.BlobIdentifier{block_root: root, index: elem(&1, 1)})
27+
end
28+
29+
defp present?({commitment, index}, block_root) do
30+
case BlobDb.get_blob_sidecar(block_root, index) do
31+
{:ok, %{kzg_commitment: ^commitment}} ->
32+
false
33+
34+
_ ->
35+
true
36+
end
37+
end
38+
end
129 KB
Binary file not shown.

test/unit/blobs_test.exs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
defmodule Unit.BlobsTest do
2+
use ExUnit.Case
3+
alias Fixtures.Block
4+
alias LambdaEthereumConsensus.Store.BlobDb
5+
alias LambdaEthereumConsensus.Store.Blobs
6+
alias SpecTestUtils
7+
alias Types.BlobSidecar
8+
alias Types.BlockInfo
9+
10+
setup %{tmp_dir: tmp_dir} do
11+
start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: tmp_dir})
12+
start_link_supervised!(LambdaEthereumConsensus.Store.Blocks)
13+
14+
Application.fetch_env!(:lambda_ethereum_consensus, ChainSpec)
15+
|> Keyword.put(:config, MainnetConfig)
16+
|> then(&Application.put_env(:lambda_ethereum_consensus, ChainSpec, &1))
17+
18+
# Blob sidecar from spec test
19+
blob_sidecar =
20+
SpecTestUtils.read_ssz_from_file!(
21+
"test/fixtures/blobs/blob_sidecar.ssz_snappy",
22+
BlobSidecar
23+
)
24+
25+
{:ok, blob_sidecar: blob_sidecar}
26+
end
27+
28+
defp new_block_info() do
29+
Block.signed_beacon_block() |> BlockInfo.from_block()
30+
end
31+
32+
describe "Blobs unit tests" do
33+
@tag :tmp_dir
34+
test "Basic blobs saving and loading", %{blob_sidecar: blob_sidecar} do
35+
Blobs.add_blobs([blob_sidecar])
36+
block_root = Ssz.hash_tree_root!(blob_sidecar.signed_block_header.message)
37+
index = blob_sidecar.index
38+
{:ok, recovered_blob} = BlobDb.get_blob_sidecar(block_root, index)
39+
40+
assert(blob_sidecar == recovered_blob)
41+
end
42+
43+
@tag :tmp_dir
44+
test "One missing blob from block, then add, then no missing blobs", %{
45+
blob_sidecar: blob_sidecar
46+
} do
47+
blob_sidecar = %BlobSidecar{blob_sidecar | index: 0}
48+
49+
# Create random block info
50+
block_info = new_block_info()
51+
# add blob_sidecar kzg_commitment to the block_info
52+
block_info =
53+
put_in(
54+
block_info.signed_block.message.body.blob_kzg_commitments,
55+
[blob_sidecar.kzg_commitment]
56+
)
57+
58+
# change block root to the one from the blob
59+
block_info = %BlockInfo{
60+
block_info
61+
| root: Ssz.hash_tree_root!(blob_sidecar.signed_block_header.message)
62+
}
63+
64+
# check that the blob is detetected as missing
65+
missing = Blobs.missing_for_block(block_info)
66+
assert(length(missing) == 1)
67+
# add blob to db
68+
Blobs.add_blobs([blob_sidecar])
69+
# check that the blob is not missing
70+
missing = Blobs.missing_for_block(block_info)
71+
assert(Enum.empty?(missing))
72+
end
73+
end
74+
end

0 commit comments

Comments
 (0)