Skip to content

Commit 27ab68c

Browse files
0x00101010nalepae
andauthored
feat: implement reconstruct and broadcast data columns (#15023)
* Implement reconstructAndBroadcastDataColumns * Fix merge error * Fix tests * Minor changes. --------- Co-authored-by: Manu NALEPA <[email protected]>
1 parent ddf5a39 commit 27ab68c

File tree

9 files changed

+235
-152
lines changed

9 files changed

+235
-152
lines changed

beacon-chain/blockchain/testing/mock.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ type ChainService struct {
7474
BlockSlot primitives.Slot
7575
SyncingRoot [32]byte
7676
Blobs []blocks.VerifiedROBlob
77+
DataColumns []blocks.VerifiedRODataColumn
7778
TargetRoot [32]byte
7879
}
7980

@@ -703,7 +704,8 @@ func (c *ChainService) ReceiveBlob(_ context.Context, b blocks.VerifiedROBlob) e
703704
}
704705

705706
// ReceiveDataColumn implements the same method in chain service
706-
func (*ChainService) ReceiveDataColumn(_ blocks.VerifiedRODataColumn) error {
707+
func (c *ChainService) ReceiveDataColumn(dc blocks.VerifiedRODataColumn) error {
708+
c.DataColumns = append(c.DataColumns, dc)
707709
return nil
708710
}
709711

beacon-chain/execution/engine_client.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -642,13 +642,13 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
642642
}
643643

