Skip to content

Commit ddf5a39

Browse files
nirannalepae
andauthored
Fetch data columns from multiple peers instead of just supernodes (#14977)
* Extract the block fetcher's peer selection logic for data columns so it can be used in both by range and by root requests * Refactor data column sidecar request to send requests to multiple peers instead of supernodes * Remove comment * Remove unused method * Add tests for dmissiblePeersForDataColumns * Extract data column fetching into standalone functions * Remove AdmissibleCustodyGroupsPeers and replace the final call with requests to multiple peers * Apply suggestions from code review Co-authored-by: Manu NALEPA <[email protected]> * Wrap errors * Use cached peedas.Info and properly convert custody groups to custody columns * Rename filterPeersForRangeReq * Preserve debugging descriptions when filtering out peers * Remove unused functions. * Initialize nested maps * Fix comment * First pass at retry logic for data column requests * Select fresh peers for each retry * Return an error if there are requested columns remaining * Adjust errors * Improve slightly the godoc. * Improve wrapped error messages. * `AdmissiblePeersForDataColumns`: Use value or `range`. * Remove `convertCustodyGroupsToDataColumnsByPeer` since used only once. * Minor fixes. * Retry until we run out of peers * Delete from the map of peers instead of filtering * Remove unneeded break * WIP: TestRequestDataColumnSidecars * `RequestDataColumnSidecars`: Move the happy path in the for loop. * Convert the peer ID to a node ID instead of using peer.EnodeID * Extract AdmissiblePeersForDataColumns from a method into a function and use it (instead of a mock) in TestRequestDataColumnSidecars * Track data column requests in tests to compare vs expectations * Run gazelle * Clean up test config changes so other tests don't break * Clean up comments * Minor changes. * Add tests for peers that don't respond with all requested columns * Respect MaxRequestDataColumnSidecars --------- Co-authored-by: Manu NALEPA <[email protected]> Co-authored-by: Manu NALEPA <[email protected]>
1 parent 92d2fc1 commit ddf5a39

File tree

14 files changed

+1314
-646
lines changed

14 files changed

+1314
-646
lines changed

beacon-chain/p2p/custody.go

Lines changed: 1 addition & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -2,90 +2,13 @@ package p2p
22

33
import (
44
"github.com/libp2p/go-libp2p/core/peer"
5-
"github.com/pkg/errors"
65
"github.com/sirupsen/logrus"
76

87
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
98
"github.com/prysmaticlabs/prysm/v5/config/params"
109
)
1110

12-
// AdmissibleCustodyGroupsPeers returns a list of peers that custody a super set of the local node's custody groups.
13-
func (s *Service) AdmissibleCustodyGroupsPeers(peers []peer.ID) ([]peer.ID, error) {
14-
localCustodyGroupCount := s.cfg.CustodyInfo.ActualGroupCount()
15-
return s.custodyGroupsAdmissiblePeers(peers, localCustodyGroupCount)
16-
}
17-
18-
// AdmissibleCustodySamplingPeers returns a list of peers that custody a super set of the local node's sampling columns.
19-
func (s *Service) AdmissibleCustodySamplingPeers(peers []peer.ID) ([]peer.ID, error) {
20-
localSubnetSamplingSize := s.cfg.CustodyInfo.CustodyGroupSamplingSize(peerdas.Actual)
21-
return s.custodyGroupsAdmissiblePeers(peers, localSubnetSamplingSize)
22-
}
23-
24-
// custodyGroupsAdmissiblePeers filters out `peers` that do not custody a super set of our own custody groups.
25-
func (s *Service) custodyGroupsAdmissiblePeers(peers []peer.ID, custodyGroupCount uint64) ([]peer.ID, error) {
26-
// Get the total number of custody groups.
27-
numberOfCustodyGroups := params.BeaconConfig().NumberOfCustodyGroups
28-
29-
// Retrieve the local node ID.
30-
localNodeId := s.NodeID()
31-
32-
// Retrieve the local node info.
33-
localNodeInfo, _, err := peerdas.Info(localNodeId, custodyGroupCount)
34-
if err != nil {
35-
return nil, errors.Wrap(err, "peer info")
36-
}
37-
38-
// Retrieve the needed custody groups.
39-
neededCustodyGroups := localNodeInfo.CustodyGroups
40-
41-
// Find the valid peers.
42-
validPeers := make([]peer.ID, 0, len(peers))
43-
44-
loop:
45-
for _, pid := range peers {
46-
// Get the custody group count of the remote peer.
47-
remoteCustodyGroupCount := s.CustodyGroupCountFromPeer(pid)
48-
49-
// If the remote peer custodies less groups than we do, skip it.
50-
if remoteCustodyGroupCount < custodyGroupCount {
51-
continue
52-
}
53-
54-
// Get the remote node ID from the peer ID.
55-
remoteNodeID, err := ConvertPeerIDToNodeID(pid)
56-
if err != nil {
57-
return nil, errors.Wrap(err, "convert peer ID to node ID")
58-
}
59-
60-
// Retrieve the remote peer info.
61-
remotePeerInfo, _, err := peerdas.Info(remoteNodeID, remoteCustodyGroupCount)
62-
if err != nil {
63-
return nil, errors.Wrap(err, "peer info")
64-
}
65-
66-
// Retrieve the custody groups of the remote peer.
67-
remoteCustodyGroups := remotePeerInfo.CustodyGroups
68-
remoteCustodyGroupsCount := uint64(len(remoteCustodyGroups))
69-
70-
// If the remote peers custodies all the possible columns, add it to the list.
71-
if remoteCustodyGroupsCount == numberOfCustodyGroups {
72-
validPeers = append(validPeers, pid)
73-
continue
74-
}
75-
76-
// Filter out invalid peers.
77-
for custodyGroup := range neededCustodyGroups {
78-
if !remoteCustodyGroups[custodyGroup] {
79-
continue loop
80-
}
81-
}
82-
83-
// Add valid peer to list
84-
validPeers = append(validPeers, pid)
85-
}
86-
87-
return validPeers, nil
88-
}
11+
var _ DataColumnsHandler = (*Service)(nil)
8912

9013
// custodyGroupCountFromPeerENR retrieves the custody count from the peer ENR.
9114
// If the ENR is not available, it defaults to the minimum number of custody groups

beacon-chain/p2p/custody_test.go

Lines changed: 0 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -2,109 +2,20 @@ package p2p
22

33
import (
44
"context"
5-
"crypto/ecdsa"
6-
"net"
75
"testing"
8-
"time"
96

10-
"github.com/ethereum/go-ethereum/p2p/discover"
11-
"github.com/ethereum/go-ethereum/p2p/enode"
127
"github.com/ethereum/go-ethereum/p2p/enr"
13-
"github.com/libp2p/go-libp2p/core/crypto"
148
"github.com/libp2p/go-libp2p/core/network"
15-
"github.com/libp2p/go-libp2p/core/peer"
169
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
1710
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
1811
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers"
1912
"github.com/prysmaticlabs/prysm/v5/config/params"
2013
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
21-
ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
22-
prysmNetwork "github.com/prysmaticlabs/prysm/v5/network"
2314
pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
2415
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata"
2516
"github.com/prysmaticlabs/prysm/v5/testing/require"
2617
)
2718

28-
func createPeer(t *testing.T, privateKeyOffset int, custodyCount uint64) (*enr.Record, peer.ID, *ecdsa.PrivateKey) {
29-
privateKeyBytes := make([]byte, 32)
30-
for i := 0; i < 32; i++ {
31-
privateKeyBytes[i] = byte(privateKeyOffset + i)
32-
}
33-
34-
unmarshalledPrivateKey, err := crypto.UnmarshalSecp256k1PrivateKey(privateKeyBytes)
35-
require.NoError(t, err)
36-
37-
privateKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(unmarshalledPrivateKey)
38-
require.NoError(t, err)
39-
40-
peerID, err := peer.IDFromPrivateKey(unmarshalledPrivateKey)
41-
require.NoError(t, err)
42-
43-
record := &enr.Record{}
44-
record.Set(peerdas.Cgc(custodyCount))
45-
record.Set(enode.Secp256k1(privateKey.PublicKey))
46-
47-
return record, peerID, privateKey
48-
}
49-
50-
func TestAdmissibleCustodyGroupsPeers(t *testing.T) {
51-
genesisValidatorRoot := make([]byte, 32)
52-
53-
for i := 0; i < 32; i++ {
54-
genesisValidatorRoot[i] = byte(i)
55-
}
56-
57-
service := &Service{
58-
cfg: &Config{CustodyInfo: &peerdas.CustodyInfo{}},
59-
genesisTime: time.Now(),
60-
genesisValidatorsRoot: genesisValidatorRoot,
61-
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
62-
ScorerParams: &scorers.Config{},
63-
}),
64-
}
65-
66-
ipAddrString, err := prysmNetwork.ExternalIPv4()
67-
require.NoError(t, err)
68-
ipAddr := net.ParseIP(ipAddrString)
69-
70-
custodyRequirement := params.BeaconConfig().CustodyRequirement
71-
dataColumnSidecarSubnetCount := params.BeaconConfig().DataColumnSidecarSubnetCount
72-
73-
// Peer 1 custodies exactly the same groups than us.
74-
// (We use the same keys pair than ours for simplicity)
75-
peer1Record, peer1ID, localPrivateKey := createPeer(t, 1, custodyRequirement)
76-
77-
// Peer 2 custodies all the groups.
78-
peer2Record, peer2ID, _ := createPeer(t, 2, dataColumnSidecarSubnetCount)
79-
80-
// Peer 3 custodies different groups than us (but the same count).
81-
// (We use the same public key than peer 2 for simplicity)
82-
peer3Record, peer3ID, _ := createPeer(t, 3, custodyRequirement)
83-
84-
// Peer 4 custodies less groups than us.
85-
peer4Record, peer4ID, _ := createPeer(t, 4, custodyRequirement-1)
86-
87-
createListener := func() (*discover.UDPv5, error) {
88-
return service.createListener(ipAddr, localPrivateKey)
89-
}
90-
91-
listener, err := newListener(createListener)
92-
require.NoError(t, err)
93-
94-
service.dv5Listener = listener
95-
96-
service.peers.Add(peer1Record, peer1ID, nil, network.DirOutbound)
97-
service.peers.Add(peer2Record, peer2ID, nil, network.DirOutbound)
98-
service.peers.Add(peer3Record, peer3ID, nil, network.DirOutbound)
99-
service.peers.Add(peer4Record, peer4ID, nil, network.DirOutbound)
100-
101-
actual, err := service.AdmissibleCustodyGroupsPeers([]peer.ID{peer1ID, peer2ID, peer3ID, peer4ID})
102-
require.NoError(t, err)
103-
104-
expected := []peer.ID{peer1ID, peer2ID}
105-
require.DeepSSZEqual(t, expected, actual)
106-
}
107-
10819
func TestCustodyGroupCountFromPeer(t *testing.T) {
10920
const (
11021
expectedENR uint64 = 7

beacon-chain/p2p/interfaces.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,4 @@ type MetadataProvider interface {
115115

116116
type DataColumnsHandler interface {
117117
CustodyGroupCountFromPeer(peer.ID) uint64
118-
AdmissibleCustodyGroupsPeers([]peer.ID) ([]peer.ID, error)
119-
AdmissibleCustodySamplingPeers([]peer.ID) ([]peer.ID, error)
120118
}

beacon-chain/p2p/testing/fuzz_p2p.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,3 @@ func (*FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.Disc
188188
func (*FakeP2P) CustodyGroupCountFromPeer(peer.ID) uint64 {
189189
return 0
190190
}
191-
192-
func (*FakeP2P) AdmissibleCustodyGroupsPeers(peers []peer.ID) ([]peer.ID, error) {
193-
return peers, nil
194-
}
195-
196-
func (*FakeP2P) AdmissibleCustodySamplingPeers(peers []peer.ID) ([]peer.ID, error) {
197-
return peers, nil
198-
}

beacon-chain/p2p/testing/p2p.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -466,11 +466,3 @@ func (s *TestP2P) CustodyGroupCountFromPeer(pid peer.ID) uint64 {
466466

467467
return custodyGroupCount
468468
}
469-
470-
func (*TestP2P) AdmissibleCustodyGroupsPeers(peers []peer.ID) ([]peer.ID, error) {
471-
return peers, nil
472-
}
473-
474-
func (*TestP2P) AdmissibleCustodySamplingPeers(peers []peer.ID) ([]peer.ID, error) {
475-
return peers, nil
476-
}

beacon-chain/sync/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
"block_batcher.go",
88
"broadcast_bls_changes.go",
99
"context.go",
10+
"data_columns.go",
1011
"data_columns_reconstruct.go",
1112
"data_columns_sampling.go",
1213
"deadlines.go",
@@ -132,6 +133,7 @@ go_library(
132133
"//time:go_default_library",
133134
"//time/slots:go_default_library",
134135
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
136+
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
135137
"@com_github_hashicorp_golang_lru//:go_default_library",
136138
"@com_github_libp2p_go_libp2p//core:go_default_library",
137139
"@com_github_libp2p_go_libp2p//core/host:go_default_library",
@@ -164,6 +166,7 @@ go_test(
164166
"context_test.go",
165167
"data_columns_reconstruct_test.go",
166168
"data_columns_sampling_test.go",
169+
"data_columns_test.go",
167170
"decode_pubsub_test.go",
168171
"error_test.go",
169172
"fork_watcher_test.go",
@@ -249,6 +252,7 @@ go_test(
249252
"//container/leaky-bucket:go_default_library",
250253
"//container/slice:go_default_library",
251254
"//crypto/bls:go_default_library",
255+
"//crypto/ecdsa:go_default_library",
252256
"//crypto/rand:go_default_library",
253257
"//encoding/bytesutil:go_default_library",
254258
"//encoding/ssz/equality:go_default_library",
@@ -268,6 +272,7 @@ go_test(
268272
"@com_github_d4l3k_messagediff//:go_default_library",
269273
"@com_github_ethereum_go_ethereum//common:go_default_library",
270274
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
275+
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
271276
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
272277
"@com_github_golang_snappy//:go_default_library",
273278
"@com_github_libp2p_go_libp2p//:go_default_library",

0 commit comments

Comments
 (0)