Skip to content

Commit 8996000

Browse files
0x00101010nalepae
andauthored
feature: Implement data column support for different storage layouts (#15014)
* Implement data column support for different storage layouts * Fix errors * Fix linting * `slotFromFile`: First try to decode as a data column. --------- Co-authored-by: Manu NALEPA <[email protected]>
1 parent a2fcba2 commit 8996000

File tree

13 files changed

+442
-61
lines changed

13 files changed

+442
-61
lines changed

beacon-chain/blockchain/kzg/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ go_test(
3030
deps = [
3131
"//consensus-types/blocks:go_default_library",
3232
"//testing/require:go_default_library",
33-
"//testing/util:go_default_library",
33+
"@com_github_consensys_gnark_crypto//ecc/bls12-381/fr:go_default_library",
3434
"@com_github_crate_crypto_go_kzg_4844//:go_default_library",
35+
"@com_github_sirupsen_logrus//:go_default_library",
3536
],
3637
)

beacon-chain/blockchain/kzg/validation_test.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package kzg
22

33
import (
4+
"bytes"
5+
"crypto/sha256"
6+
"encoding/binary"
47
"testing"
58

9+
"github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
610
GoKZG "github.com/crate-crypto/go-kzg-4844"
711
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
812
"github.com/prysmaticlabs/prysm/v5/testing/require"
9-
"github.com/prysmaticlabs/prysm/v5/testing/util"
13+
"github.com/sirupsen/logrus"
1014
)
1115

1216
func GenerateCommitmentAndProof(blob GoKZG.Blob) (GoKZG.KZGCommitment, GoKZG.KZGProof, error) {
@@ -37,11 +41,44 @@ func TestBytesToAny(t *testing.T) {
3741
}
3842

3943
func TestGenerateCommitmentAndProof(t *testing.T) {
40-
blob := util.GetRandBlob(123)
44+
blob := getRandBlob(123)
4145
commitment, proof, err := GenerateCommitmentAndProof(blob)
4246
require.NoError(t, err)
4347
expectedCommitment := GoKZG.KZGCommitment{180, 218, 156, 194, 59, 20, 10, 189, 186, 254, 132, 93, 7, 127, 104, 172, 238, 240, 237, 70, 83, 89, 1, 152, 99, 0, 165, 65, 143, 62, 20, 215, 230, 14, 205, 95, 28, 245, 54, 25, 160, 16, 178, 31, 232, 207, 38, 85}
4448
expectedProof := GoKZG.KZGProof{128, 110, 116, 170, 56, 111, 126, 87, 229, 234, 211, 42, 110, 150, 129, 206, 73, 142, 167, 243, 90, 149, 240, 240, 236, 204, 143, 182, 229, 249, 81, 27, 153, 171, 83, 70, 144, 250, 42, 1, 188, 215, 71, 235, 30, 7, 175, 86}
4549
require.Equal(t, expectedCommitment, commitment)
4650
require.Equal(t, expectedProof, proof)
4751
}
52+
53+
func deterministicRandomness(seed int64) [32]byte {
54+
// Converts an int64 to a byte slice
55+
buf := new(bytes.Buffer)
56+
err := binary.Write(buf, binary.BigEndian, seed)
57+
if err != nil {
58+
logrus.WithError(err).Error("Failed to write int64 to bytes buffer")
59+
return [32]byte{}
60+
}
61+
bytes := buf.Bytes()
62+
63+
return sha256.Sum256(bytes)
64+
}
65+
66+
// Returns a serialized random field element in big-endian
67+
func getRandFieldElement(seed int64) [32]byte {
68+
bytes := deterministicRandomness(seed)
69+
var r fr.Element
70+
r.SetBytes(bytes[:])
71+
72+
return GoKZG.SerializeScalar(r)
73+
}
74+
75+
// Returns a random blob using the passed seed as entropy
76+
func getRandBlob(seed int64) GoKZG.Blob {
77+
var blob GoKZG.Blob
78+
bytesPerBlob := GoKZG.ScalarsPerBlob * GoKZG.SerializedScalarSize
79+
for i := 0; i < bytesPerBlob; i += GoKZG.SerializedScalarSize {
80+
fieldElementBytes := getRandFieldElement(seed + int64(i))
81+
copy(blob[i:i+GoKZG.SerializedScalarSize], fieldElementBytes[:])
82+
}
83+
return blob
84+
}

beacon-chain/db/filesystem/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ go_library(
2626
"//consensus-types/primitives:go_default_library",
2727
"//encoding/bytesutil:go_default_library",
2828
"//io/file:go_default_library",
29+
"//proto/prysm/v1alpha1:go_default_library",
2930
"//runtime/logging:go_default_library",
3031
"//time/slots:go_default_library",
3132
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
@@ -49,10 +50,12 @@ go_test(
4950
],
5051
embed = [":go_default_library"],
5152
deps = [
53+
"//beacon-chain/blockchain/kzg:go_default_library",
5254
"//beacon-chain/db:go_default_library",
5355
"//beacon-chain/verification:go_default_library",
5456
"//config/fieldparams:go_default_library",
5557
"//config/params:go_default_library",
58+
"//consensus-types/blocks:go_default_library",
5659
"//consensus-types/primitives:go_default_library",
5760
"//encoding/bytesutil:go_default_library",
5861
"//proto/prysm/v1alpha1:go_default_library",

beacon-chain/db/filesystem/blob.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ func (bs *BlobStorage) GetColumn(root [32]byte, idx uint64) (blocks.VerifiedRODa
401401
return verification.VerifiedRODataColumnFromDisk(bs.fs, root, bs.layout.sszPath(ident))
402402
}
403403

404-
// Remove removes all blobs for a given root.
404+
// Remove removes all blobs or data columns for a given root.
405405
func (bs *BlobStorage) Remove(root [32]byte) error {
406406
dirIdent, err := bs.layout.dirIdent(root)
407407
if err != nil {

beacon-chain/db/filesystem/blob_test.go

Lines changed: 159 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,19 @@ package filesystem
22

33
import (
44
"bytes"
5+
"fmt"
56
"math"
67
"os"
78
"path"
89
"sync"
910
"testing"
1011

1112
ssz "github.com/prysmaticlabs/fastssz"
13+
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
1214
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
1315
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
1416
"github.com/prysmaticlabs/prysm/v5/config/params"
17+
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
1518
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
1619
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
1720
"github.com/prysmaticlabs/prysm/v5/testing/require"
@@ -56,7 +59,7 @@ func TestBlobStorage_SaveBlobData(t *testing.T) {
5659
require.NoError(t, bs.Save(sc))
5760
actualSc, err := bs.Get(sc.BlockRoot(), sc.Index)
5861
require.NoError(t, err)
59-
expectedIdx := blobIndexMask{false, false, true, false, false, false}
62+
expectedIdx := dataIndexMask{false, false, true, false, false, false}
6063
actualIdx := bs.Summary(actualSc.BlockRoot()).mask
6164
require.NoError(t, err)
6265
require.DeepEqual(t, expectedIdx, actualIdx)
@@ -221,3 +224,158 @@ func TestLayoutNames(t *testing.T) {
221224
_, err := newLayout(badLayoutName, nil, nil, nil)
222225
require.ErrorIs(t, err, errInvalidLayoutName)
223226
}
227+
228+
func TestBlobStorage_DataColumn_WithAllLayouts(t *testing.T) {
229+
for _, layout := range LayoutNames {
230+
t.Run(fmt.Sprintf("layout=%s", layout), func(t *testing.T) {
231+
sidecars := setupDataColumnTest(t)
232+
233+
t.Run("no error for duplicate", func(t *testing.T) {
234+
fs, bs := NewEphemeralBlobStorageAndFs(t, WithLayout(layout))
235+
sidecar := sidecars[0]
236+
237+
columnPath := bs.layout.sszPath(identForDataColumnSidecar(sidecar))
238+
data, err := ssz.MarshalSSZ(sidecar)
239+
require.NoError(t, err)
240+
241+
require.NoError(t, bs.SaveDataColumn(sidecar))
242+
// No error when attempting to write twice.
243+
require.NoError(t, bs.SaveDataColumn(sidecar))
244+
245+
content, err := afero.ReadFile(fs, columnPath)
246+
require.NoError(t, err)
247+
require.Equal(t, true, bytes.Equal(data, content))
248+
249+
// Deserialize the DataColumnSidecar from the saved file data.
250+
saved := &ethpb.DataColumnSidecar{}
251+
err = saved.UnmarshalSSZ(content)
252+
require.NoError(t, err)
253+
254+
// Compare the original Sidecar and the saved Sidecar.
255+
require.DeepSSZEqual(t, sidecar.DataColumnSidecar, saved)
256+
})
257+
258+
t.Run("indices", func(t *testing.T) {
259+
bs := NewEphemeralBlobStorage(t, WithLayout(layout))
260+
sidecar := sidecars[2]
261+
require.NoError(t, bs.SaveDataColumn(sidecar))
262+
actual, err := bs.GetColumn(sidecar.BlockRoot(), sidecar.ColumnIndex)
263+
require.NoError(t, err)
264+
require.DeepEqual(t, sidecar, actual)
265+
266+
expectedIdx := make(dataIndexMask, params.BeaconConfig().NumberOfColumns)
267+
expectedIdx[2] = true
268+
actualIdx := bs.Summary(actual.BlockRoot()).mask
269+
require.NoError(t, err)
270+
require.DeepEqual(t, expectedIdx, actualIdx)
271+
272+
sidecar = sidecars[10]
273+
expectedIdx[10] = true
274+
require.NoError(t, bs.SaveDataColumn(sidecar))
275+
actual, err = bs.GetColumn(sidecar.BlockRoot(), sidecar.ColumnIndex)
276+
require.NoError(t, err)
277+
require.DeepEqual(t, sidecar, actual)
278+
actualIdx = bs.Summary(actual.BlockRoot()).mask
279+
require.NoError(t, err)
280+
require.DeepEqual(t, expectedIdx, actualIdx)
281+
})
282+
283+
t.Run("write -> read -> delete", func(t *testing.T) {
284+
bs := NewEphemeralBlobStorage(t, WithLayout(layout))
285+
err := bs.SaveDataColumn(sidecars[0])
286+
require.NoError(t, err)
287+
288+
expected := sidecars[0]
289+
actual, err := bs.GetColumn(expected.BlockRoot(), expected.ColumnIndex)
290+
require.NoError(t, err)
291+
require.DeepEqual(t, expected, actual)
292+
293+
require.NoError(t, bs.Remove(expected.BlockRoot()))
294+
for i := range params.BeaconConfig().NumberOfColumns {
295+
_, err = bs.GetColumn(expected.BlockRoot(), uint64(i))
296+
require.Equal(t, true, db.IsNotFound(err))
297+
}
298+
})
299+
300+
t.Run("clear", func(t *testing.T) {
301+
bs := NewEphemeralBlobStorage(t, WithLayout(layout))
302+
err := bs.SaveDataColumn(sidecars[0])
303+
require.NoError(t, err)
304+
res, err := bs.GetColumn(sidecars[0].BlockRoot(), sidecars[0].ColumnIndex)
305+
require.NoError(t, err)
306+
require.NotNil(t, res)
307+
require.NoError(t, bs.Clear())
308+
// After clearing, the blob should not exist in the db.
309+
_, err = bs.GetColumn(sidecars[0].BlockRoot(), sidecars[0].ColumnIndex)
310+
require.ErrorIs(t, err, os.ErrNotExist)
311+
})
312+
})
313+
}
314+
}
315+
316+
func TestBlobStorage_DataColumn_WithMigrationFromFlatToByEpoch(t *testing.T) {
317+
sidecars := setupDataColumnTest(t)
318+
319+
// Setup flat layout
320+
fs, bs := NewEphemeralBlobStorageAndFs(t, WithLayout(LayoutNameFlat))
321+
sidecar := sidecars[0]
322+
columnPath := bs.layout.sszPath(identForDataColumnSidecar(sidecar))
323+
data, err := ssz.MarshalSSZ(sidecar)
324+
require.NoError(t, err)
325+
require.NoError(t, bs.SaveDataColumn(sidecar))
326+
content, err := afero.ReadFile(fs, columnPath)
327+
require.NoError(t, err)
328+
require.Equal(t, true, bytes.Equal(data, content))
329+
330+
// Setup by-epoch layout
331+
bs = NewWarmedEphemeralBlobStorageUsingFs(t, fs, WithLayout(LayoutNameByEpoch))
332+
333+
// Verify data is the same
334+
columnPath = bs.layout.sszPath(identForDataColumnSidecar(sidecar))
335+
content, err = afero.ReadFile(fs, columnPath)
336+
require.NoError(t, err)
337+
require.Equal(t, true, bytes.Equal(data, content))
338+
}
339+
340+
func TestBlobStorage_DataColumn_WithMigrationFromByEpochToFlat(t *testing.T) {
341+
sidecars := setupDataColumnTest(t)
342+
343+
// Setup by-epoch layout
344+
fs, bs := NewEphemeralBlobStorageAndFs(t, WithLayout(LayoutNameFlat))
345+
for _, sidecar := range sidecars {
346+
require.NoError(t, bs.SaveDataColumn(sidecar))
347+
}
348+
columnPath := bs.layout.sszPath(identForDataColumnSidecar(sidecars[0]))
349+
content, err := afero.ReadFile(fs, columnPath)
350+
require.NoError(t, err)
351+
data, err := ssz.MarshalSSZ(sidecars[0])
352+
require.NoError(t, err)
353+
require.Equal(t, true, bytes.Equal(data, content))
354+
355+
// Setup flat layout
356+
bs = NewWarmedEphemeralBlobStorageUsingFs(t, fs, WithLayout(LayoutNameByEpoch))
357+
358+
// Verify data is the same
359+
columnPath = bs.layout.sszPath(identForDataColumnSidecar(sidecars[0]))
360+
content, err = afero.ReadFile(fs, columnPath)
361+
require.NoError(t, err)
362+
require.Equal(t, true, bytes.Equal(data, content))
363+
}
364+
365+
func setupDataColumnTest(t *testing.T) []blocks.VerifiedRODataColumn {
366+
// load trusted setup
367+
err := kzg.Start()
368+
require.NoError(t, err)
369+
370+
// Setup right fork epoch
371+
params.SetupTestConfigCleanup(t)
372+
cfg := params.BeaconConfig().Copy()
373+
cfg.CapellaForkEpoch = 0
374+
cfg.DenebForkEpoch = 0
375+
cfg.ElectraForkEpoch = 0
376+
cfg.FuluForkEpoch = 0
377+
params.OverrideBeaconConfig(cfg)
378+
379+
_, scs := util.GenerateTestFuluBlockWithSidecar(t, [32]byte{}, 0, 1)
380+
return verification.FakeVerifyDataColumnSliceForTest(t, scs)
381+
}

beacon-chain/db/filesystem/cache.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,27 +10,24 @@ import (
1010
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
1111
)
1212

13-
// blobIndexMask is a bitmask representing the set of blob indices that are currently set.
14-
// TODO: Separate blobs from data columns
15-
type blobIndexMask []bool
13+
// dataIndexMask is a bitmask representing the set of blob or data column indices that are currently set.
14+
type dataIndexMask []bool
1615

17-
// type blobIndexMask [fieldparams.NumberOfColumns]bool
18-
19-
// BlobStorageSummary represents cached information about the BlobSidecars on disk for each root the cache knows about.
16+
// BlobStorageSummary represents cached information about the BlobSidecars or DataColumnSidecars on disk for each root the cache knows about.
2017
type BlobStorageSummary struct {
2118
epoch primitives.Epoch
22-
mask blobIndexMask
19+
mask dataIndexMask
2320
}
2421

25-
// HasIndex returns true if the BlobSidecar at the given index is available in the filesystem.
22+
// HasIndex returns true if the BlobSidecar or DataColumnSidecar at the given index is available in the filesystem.
2623
func (s BlobStorageSummary) HasIndex(idx uint64) bool {
2724
if idx >= uint64(len(s.mask)) {
2825
return false
2926
}
3027
return s.mask[idx]
3128
}
3229

33-
// HasDataColumnIndex true if the DataColumnSidecar at the given index is available in the filesystem.
30+
// HasDataColumnIndex returns true if the DataColumnSidecar at the given index is available in the filesystem.
3431
func (s BlobStorageSummary) HasDataColumnIndex(idx uint64) bool {
3532
// Protect from panic, but assume callers are sophisticated enough to not need an error telling them they have an invalid idx.
3633
numberOfColumns := params.BeaconConfig().NumberOfColumns
@@ -134,7 +131,7 @@ func (s *blobStorageSummaryCache) ensure(ident blobIdent) error {
134131
v := s.cache[ident.root]
135132
v.epoch = ident.epoch
136133
if v.mask == nil {
137-
v.mask = make(blobIndexMask, maskSize)
134+
v.mask = make(dataIndexMask, maskSize)
138135
}
139136
if !v.mask[ident.index] {
140137
s.updateMetrics(1)

beacon-chain/db/filesystem/cache_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestSlotByRoot_Summary(t *testing.T) {
2424
cases := []struct {
2525
name string
2626
root [32]byte
27-
expected blobIndexMask
27+
expected dataIndexMask
2828
}{
2929
{
3030
name: "not found",
@@ -193,7 +193,7 @@ func TestHasDataColumnIndex(t *testing.T) {
193193
}
194194
}
195195

196-
mask := make(blobIndexMask, maxIndex+1)
196+
mask := make(dataIndexMask, maxIndex+1)
197197

198198
for idx := range storedIndices {
199199
mask[idx] = true
@@ -263,7 +263,7 @@ func TestAllDataColumnAvailable(t *testing.T) {
263263
}
264264
}
265265

266-
mask := make(blobIndexMask, maxIndex+1)
266+
mask := make(dataIndexMask, maxIndex+1)
267267

268268
for idx := range c.storedIndices {
269269
mask[idx] = true

beacon-chain/db/filesystem/iteration_test.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package filesystem
22

33
import (
4-
"bytes"
54
"fmt"
65
"io"
76
"math"
@@ -220,28 +219,6 @@ func TestListDir(t *testing.T) {
220219
}
221220
}
222221

223-
func TestSlotFromBlob(t *testing.T) {
224-
cases := []struct {
225-
slot primitives.Slot
226-
}{
227-
{slot: 0},
228-
{slot: 2},
229-
{slot: 1123581321},
230-
{slot: math.MaxUint64},
231-
}
232-
for _, c := range cases {
233-
t.Run(fmt.Sprintf("slot %d", c.slot), func(t *testing.T) {
234-
_, sidecars := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, c.slot, 1)
235-
sc := sidecars[0]
236-
enc, err := sc.MarshalSSZ()
237-
require.NoError(t, err)
238-
slot, err := slotFromBlob(bytes.NewReader(enc))
239-
require.NoError(t, err)
240-
require.Equal(t, c.slot, slot)
241-
})
242-
}
243-
}
244-
245222
func TestIterationComplete(t *testing.T) {
246223
targets := []migrationTestTarget{
247224
{

0 commit comments

Comments
 (0)