Skip to content

Commit a46afb3

Browse files
committed
all: impl neox protocol to transfer blobs
1 parent a25bcf8 commit a46afb3

32 files changed

+1431
-412
lines changed

consensus/dbft/dbft.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ type DBFT struct {
190190
// about a new consensus payload to be sent.
191191
broadcast func(m *dbftproto.Message) error
192192

193+
// blobsFeed is a feed to notify new blobs arrivals.
194+
blobsFeed event.Feed
195+
193196
// requestTxs is a callback which is called to request the missing
194197
// transactions from neighbor nodes. Requested transactions hashes are
195198
// stored in txCbList and checked against the incoming transactions. If
@@ -572,6 +575,14 @@ func (c *DBFT) processBlockCb(b dbft.Block[common.Hash]) error {
572575
}
573576
}
574577

578+
// Notify new blobs arrival.
579+
if len(dbftBlock.sidecars) > 0 {
580+
c.blobsFeed.Send(core.NewBlobsEvent{Blob: &types.BlockBlobs{
581+
BlockHash: res.Hash(),
582+
Sidecars: res.Sidecars(),
583+
}})
584+
}
585+
575586
c.postBlock(res.Header(), state)
576587
return nil
577588
}

consensus/dbft/subscribe.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package dbft
2+
3+
import (
4+
"github.com/ethereum/go-ethereum/core"
5+
"github.com/ethereum/go-ethereum/event"
6+
)
7+
8+
func (dbft *DBFT) SubscribeBlobs(ch chan<- core.NewBlobsEvent) event.Subscription {
9+
return dbft.blobsFeed.Subscribe(ch)
10+
}

core/blockchain.go

Lines changed: 8 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1365,16 +1365,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
13651365
if n, err := bc.hc.ValidateHeaderChain(headers); err != nil {
13661366
return n, err
13671367
}
1368-
1369-
// check DA after cancun
1370-
lastBlk := blockChain[len(blockChain)-1]
1371-
if bc.chainConfig.DBFT != nil && bc.chainConfig.IsCancun(lastBlk.Number(), lastBlk.Time()) {
1372-
if _, err := CheckDataAvailableInBatch(bc, blockChain); err != nil {
1373-
log.Debug("CheckDataAvailableInBatch", "err", err)
1374-
return 0, err
1375-
}
1376-
}
1377-
13781368
// Hold the mutation lock
13791369
if !bc.chainmu.TryLock() {
13801370
return 0, errChainStopped
@@ -1433,7 +1423,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
14331423
return 0, consensus.ErrUnknownAncestor
14341424
}
14351425
td := new(big.Int).Add(ptd, blockChain[0].Difficulty())
1436-
writeSize, err := rawdb.WriteAncientBlocksWithBlobs(bc.db, blockChain, receiptChain, td)
1426+
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
14371427
if err != nil {
14381428
log.Error("Error importing chain data to ancients", "err", err)
14391429
return 0, err
@@ -1499,9 +1489,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
14991489
rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
15001490
rawdb.WriteBlock(batch, block)
15011491
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
1502-
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
1503-
rawdb.WriteBlobSidecars(batch, block.Hash(), block.NumberU64(), block.Sidecars())
1504-
}
15051492

