Skip to content

Commit 1f82b70

Browse files
authored
fix(sensor): handle bor state sync tx type (#804)
* fix(sensor): handle bor state sync tx type * fix: support eth/69 (kinda) * fix: better logging
1 parent df43076 commit 1f82b70

File tree

2 files changed

+156
-29
lines changed

2 files changed

+156
-29
lines changed

p2p/protocol.go

Lines changed: 76 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/hex"
66
"errors"
77
"fmt"
8+
"io"
89
"math/big"
910
"time"
1011

@@ -14,6 +15,7 @@ import (
1415
"github.com/ethereum/go-ethereum/eth/protocols/eth"
1516
ethp2p "github.com/ethereum/go-ethereum/p2p"
1617
"github.com/ethereum/go-ethereum/p2p/enode"
18+
"github.com/ethereum/go-ethereum/rlp"
1719
"github.com/prometheus/client_golang/prometheus"
1820
"github.com/rs/zerolog"
1921
"github.com/rs/zerolog/log"
@@ -370,16 +372,25 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
370372
}
371373

372374
func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error {
373-
var txs eth.TransactionsPacket
374-
if err := msg.Decode(&txs); err != nil {
375-
return err
375+
payload, err := io.ReadAll(msg.Payload)
376+
if err != nil {
377+
return fmt.Errorf("failed to read transactions payload: %w", err)
376378
}
377379

380+
var rawTxs []rlp.RawValue
381+
if err := rlp.DecodeBytes(payload, &rawTxs); err != nil {
382+
c.logger.Warn().Err(err).Msg("Failed to decode transactions")
383+
return nil
384+
}
385+
386+
txs := decodeTxs(rawTxs)
378387
tfs := time.Now()
379388

380-
c.countMsgReceived(txs.Name(), float64(len(txs)))
389+
c.countMsgReceived((&eth.TransactionsPacket{}).Name(), float64(len(txs)))
381390

382-
c.db.WriteTransactions(ctx, c.node, txs, tfs)
391+
if len(txs) > 0 {
392+
c.db.WriteTransactions(ctx, c.node, txs, tfs)
393+
}
383394

384395
return nil
385396
}
@@ -449,18 +460,18 @@ func (c *conn) handleGetBlockBodies(msg ethp2p.Msg) error {
449460
}
450461

451462
func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error {
452-
var packet eth.BlockBodiesPacket
463+
var packet eth.BlockBodiesRLPPacket
453464
if err := msg.Decode(&packet); err != nil {
454465
return err
455466
}
456467

457468
tfs := time.Now()
458469

459-
if len(packet.BlockBodiesResponse) == 0 {
470+
if len(packet.BlockBodiesRLPResponse) == 0 {
460471
return nil
461472
}
462473

463-
c.countMsgReceived(packet.Name(), float64(len(packet.BlockBodiesResponse)))
474+
c.countMsgReceived((&eth.BlockBodiesPacket{}).Name(), float64(len(packet.BlockBodiesRLPResponse)))
464475

465476
hash, ok := c.requests.Get(packet.RequestId)
466477
if !ok {
@@ -474,7 +485,18 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error {
474485
return nil
475486
}
476487

477-
body := packet.BlockBodiesResponse[0]
488+
var decoded rawBlockBody
489+
if err := rlp.DecodeBytes(packet.BlockBodiesRLPResponse[0], &decoded); err != nil {
490+
c.logger.Warn().Err(err).Msg("Failed to decode block body")
491+
return nil
492+
}
493+
494+
body := &eth.BlockBody{
495+
Transactions: decodeTxs(decoded.Transactions),
496+
Uncles: decoded.Uncles,
497+
Withdrawals: decoded.Withdrawals,
498+
}
499+
478500
c.db.WriteBlockBody(ctx, body, hash, tfs)
479501

480502
// Update cache to store body
@@ -487,26 +509,39 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error {
487509
}
488510

489511
func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error {
490-
var block eth.NewBlockPacket
491-
if err := msg.Decode(&block); err != nil {
492-
return err
512+
payload, err := io.ReadAll(msg.Payload)
513+
if err != nil {
514+
return fmt.Errorf("failed to read new block payload: %w", err)
493515
}
494516

517+
var raw rawNewBlockPacket
518+
if err := rlp.DecodeBytes(payload, &raw); err != nil {
519+
c.logger.Warn().Err(err).Msg("Failed to decode new block")
520+
return nil
521+
}
522+
523+
block := types.NewBlockWithHeader(raw.Block.Header).WithBody(types.Body{
524+
Transactions: decodeTxs(raw.Block.Txs),
525+
Uncles: raw.Block.Uncles,
526+
Withdrawals: raw.Block.Withdrawals,
527+
})
528+
packet := &eth.NewBlockPacket{Block: block, TD: raw.TD}
529+
495530
tfs := time.Now()
496-
hash := block.Block.Hash()
531+
hash := packet.Block.Hash()
497532

498-
c.countMsgReceived(block.Name(), 1)
533+
c.countMsgReceived(packet.Name(), 1)
499534

500535
// Set the head block if newer.
501-
if c.conns.UpdateHeadBlock(block) {
536+
if c.conns.UpdateHeadBlock(*packet) {
502537
c.logger.Info().
503538
Str("hash", hash.Hex()).
504-
Uint64("number", block.Block.Number().Uint64()).
505-
Str("td", block.TD.String()).
539+
Uint64("number", packet.Block.Number().Uint64()).
540+
Str("td", packet.TD.String()).
506541
Msg("Updated head block")
507542
}
508543

509-
if err := c.getParentBlock(ctx, block.Block.Header()); err != nil {
544+
if err := c.getParentBlock(ctx, packet.Block.Header()); err != nil {
510545
return err
511546
}
512547

@@ -515,17 +550,17 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error {
515550
return nil
516551
}
517552

518-
c.db.WriteBlock(ctx, c.node, block.Block, block.TD, tfs)
553+
c.db.WriteBlock(ctx, c.node, packet.Block, packet.TD, tfs)
519554

520555
// Update cache to store the full block
521556
c.conns.Blocks().Add(hash, BlockCache{
522-
Header: block.Block.Header(),
557+
Header: packet.Block.Header(),
523558
Body: &eth.BlockBody{
524-
Transactions: block.Block.Transactions(),
525-
Uncles: block.Block.Uncles(),
526-
Withdrawals: block.Block.Withdrawals(),
559+
Transactions: packet.Block.Transactions(),
560+
Uncles: packet.Block.Uncles(),
561+
Withdrawals: packet.Block.Withdrawals(),
527562
},
528-
TD: block.TD,
563+
TD: packet.TD,
529564
})
530565

531566
return nil
@@ -549,7 +584,7 @@ func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) er
549584
var name string
550585

551586
switch version {
552-
case 67, 68:
587+
case 67, 68, 69:
553588
var txs eth.NewPooledTransactionHashesPacket
554589
if err := msg.Decode(&txs); err != nil {
555590
return err
@@ -572,16 +607,28 @@ func (c *conn) handleNewPooledTransactionHashes(version uint, msg ethp2p.Msg) er
572607
}
573608

574609
func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) error {
575-
var packet eth.PooledTransactionsPacket
576-
if err := msg.Decode(&packet); err != nil {
577-
return err
610+
payload, err := io.ReadAll(msg.Payload)
611+
if err != nil {
612+
return fmt.Errorf("failed to read pooled transactions payload: %w", err)
613+
}
614+
615+
var raw rawPooledTransactionsPacket
616+
if err := rlp.DecodeBytes(payload, &raw); err != nil {
617+
c.logger.Warn().Err(err).Msg("Failed to decode pooled transactions")
618+
return nil
619+
}
620+
621+
packet := &eth.PooledTransactionsPacket{
622+
PooledTransactionsResponse: decodeTxs(raw.Txs),
578623
}
579624

580625
tfs := time.Now()
581626

582627
c.countMsgReceived(packet.Name(), float64(len(packet.PooledTransactionsResponse)))
583628

584-
c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse, tfs)
629+
if len(packet.PooledTransactionsResponse) > 0 {
630+
c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse, tfs)
631+
}
585632

586633
return nil
587634
}

p2p/types.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,22 @@ package p2p
33
import (
44
"crypto/ecdsa"
55
"fmt"
6+
"math/big"
67
"slices"
78
"time"
89

910
"github.com/ethereum/go-ethereum/common"
1011
"github.com/ethereum/go-ethereum/core/stateless"
12+
"github.com/ethereum/go-ethereum/core/types"
13+
"github.com/ethereum/go-ethereum/crypto"
1114
"github.com/ethereum/go-ethereum/eth/protocols/eth"
1215
"github.com/ethereum/go-ethereum/eth/protocols/snap"
1316
"github.com/ethereum/go-ethereum/p2p"
1417
"github.com/ethereum/go-ethereum/p2p/enode"
1518
"github.com/ethereum/go-ethereum/p2p/rlpx"
1619
"github.com/ethereum/go-ethereum/rlp"
1720
"github.com/rs/zerolog"
21+
"github.com/rs/zerolog/log"
1822
)
1923

2024
type Message interface {
@@ -402,3 +406,79 @@ func (c *rlpxConn) hasCap(name string, version uint) bool {
402406
cap := p2p.Cap{Name: name, Version: version}
403407
return slices.Contains(c.peerCaps, cap) && slices.Contains(c.caps, cap)
404408
}
409+
410+
// rawBlockBody is used to decode block bodies with any transaction type.
411+
type rawBlockBody struct {
412+
Transactions []rlp.RawValue
413+
Uncles []*types.Header
414+
Withdrawals []*types.Withdrawal `rlp:"optional"`
415+
}
416+
417+
// rawBlock is used to decode blocks with any transaction type.
418+
type rawBlock struct {
419+
Header *types.Header
420+
Txs []rlp.RawValue
421+
Uncles []*types.Header
422+
Withdrawals []*types.Withdrawal `rlp:"optional"`
423+
}
424+
425+
// rawNewBlockPacket is used to decode NewBlockMsg with any transaction type.
426+
type rawNewBlockPacket struct {
427+
Block rawBlock
428+
TD *big.Int
429+
}
430+
431+
// rawPooledTransactionsPacket is used to decode PooledTransactionsMsg with any transaction type.
432+
type rawPooledTransactionsPacket struct {
433+
RequestId uint64
434+
Txs []rlp.RawValue
435+
}
436+
437+
// decodeTx attempts to decode a transaction from an RLP-encoded raw value.
438+
func decodeTx(raw []byte) *types.Transaction {
439+
if len(raw) == 0 {
440+
return nil
441+
}
442+
443+
var bytes []byte
444+
if rlp.DecodeBytes(raw, &bytes) == nil {
445+
tx := new(types.Transaction)
446+
if tx.UnmarshalBinary(bytes) == nil {
447+
return tx
448+
}
449+
450+
log.Warn().
451+
Uint8("type", bytes[0]).
452+
Int("size", len(bytes)).
453+
Str("hash", crypto.Keccak256Hash(bytes).Hex()).
454+
Msg("Failed to decode transaction")
455+
456+
return nil
457+
}
458+
459+
tx := new(types.Transaction)
460+
if tx.UnmarshalBinary(raw) == nil {
461+
return tx
462+
}
463+
464+
log.Warn().
465+
Uint8("prefix", raw[0]).
466+
Int("size", len(raw)).
467+
Str("hash", crypto.Keccak256Hash(raw).Hex()).
468+
Msg("Failed to decode transaction")
469+
470+
return nil
471+
}
472+
473+
// decodeTxs decodes a list of transactions, returning only successfully decoded ones.
474+
func decodeTxs(rawTxs []rlp.RawValue) []*types.Transaction {
475+
var txs []*types.Transaction
476+
477+
for _, raw := range rawTxs {
478+
if tx := decodeTx(raw); tx != nil {
479+
txs = append(txs, tx)
480+
}
481+
}
482+
483+
return txs
484+
}

0 commit comments

Comments
 (0)