Skip to content

Commit 08f0beb

Browse files
galaiozzzckck
authored andcommitted
p2p: fix forwarding block of proxied validator issue in EVN; (bnb-chain#3101)
1 parent aa1a499 commit 08f0beb

File tree

7 files changed

+196
-208
lines changed

7 files changed

+196
-208
lines changed

cmd/geth/chaincmd.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ func initNetwork(ctx *cli.Context) error {
479479
staticConnect = true
480480
}
481481

482-
configs, enodes, err := createConfigs(config, initDir, "node", ips, ports, sentryEnodes, connectOneExtraEnodes, staticConnect)
482+
configs, enodes, accounts, err := createConfigs(config, initDir, "node", ips, ports, sentryEnodes, connectOneExtraEnodes, staticConnect)
483483
if err != nil {
484484
utils.Fatalf("Failed to create node configs: %v", err)
485485
}
@@ -489,6 +489,11 @@ func initNetwork(ctx *cli.Context) error {
489489
nodeIDs[i] = enodes[i].ID()
490490
}
491491
// add more feature configs
492+
if enableSentryNode {
493+
for i := 0; i < len(sentryConfigs); i++ {
494+
sentryConfigs[i].Node.P2P.ProxyedValidatorAddresses = accounts[i]
495+
}
496+
}
492497
if ctx.Bool(utils.InitEVNValidatorWhitelist.Name) {
493498
for i := 0; i < size; i++ {
494499
configs[i].Node.P2P.EVNNodeIdsWhitelist = nodeIDs
@@ -501,7 +506,10 @@ func initNetwork(ctx *cli.Context) error {
501506
}
502507
if enableSentryNode && ctx.Bool(utils.InitEVNSentryWhitelist.Name) {
503508
for i := 0; i < len(sentryConfigs); i++ {
504-
sentryConfigs[i].Node.P2P.EVNNodeIdsWhitelist = sentryNodeIDs
509+
// whitelist all sentry nodes + proxyed validator NodeID
510+
wlNodeIDs := []enode.ID{nodeIDs[i]}
511+
wlNodeIDs = append(wlNodeIDs, sentryNodeIDs...)
512+
sentryConfigs[i].Node.P2P.EVNNodeIdsWhitelist = wlNodeIDs
505513
}
506514
}
507515
if enableSentryNode && ctx.Bool(utils.InitEVNSentryRegister.Name) {
@@ -555,8 +563,11 @@ func createSentryNodeConfigs(ctx *cli.Context, baseConfig gethConfig, initDir st
555563
if err != nil {
556564
utils.Fatalf("Failed to parse ports: %v", err)
557565
}
558-
559-
return createConfigs(baseConfig, initDir, "sentry", ips, ports, nil, false, true)
566+
configs, enodes, _, err := createConfigs(baseConfig, initDir, "sentry", ips, ports, nil, false, true)
567+
if err != nil {
568+
utils.Fatalf("Failed to create config: %v", err)
569+
}
570+
return configs, enodes, nil
560571
}
561572

562573
func createAndSaveFullNodeConfigs(ctx *cli.Context, inGenesisFile *os.File, baseConfig gethConfig, initDir string, extraEnodes []*enode.Node) ([]gethConfig, []*enode.Node, error) {
@@ -575,7 +586,7 @@ func createAndSaveFullNodeConfigs(ctx *cli.Context, inGenesisFile *os.File, base
575586
utils.Fatalf("Failed to parse ports: %v", err)
576587
}
577588

578-
configs, enodes, err := createConfigs(baseConfig, initDir, "fullnode", ips, ports, extraEnodes, false, false)
589+
configs, enodes, _, err := createConfigs(baseConfig, initDir, "fullnode", ips, ports, extraEnodes, false, false)
579590
if err != nil {
580591
utils.Fatalf("Failed to create config: %v", err)
581592
}
@@ -590,19 +601,24 @@ func createAndSaveFullNodeConfigs(ctx *cli.Context, inGenesisFile *os.File, base
590601
return configs, enodes, nil
591602
}
592603

593-
func createConfigs(base gethConfig, initDir string, prefix string, ips []string, ports []int, extraEnodes []*enode.Node, connectOneExtraEnodes bool, staticConnect bool) ([]gethConfig, []*enode.Node, error) {
604+
func createConfigs(base gethConfig, initDir string, prefix string, ips []string, ports []int, extraEnodes []*enode.Node, connectOneExtraEnodes bool, staticConnect bool) ([]gethConfig, []*enode.Node, [][]common.Address, error) {
594605
if len(ips) != len(ports) {
595-
return nil, nil, errors.New("mismatch of size and length of ports")
606+
return nil, nil, nil, errors.New("mismatch of size and length of ports")
596607
}
597608
size := len(ips)
598609
enodes := make([]*enode.Node, size)
610+
accounts := make([][]common.Address, size)
599611
for i := 0; i < size; i++ {
600612
nodeConfig := base.Node
601613
nodeConfig.DataDir = path.Join(initDir, fmt.Sprintf("%s%d", prefix, i))
602614
stack, err := node.New(&nodeConfig)
603615
if err != nil {
604-
return nil, nil, err
616+
return nil, nil, nil, err
605617
}
618+
if err := setAccountManagerBackends(stack.Config(), stack.AccountManager(), stack.KeyStoreDir()); err != nil {
619+
utils.Fatalf("Failed to set account manager backends: %v", err)
620+
}
621+
accounts[i] = stack.AccountManager().Accounts()
606622
pk := stack.Config().NodeKey()
607623
enodes[i] = enode.NewV4(&pk.PublicKey, net.ParseIP(ips[i]), ports[i], ports[i])
608624
}
@@ -618,7 +634,7 @@ func createConfigs(base gethConfig, initDir string, prefix string, ips []string,
618634
}
619635
configs[i] = createNodeConfig(base, ips[i], ports[i], allEnodes, index, staticConnect)
620636
}
621-
return configs, enodes, nil
637+
return configs, enodes, accounts, nil
622638
}
623639

624640
func writeConfig(inGenesisFile *os.File, config gethConfig, dir string) error {

eth/backend.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -377,22 +377,22 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
377377
// Permit the downloader to use the trie cache allowance during fast sync
378378
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
379379
if eth.handler, err = newHandler(&handlerConfig{
380-
NodeID: eth.p2pServer.Self().ID(),
381-
Database: chainDb,
382-
Chain: eth.blockchain,
383-
TxPool: eth.txPool,
384-
Network: networkID,
385-
Sync: config.SyncMode,
386-
BloomCache: uint64(cacheLimit),
387-
EventMux: eth.eventMux,
388-
RequiredBlocks: config.RequiredBlocks,
389-
DirectBroadcast: config.DirectBroadcast,
390-
EnableEVNFeatures: stack.Config().EnableEVNFeatures,
391-
EVNNodeIdsWhitelist: stack.Config().P2P.EVNNodeIdsWhitelist,
392-
ProxyedValidatorNodeIDs: stack.Config().P2P.ProxyedValidatorNodeIDs,
393-
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
394-
PeerSet: peers,
395-
EnableQuickBlockFetching: stack.Config().EnableQuickBlockFetching,
380+
NodeID: eth.p2pServer.Self().ID(),
381+
Database: chainDb,
382+
Chain: eth.blockchain,
383+
TxPool: eth.txPool,
384+
Network: networkID,
385+
Sync: config.SyncMode,
386+
BloomCache: uint64(cacheLimit),
387+
EventMux: eth.eventMux,
388+
RequiredBlocks: config.RequiredBlocks,
389+
DirectBroadcast: config.DirectBroadcast,
390+
EnableEVNFeatures: stack.Config().EnableEVNFeatures,
391+
EVNNodeIdsWhitelist: stack.Config().P2P.EVNNodeIdsWhitelist,
392+
ProxyedValidatorAddresses: stack.Config().P2P.ProxyedValidatorAddresses,
393+
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
394+
PeerSet: peers,
395+
EnableQuickBlockFetching: stack.Config().EnableQuickBlockFetching,
396396
}); err != nil {
397397
return nil, err
398398
}

eth/handler.go

Lines changed: 66 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -115,33 +115,33 @@ type votePool interface {
115115
// handlerConfig is the collection of initialization parameters to create a full
116116
// node network handler.
117117
type handlerConfig struct {
118-
NodeID enode.ID // P2P node ID used for tx propagation topology
119-
Database ethdb.Database // Database for direct sync insertions
120-
Chain *core.BlockChain // Blockchain to serve data from
121-
TxPool txPool // Transaction pool to propagate from
122-
VotePool votePool
123-
Network uint64 // Network identifier to adfvertise
124-
Sync ethconfig.SyncMode // Whether to snap or full sync
125-
BloomCache uint64 // Megabytes to alloc for snap sync bloom
126-
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
127-
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
128-
DirectBroadcast bool
129-
DisablePeerTxBroadcast bool
130-
PeerSet *peerSet
131-
EnableQuickBlockFetching bool
132-
EnableEVNFeatures bool
133-
EVNNodeIdsWhitelist []enode.ID
134-
ProxyedValidatorNodeIDs []enode.ID
118+
NodeID enode.ID // P2P node ID used for tx propagation topology
119+
Database ethdb.Database // Database for direct sync insertions
120+
Chain *core.BlockChain // Blockchain to serve data from
121+
TxPool txPool // Transaction pool to propagate from
122+
VotePool votePool
123+
Network uint64 // Network identifier to adfvertise
124+
Sync ethconfig.SyncMode // Whether to snap or full sync
125+
BloomCache uint64 // Megabytes to alloc for snap sync bloom
126+
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
127+
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
128+
DirectBroadcast bool
129+
DisablePeerTxBroadcast bool
130+
PeerSet *peerSet
131+
EnableQuickBlockFetching bool
132+
EnableEVNFeatures bool
133+
EVNNodeIdsWhitelist []enode.ID
134+
ProxyedValidatorAddresses []common.Address
135135
}
136136

137137
type handler struct {
138-
nodeID enode.ID
139-
networkID uint64
140-
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
141-
disablePeerTxBroadcast bool
142-
enableEVNFeatures bool
143-
evnNodeIdsWhitelistMap map[enode.ID]struct{}
144-
proxyedValidatorNodeIDMap map[enode.ID]struct{}
138+
nodeID enode.ID
139+
networkID uint64
140+
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
141+
disablePeerTxBroadcast bool
142+
enableEVNFeatures bool
143+
evnNodeIdsWhitelistMap map[enode.ID]struct{}
144+
proxyedValidatorAddressMap map[common.Address]struct{}
145145

146146
snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks)
147147
synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
@@ -196,32 +196,32 @@ func newHandler(config *handlerConfig) (*handler, error) {
196196
config.PeerSet = newPeerSet() // Nicety initialization for tests
197197
}
198198
h := &handler{
199-
nodeID: config.NodeID,
200-
networkID: config.Network,
201-
forkFilter: forkid.NewFilter(config.Chain),
202-
disablePeerTxBroadcast: config.DisablePeerTxBroadcast,
203-
eventMux: config.EventMux,
204-
database: config.Database,
205-
txpool: config.TxPool,
206-
votepool: config.VotePool,
207-
chain: config.Chain,
208-
peers: config.PeerSet,
209-
peersPerIP: make(map[string]int),
210-
requiredBlocks: config.RequiredBlocks,
211-
directBroadcast: config.DirectBroadcast,
212-
enableEVNFeatures: config.EnableEVNFeatures,
213-
evnNodeIdsWhitelistMap: make(map[enode.ID]struct{}),
214-
proxyedValidatorNodeIDMap: make(map[enode.ID]struct{}),
215-
quitSync: make(chan struct{}),
216-
handlerDoneCh: make(chan struct{}),
217-
handlerStartCh: make(chan struct{}),
218-
stopCh: make(chan struct{}),
199+
nodeID: config.NodeID,
200+
networkID: config.Network,
201+
forkFilter: forkid.NewFilter(config.Chain),
202+
disablePeerTxBroadcast: config.DisablePeerTxBroadcast,
203+
eventMux: config.EventMux,
204+
database: config.Database,
205+
txpool: config.TxPool,
206+
votepool: config.VotePool,
207+
chain: config.Chain,
208+
peers: config.PeerSet,
209+
peersPerIP: make(map[string]int),
210+
requiredBlocks: config.RequiredBlocks,
211+
directBroadcast: config.DirectBroadcast,
212+
enableEVNFeatures: config.EnableEVNFeatures,
213+
evnNodeIdsWhitelistMap: make(map[enode.ID]struct{}),
214+
proxyedValidatorAddressMap: make(map[common.Address]struct{}),
215+
quitSync: make(chan struct{}),
216+
handlerDoneCh: make(chan struct{}),
217+
handlerStartCh: make(chan struct{}),
218+
stopCh: make(chan struct{}),
219219
}
220220
for _, nodeID := range config.EVNNodeIdsWhitelist {
221221
h.evnNodeIdsWhitelistMap[nodeID] = struct{}{}
222222
}
223-
for _, nodeID := range config.ProxyedValidatorNodeIDs {
224-
h.proxyedValidatorNodeIDMap[nodeID] = struct{}{}
223+
for _, address := range config.ProxyedValidatorAddresses {
224+
h.proxyedValidatorAddressMap[address] = struct{}{}
225225
}
226226
if config.Sync == ethconfig.FullSync {
227227
// The database seems empty as the current block is the genesis. Yet the snap
@@ -333,6 +333,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
333333
block := types.NewBlockWithHeader(item.Header).WithBody(types.Body{Transactions: item.Txs, Uncles: item.Uncles})
334334
block = block.WithSidecars(item.Sidecars)
335335
block.ReceivedAt = time.Now()
336+
block.ReceivedFrom = p.ID()
336337
if err := block.SanityCheck(); err != nil {
337338
return nil, err
338339
}
@@ -399,7 +400,7 @@ func (h *handler) protoTracker() {
399400
if h.enableEVNFeatures {
400401
// add onchain validator p2p node list later, it will enable the direct broadcast + no tx broadcast feature
401402
// here check & enable peer broadcast features periodically, and it's a simple way to handle the peer change and the list change scenarios.
402-
h.peers.enableEVNFeatures(h.queryValidatorNodeIDsMap(), h.evnNodeIdsWhitelistMap, h.proxyedValidatorNodeIDMap)
403+
h.peers.enableEVNFeatures(h.queryValidatorNodeIDsMap(), h.evnNodeIdsWhitelistMap)
403404
}
404405
case <-h.quitSync:
405406
// Wait for all active handlers to finish.
@@ -833,38 +834,34 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
833834
}
834835

835836
for _, peer := range transfer {
836-
log.Debug("broadcast block to peer", "hash", hash, "peer", peer.ID(), "ProxyedValidatorFlag", peer.ProxyedValidatorFlag.Load(), "EVNPeerFlag", peer.EVNPeerFlag.Load())
837+
log.Debug("broadcast block to peer", "hash", hash, "peer", peer.ID(), "EVNPeerFlag", peer.EVNPeerFlag.Load())
837838
peer.AsyncSendNewBlock(block, td)
838839
}
839840

840841
// check if the block should be broadcast to more peers in EVN
841-
fullBroadcastInEVN := h.needFullBroadcastInEVN(block)
842842
var morePeers []*ethPeer
843-
for i := len(transfer); i < len(peers); i++ {
844-
if peers[i].ProxyedValidatorFlag.Load() {
845-
morePeers = append(morePeers, peers[i])
846-
continue
843+
if h.needFullBroadcastInEVN(block) {
844+
for i := len(transfer); i < len(peers); i++ {
845+
if peers[i].EVNPeerFlag.Load() {
846+
morePeers = append(morePeers, peers[i])
847+
}
847848
}
848-
if fullBroadcastInEVN && peers[i].EVNPeerFlag.Load() {
849-
morePeers = append(morePeers, peers[i])
850-
continue
849+
for _, peer := range morePeers {
850+
log.Debug("broadcast block to extra peer", "hash", hash, "peer", peer.ID(), "EVNPeerFlag", peer.EVNPeerFlag.Load())
851+
peer.AsyncSendNewBlock(block, td)
851852
}
852853
}
853-
for _, peer := range morePeers {
854-
log.Debug("broadcast block to extra peer", "hash", hash, "peer", peer.ID(), "ProxyedValidatorFlag", peer.ProxyedValidatorFlag.Load(), "EVNPeerFlag", peer.EVNPeerFlag.Load())
855-
peer.AsyncSendNewBlock(block, td)
856-
}
857854

858-
log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "extra", len(morePeers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
855+
log.Debug("Propagated block", "hash", hash, "recipients", len(transfer), "extra", len(morePeers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
859856
return
860857
}
861858
// Otherwise if the block is indeed in our own chain, announce it
862859
if h.chain.HasBlock(hash, block.NumberU64()) {
863860
for _, peer := range peers {
864-
log.Debug("Announced block to peer", "hash", hash, "peer", peer.ID(), "ProxyedValidatorFlag", peer.ProxyedValidatorFlag.Load(), "EVNPeerFlag", peer.EVNPeerFlag.Load())
861+
log.Debug("Announced block to peer", "hash", hash, "peer", peer.ID(), "EVNPeerFlag", peer.EVNPeerFlag.Load())
865862
peer.AsyncSendNewBlockHash(block)
866863
}
867-
log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
864+
log.Debug("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
868865
}
869866
}
870867

@@ -875,15 +872,19 @@ func (h *handler) needFullBroadcastInEVN(block *types.Block) bool {
875872
if !h.enableEVNFeatures {
876873
return false
877874
}
875+
878876
parlia, ok := h.chain.Engine().(*parlia.Parlia)
879877
if !ok {
880878
return false
881879
}
882-
if parlia.ConsensusAddress() == block.Coinbase() {
880+
coinbase := block.Coinbase()
881+
// check whether the block is created by self
882+
if parlia.ConsensusAddress() == coinbase {
883+
log.Debug("full broadcast mined block to EVN", "coinbase", coinbase)
883884
return true
884885
}
885886

886-
return h.peers.isProxyedValidator(block.Coinbase(), h.proxyedValidatorNodeIDMap)
887+
return h.peers.isProxyedValidator(coinbase, h.proxyedValidatorAddressMap)
887888
}
888889

889890
func (h *handler) queryValidatorNodeIDsMap() map[common.Address][]enode.ID {

0 commit comments

Comments
 (0)