15061493
// Write everything belongs to the blocks into the database. So that
15071494
// we can ensure all components of body is completed(body, receipts)
@@ -1576,10 +1563,6 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
15761563
batch := bc.db.NewBatch()
15771564
rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td)
15781565
rawdb.WriteBlock(batch, block)
1579-
// if cancun is enabled, here need to write sidecars too
1580-
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
1581-
rawdb.WriteBlobSidecars(batch, block.Hash(), block.NumberU64(), block.Sidecars())
1582-
}
15831566
if err := batch.Write(); err != nil {
15841567
log.Crit("Failed to write block into disk", "err", err)
15851568
}
@@ -1618,17 +1601,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
16181601
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
16191602
rawdb.WriteBlock(blockBatch, block)
16201603
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
1621-
// if cancun is enabled, here need to write sidecars too
1622-
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
1623-
rawdb.WriteBlobSidecars(blockBatch, block.Hash(), block.NumberU64(), block.Sidecars())
1624-
}
16251604
rawdb.WritePreimages(blockBatch, statedb.Preimages())
16261605
if err := blockBatch.Write(); err != nil {
16271606
log.Crit("Failed to write block into disk", "err", err)
16281607
}
1629-
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
1630-
bc.sidecarsCache.Add(block.Hash(), block.Sidecars())
1631-
}
16321608
// Commit all cached state changes into underlying memory database.
16331609
root, err := statedb.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()), bc.chainConfig.IsCancun(block.Number(), block.Time()))
16341610
if err != nil {
@@ -1711,6 +1687,13 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
17111687
if err := bc.writeBlockWithState(block, receipts, state); err != nil {
17121688
return NonStatTy, err
17131689
}
1690+
// if cancun is enabled, here need to write sidecars too
1691+
if bc.chainConfig.IsCancun(block.Number(), block.Time()) && block.HasBlobTxs() {
1692+
if len(block.Sidecars()) > 0 {
1693+
rawdb.WriteBlobSidecars(bc.db, block.Hash(), block.NumberU64(), block.Sidecars())
1694+
bc.sidecarsCache.Add(block.Hash(), block.Sidecars())
1695+
}
1696+
}
17141697
currentBlock := bc.CurrentBlock()
17151698

17161699
// Reorganise the chain if the parent is not the head block
@@ -1843,14 +1826,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
18431826
bc.chainHeadFeed.Send(ChainHeadEvent{Header: lastCanon.Header()})
18441827
}
18451828
}()
1846-
1847-
// check block data available first
1848-
if bc.chainConfig.DBFT != nil {
1849-
if index, err := CheckDataAvailableInBatch(bc, chain); err != nil {
1850-
return nil, index, err
1851-
}
1852-
}
1853-
18541829
// Start the parallel header verifier
18551830
headers := make([]*types.Header, len(chain))
18561831
for i, block := range chain {
@@ -2365,9 +2340,6 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator, ma
23652340
// Append the next block to our batch
23662341
block := bc.GetBlock(hashes[i], numbers[i])
23672342

2368-
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
2369-
block = block.WithSidecars(bc.GetSidecarsByHash(hashes[i]))
2370-
}
23712343
blocks = append(blocks, block)
23722344
memory += block.Size()
23732345

@@ -2439,9 +2411,6 @@ func (bc *BlockChain) recoverAncestors(block *types.Block, makeWitness bool) (co
24392411
} else {
24402412
b = bc.GetBlock(hashes[i], numbers[i])
24412413
}
2442-
if bc.chainConfig.IsCancun(b.Number(), b.Time()) {
2443-
b = b.WithSidecars(bc.GetSidecarsByHash(b.Hash()))
2444-
}
24452414
if _, _, err := bc.insertChain(types.Blocks{b}, false, makeWitness && i == 0); err != nil {
24462415
return b.ParentHash(), err
24472416
}
@@ -2774,7 +2743,6 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
27742743
}
27752744