644644
// ReconstructDataColumnSidecars reconstructs the verified data column sidecars for a given beacon block.
645-
// It retrieves the KZG commitments from the block body, fetches the associated blobs and cell proofs,
645+
// It retrieves the KZG commitments from the block body, fetches the associated blobs and cell proofs from the EL,
646646
// and constructs the corresponding verified read-only data column sidecars.
647647
func (s *Service) ReconstructDataColumnSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) ([]blocks.VerifiedRODataColumn, error) {
648648
blockBody := block.Block().Body()
649649
kzgCommitments, err := blockBody.BlobKzgCommitments()
650650
if err != nil {
651-
return nil, wrapWithBlockRoot(err, blockRoot, "could not get blob KZG commitments")
651+
return nil, wrapWithBlockRoot(err, blockRoot, "blob KZG commitments")
652652
}
653653

654654
// Collect KZG hashes for all blobs
@@ -657,20 +657,18 @@ func (s *Service) ReconstructDataColumnSidecars(ctx context.Context, block inter
657657
kzgHashes = append(kzgHashes, primitives.ConvertKzgCommitmentToVersionedHash(commitment))
658658
}
659659

660-
// Fetch all blobs from EL
661-
blobs, err := s.GetBlobsV2(ctx, kzgHashes)
660+
// Fetch all blobsAndCellsProofs from EL
661+
blobsAndCellsProofs, err := s.GetBlobsV2(ctx, kzgHashes)
662662
if err != nil {
663-
return nil, wrapWithBlockRoot(err, blockRoot, "could not get blobs")
663+
return nil, wrapWithBlockRoot(err, blockRoot, "get blobs V2")
664664
}
665665

666-
for _, blobAndCellProofs := range blobs {
666+
var cellsAndProofs []kzg.CellsAndProofs
667+
for _, blobAndCellProofs := range blobsAndCellsProofs {
667668
if blobAndCellProofs == nil {
668669
return nil, wrapWithBlockRoot(errors.New("unable to reconstruct data column sidecars, did not get all blobs from EL"), blockRoot, "")
669670
}
670-
}
671671

672-
var cellsAndProofs []kzg.CellsAndProofs
673-
for _, blobAndCellProofs := range blobs {
674672
var blob kzg.Blob
675673
copy(blob[:], blobAndCellProofs.Blob)
676674
cells, err := kzg.ComputeCells(&blob)
@@ -713,7 +711,7 @@ func (s *Service) ReconstructDataColumnSidecars(ctx context.Context, block inter
713711
verifiedRODataColumns[i] = blocks.NewVerifiedRODataColumn(roDataColumn)
714712
}
715713

716-
log.WithField("root", fmt.Sprintf("%x", blockRoot)).Debug("Data columns reconstructed successfully")
714+
log.WithField("root", fmt.Sprintf("%x", blockRoot)).Debug("Data columns reconstructed successfully from EL")
717715

718716
return verifiedRODataColumns, nil
719717
}

beacon-chain/execution/engine_client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2504,7 +2504,7 @@ func TestReconstructDataColumnSidecars(t *testing.T) {
25042504
ctx := context.Background()
25052505
t.Run("GetBlobsV2 is not supported", func(t *testing.T) {
25062506
_, err := client.ReconstructDataColumnSidecars(ctx, sb, r)
2507-
require.ErrorContains(t, "could not get blobs", err)
2507+
require.ErrorContains(t, "get blobs V2 for block", err)
25082508
})
25092509

25102510
t.Run("receiving all blobs", func(t *testing.T) {

beacon-chain/execution/testing/mock_engine_client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ type EngineClient struct {
3838
ErrGetPayload error
3939
BlobSidecars []blocks.VerifiedROBlob
4040
ErrorBlobSidecars error
41+
DataColumnSidecars []blocks.VerifiedRODataColumn
42+
ErrorDataColumnSidecars error
4143
}
4244

4345
// NewPayload --
@@ -114,7 +116,7 @@ func (e *EngineClient) ReconstructBlobSidecars(context.Context, interfaces.ReadO
114116
}
115117

116118
func (e *EngineClient) ReconstructDataColumnSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) ([]blocks.VerifiedRODataColumn, error) {
117-
return nil, nil
119+
return e.DataColumnSidecars, e.ErrorDataColumnSidecars
118120
}
119121

120122
// GetTerminalBlockHash --

beacon-chain/sync/initial-sync/blocks_fetcher_utils.go

Lines changed: 0 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ import (
88

99
"github.com/libp2p/go-libp2p/core/peer"
1010
"github.com/pkg/errors"
11-
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
1211
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
13-
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
1412
p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
1513
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
1614
"github.com/prysmaticlabs/prysm/v5/config/params"
@@ -382,33 +380,6 @@ func (f *blocksFetcher) calculateHeadAndTargetEpochs() (headEpoch, targetEpoch p
382380
return headEpoch, targetEpoch, peers
383381
}
384382

385-
// custodyGroupsFromPeer compute all the custody groups indexed by peer.
386-
func (f *blocksFetcher) custodyGroupsFromPeer(peers map[peer.ID]bool) (map[peer.ID]map[uint64]bool, error) {
387-
peerCount := len(peers)
388-
389-
custodyGroupsByPeer := make(map[peer.ID]map[uint64]bool, peerCount)
390-
for peer := range peers {
391-
// Get the node ID from the peer ID.
392-
nodeID, err := p2p.ConvertPeerIDToNodeID(peer)
393-
if err != nil {
394-
return nil, errors.Wrap(err, "convert peer ID to node ID")
395-
}
396-
397-
// Get the custody group count of the peer.
398-
custodyGroupCount := f.p2p.CustodyGroupCountFromPeer(peer)
399-
400-
// Retrieve the peer info.
401-
peerInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
402-
if err != nil {
403-
return nil, errors.Wrap(err, "peer info")
404-
}
405-
406-
custodyGroupsByPeer[peer] = peerInfo.CustodyGroups
407-
}
408-
409-
return custodyGroupsByPeer, nil
410-
}
411-
412383
// uint64MapToSortedSlice produces a sorted uint64 slice from a map.
413384
func uint64MapToSortedSlice(input map[uint64]bool) []uint64 {
414385
output := make([]uint64, 0, len(input))
@@ -420,54 +391,6 @@ func uint64MapToSortedSlice(input map[uint64]bool) []uint64 {
420391
return output
421392
}
422393

423-
// `filterPeerWhichCustodyAtLeastOneDataColumn` filters peers which custody at least one data column
424-
// specified in `neededDataColumns`. It returns also a list of descriptions for non admissible peers.
425-
func filterPeerWhichCustodyAtLeastOneDataColumn(
426-
neededDataColumns map[uint64]bool,
427-
inputDataColumnsByPeer map[peer.ID]map[uint64]bool,
428-
) (map[peer.ID]map[uint64]bool, []string) {
429-
// Get the count of needed data columns.
430-
neededDataColumnsCount := uint64(len(neededDataColumns))
431-
432-
// Create pretty needed data columns for logs.
433-
var neededDataColumnsLog interface{} = "all"
434-
numberOfColumns := params.BeaconConfig().NumberOfColumns
435-
436-
if neededDataColumnsCount < numberOfColumns {
437-
neededDataColumnsLog = uint64MapToSortedSlice(neededDataColumns)
438-
}
439-
440-
outputDataColumnsByPeer := make(map[peer.ID]map[uint64]bool, len(inputDataColumnsByPeer))
441-
descriptions := make([]string, 0)
442-
443-
outerLoop:
444-
for peer, peerCustodyDataColumns := range inputDataColumnsByPeer {
445-
for neededDataColumn := range neededDataColumns {
446-
if peerCustodyDataColumns[neededDataColumn] {
447-
outputDataColumnsByPeer[peer] = peerCustodyDataColumns
448-
449-
continue outerLoop
450-
}
451-
}
452-
453-
peerCustodyColumnsCount := uint64(len(peerCustodyDataColumns))
454-
var peerCustodyColumnsLog interface{} = "all"
455-
456-
if peerCustodyColumnsCount < numberOfColumns {
457-
peerCustodyColumnsLog = uint64MapToSortedSlice(peerCustodyDataColumns)
458-
}
459-
460-
description := fmt.Sprintf(
461-
"peer %s: does not custody any needed column, custody columns: %v, needed columns: %v",
462-
peer, peerCustodyColumnsLog, neededDataColumnsLog,
463-
)
464-
465-
descriptions = append(descriptions, description)
466-
}
467-
468-
return outputDataColumnsByPeer, descriptions
469-
}
470-
471394
// Filter peers with head epoch lower than our target epoch for ByRange requests.
472395
func (f *blocksFetcher) filterPeersByTargetSlot(peers []peer.ID, targetSlot primitives.Slot) ([]peer.ID, []string, error) {
473396
filteredPeers := make([]peer.ID, 0, len(peers))

beacon-chain/sync/subscriber_beacon_blocks.go

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"path"
88

99
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
10+
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
1011
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition/interop"
1112
"github.com/prysmaticlabs/prysm/v5/config/features"
1213
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
@@ -35,6 +36,7 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
3536
return err
3637
}
3738

39+
// TODO: do we only do this for super nodes?
3840
go s.reconstructAndBroadcastBlobs(ctx, signed)
3941

4042
if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil {
@@ -60,18 +62,67 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
6062
}
6163

6264
// reconstructAndBroadcastBlobs processes and broadcasts blob sidecars for a given beacon block.
63-
// This function reconstructs the blob sidecars from the EL using the block's KZG commitments,
64-
// broadcasts the reconstructed blobs over P2P, and saves them into the blob storage.
6565
func (s *Service) reconstructAndBroadcastBlobs(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) {
66-
if block.Version() < version.Deneb {
66+
if block.Version() >= version.Fulu {
67+
s.reconstructAndBroadcastBlobsInDataColumn(ctx, block)
6768
return
6869
}
6970

70-
// TODO: Apply the equivalent strategy for data columns.
71-
if block.Version() >= version.Fulu {
71+
if block.Version() >= version.Deneb {
72+
s.reconstructAndBroadcastFullBlobs(ctx, block)
73+
return
74+
}
75+
}
76+
77+
// reconstructAndBroadcastBlobsInDataColumn reconstructs and broadcasts blobs in data column format for a given beacon block, it also saves data column sidecars into the blob storage.
78+
func (s *Service) reconstructAndBroadcastBlobsInDataColumn(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) {
79+
blockRoot, err := block.Block().HashTreeRoot()
80+
if err != nil {
81+
log.WithError(err).Error("Failed to calculate block root")
7282
return
7383
}
7484

85+
if s.cfg.blobStorage == nil {
86+
log.Warn("Blob storage is not enabled, skip saving data column, but continue to reconstruct and broadcast blobs")
87+
}
88+
89+
// when this function is called, it's from the time when the block is received, so in almost all situations we need to get the data column from EL instead of the blob storage.
90+
sidecars, err := s.cfg.executionReconstructor.ReconstructDataColumnSidecars(ctx, block, blockRoot)
91+
if err != nil {
92+
log.WithError(err).Debug("Cannot reconstruct data column sidecars after receiving the block")
93+
return
94+
}
95+
96+
nodeID := s.cfg.p2p.NodeID()
97+
s.cfg.custodyInfo.Mut.RLock()
98+
defer s.cfg.custodyInfo.Mut.RUnlock()
99+
samplingSize := s.cfg.custodyInfo.CustodyGroupSamplingSize(peerdas.Actual)
100+
info, _, err := peerdas.Info(nodeID, samplingSize)
101+
if err != nil {
102+
log.WithError(err).Error("Failed to get peer info")
103+
return
104+
}
105+
106+
// Broadcast data column and then save to db (if needs to be in custody)
107+
for _, sidecar := range sidecars {
108+
if !info.CustodyColumns[sidecar.ColumnIndex] {
109+
continue
110+
}
111+
112+
// first broadcast the data column
113+
if err := s.cfg.p2p.BroadcastDataColumn(ctx, blockRoot, sidecar.ColumnIndex, sidecar.DataColumnSidecar); err != nil {
114+
log.WithFields(dataColumnFields(sidecar.RODataColumn)).WithError(err).Error("Failed to broadcast data column")
115+
}
116+
117+
if err := s.receiveDataColumn(ctx, sidecar); err != nil {
118+
log.WithFields(dataColumnFields(sidecar.RODataColumn)).WithError(err).Error("Failed to receive data column")
119+
}
120+
}
121+
}
122+
123+
// reconstructAndBroadcastFullBlobs reconstructs the blob sidecars from the EL using the block's KZG commitments,
124+
// broadcasts the reconstructed blobs over P2P, and saves them into the blob storage.
125+
func (s *Service) reconstructAndBroadcastFullBlobs(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) {
75126
startTime, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), block.Block().Slot())
76127
if err != nil {
77128
log.WithError(err).Error("Failed to convert slot to time")

0 commit comments

Comments
 (0)