27762745
// reportBlock logs a bad block error.
2777-
// bad block need not save receipts & sidecars.
27782746
func (bc *BlockChain) reportBlock(block *types.Block, res *ProcessResult, err error) {
27792747
var receipts types.Receipts
27802748
if res != nil {

core/blockchain_reader.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,6 @@ func (bc *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
171171
if block == nil {
172172
return nil
173173
}
174-
sidecars := rawdb.ReadBlobSidecars(bc.db, hash, number)
175-
block = block.WithSidecars(sidecars)
176174
// Cache the found block for next time and return
177175
bc.blockCache.Add(block.Hash(), block)
178176
return block

core/events.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ type ReannoTxsEvent struct{ Txs []*types.Transaction }
2929
// NewMinedBlockEvent is posted when a block has been imported.
3030
type NewMinedBlockEvent struct{ Block *types.Block }
3131

32+
// NewBlobsEvent is posted when new blobs are imported.
33+
type NewBlobsEvent struct{ Blob *types.BlockBlobs }
34+
3235
// RemovedLogsEvent is posted when a reorg happens
3336
type RemovedLogsEvent struct{ Logs []*types.Log }
3437

core/rawdb/accessors_chain.go

Lines changed: 14 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -762,48 +762,6 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {
762762
WriteHeader(db, block.Header())
763763
}
764764

765-
// WriteAncientBlocksWithBlobs writes entire block data with blobs into ancient store and returns the total written size.
766-
func WriteAncientBlocksWithBlobs(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
767-
// find cancun index, it's used for new added blob ancient table
768-
cancunIndex := -1
769-
for i, block := range blocks {
770-
if block.Sidecars() != nil {
771-
cancunIndex = i
772-
break
773-
}
774-
}
775-
log.Debug("WriteAncientBlocks", "startAt", blocks[0].Number(), "cancunIndex", cancunIndex, "len", len(blocks))
776-
777-
var (
778-
tdSum = new(big.Int).Set(td)
779-
preSize int64
780-
err error
781-
)
782-
if cancunIndex > 0 {
783-
preSize, err = WriteAncientBlocks(db, blocks[:cancunIndex], receipts[:cancunIndex], td)
784-
if err != nil {
785-
return preSize, err
786-
}
787-
for i, block := range blocks[:cancunIndex] {
788-
if i > 0 {
789-
tdSum.Add(tdSum, block.Difficulty())
790-
}
791-
}
792-
tdSum.Add(tdSum, blocks[cancunIndex].Difficulty())
793-
}
794-
795-
// It will reset blob ancient table at cancunIndex
796-
if cancunIndex >= 0 {
797-
if err = ResetEmptyBlobAncientTable(db, blocks[cancunIndex].NumberU64()); err != nil {
798-
return 0, err
799-
}
800-
blocks = blocks[cancunIndex:]
801-
receipts = receipts[cancunIndex:]
802-
}
803-
postSize, err := WriteAncientBlocks(db, blocks, receipts, tdSum)
804-
return preSize + postSize, err
805-
}
806-
807765
// WriteAncientBlocks writes entire block data into ancient store and returns the total written size.
808766
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
809767
var (
@@ -846,11 +804,21 @@ func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *type
846804
if err := op.Append(ChainFreezerDifficultyTable, num, td); err != nil {
847805
return fmt.Errorf("can't append block %d total difficulty: %v", num, err)
848806
}
849-
if block.Sidecars() != nil {
850-
if err := op.Append(ChainFreezerBlobSidecarTable, num, block.Sidecars()); err != nil {
851-
return fmt.Errorf("can't append block %d blobs: %v", num, err)
852-
}
807+
return nil
808+
}
809+
810+
// WriteAncientSidecars writes the supplied blob sidecars into the ancient store and returns the total written size.
811+
func WriteAncientSidecars(db ethdb.AncientWriter, blockNum uint64, sidecars types.BlobSidecars) (int64, error) {
812+
return db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
813+
return writeAncientSidecars(op, blockNum, sidecars)
814+
})
815+
}
816+
817+
func writeAncientSidecars(op ethdb.AncientWriteOp, blockNum uint64, sidecars types.BlobSidecars) error {
818+
if err := op.Append(ChainFreezerBlobSidecarTable, blockNum, sidecars); err != nil {
819+
return fmt.Errorf("can't append block %d blobs: %v", blockNum, err)
853820
}
821+
854822
return nil
855823
}
856824

core/types/blob_sidecar.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,17 @@ func (s *BlobSidecar) SanityCheck(blockNumber *big.Int, blockHash common.Hash) e
6161
}
6262
return nil
6363
}
64+
65+
// BlockBlobs represents a block hash and its associated sidecars.
66+
type BlockBlobs struct {
67+
BlockHash common.Hash `json:"blockHash"` // hash of the block
68+
Sidecars BlobSidecars `json:"sidecars"` // sidecars associated with the block
69+
}
70+
71+
// NewBlockBlobs creates a new BlockBlobs from a block.
72+
func NewBlockBlobs(block *Block) *BlockBlobs {
73+
return &BlockBlobs{
74+
BlockHash: block.Header().Hash(),
75+
Sidecars: block.Sidecars(),
76+
}
77+
}

core/types/block.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,16 @@ func (b *Block) Hash() common.Hash {
602602
return h
603603
}
604604

605+
// HasBlobTxs returns whether the block contains blob transactions.
606+
func (b *Block) HasBlobTxs() bool {
607+
for _, tx := range b.transactions {
608+
if tx.Type() == BlobTxType {
609+
return true
610+
}
611+
}
612+
return false
613+
}
614+
605615
type Blocks []*Block
606616

607617
// HeaderParentHashFromRLP returns the parentHash of an RLP-encoded

eth/api_backend.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,23 @@ func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (type
278278
}
279279

280280
func (b *EthAPIBackend) GetBlobSidecars(ctx context.Context, hash common.Hash) (types.BlobSidecars, error) {
281-
return b.eth.blockchain.GetSidecarsByHash(hash), nil
281+
sidecars := b.eth.blockchain.GetSidecarsByHash(hash)
282+
if len(sidecars) > 0 {
283+
return sidecars, nil
284+
}
285+
286+
block, err := b.BlockByHash(ctx, hash)
287+
if err != nil {
288+
return nil, err
289+
}
290+
if !block.HasBlobTxs() {
291+
return types.BlobSidecars{}, nil
292+
}
293+
294+
if b.eth.neoxSrv != nil {
295+
return b.eth.neoxSrv.RetrieveSidecarsByHash(hash)
296+
}
297+
return nil, errors.New("neox service not available")
282298
}
283299

284300
func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {

eth/backend.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"github.com/ethereum/go-ethereum/eth/gasprice"
5050
dbftproto "github.com/ethereum/go-ethereum/eth/protocols/dbft"
5151
"github.com/ethereum/go-ethereum/eth/protocols/eth"
52+
"github.com/ethereum/go-ethereum/eth/protocols/neox"
5253
"github.com/ethereum/go-ethereum/eth/protocols/snap"
5354
"github.com/ethereum/go-ethereum/eth/tracers"
5455
"github.com/ethereum/go-ethereum/ethdb"
@@ -80,6 +81,7 @@ type Ethereum struct {
8081
localTxTracker *locals.TxTracker
8182
dbftSrv *dbftproto.Service
8283
blockchain *core.BlockChain
84+
neoxSrv *neox.Service
8385

8486
handler *handler
8587
discmix *enode.FairMix
@@ -369,6 +371,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
369371
if config.AntiMEVEnforceECDSABlockSignatureScheme {
370372
bft.EnforceECDSASignatures()
371373
}
374+
375+
eth.neoxSrv = neox.New(eth.blockchain, newNeoxCallback(chainDb, eth.blockchain), bft)
372376
}
373377

374378
// Start the RPC service
@@ -642,6 +646,9 @@ func (s *Ethereum) SyncMode() downloader.SyncMode {
642646
func (s *Ethereum) Protocols() []p2p.Protocol {
643647
protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.discmix)
644648
protos = append(protos, s.dbftSrv.MakeProtocols()...)
649+
if s.neoxSrv != nil {
650+
protos = append(protos, s.neoxSrv.MakeProtocols()...)
651+
}
645652
if s.config.SnapshotCache > 0 {
646653
protos = append(protos, snap.MakeProtocols((*snapHandler)(s.handler))...)
647654
}
@@ -667,6 +674,12 @@ func (s *Ethereum) Start() error {
667674
// start log indexer
668675
s.filterMaps.Start()
669676
go s.updateFilterMapsHeads()
677+
678+
// Start neox service
679+
if s.neoxSrv != nil {
680+
s.neoxSrv.Start()
681+
}
682+
670683
return nil
671684
}
672685

@@ -767,6 +780,9 @@ func (s *Ethereum) Stop() error {
767780
s.discmix.Close()
768781
s.dropper.Stop()
769782
s.handler.Stop()
783+
if s.neoxSrv != nil {
784+
s.neoxSrv.Stop()
785+
}
770786

771787
// Then stop everything else.
772788
ch := make(chan struct{})

0 commit comments

Comments
 (0)