diff --git a/cmd/devp2p/internal/ethtest/conn.go b/cmd/devp2p/internal/ethtest/conn.go index 5182d71ce19..246dfce9065 100644 --- a/cmd/devp2p/internal/ethtest/conn.go +++ b/cmd/devp2p/internal/ethtest/conn.go @@ -66,9 +66,9 @@ func (s *Suite) dialAs(key *ecdsa.PrivateKey) (*Conn, error) { return nil, err } conn.caps = []p2p.Cap{ - {Name: "eth", Version: 69}, + {Name: "eth", Version: 70}, } - conn.ourHighestProtoVersion = 69 + conn.ourHighestProtoVersion = 70 return &conn, nil } diff --git a/cmd/devp2p/internal/ethtest/suite.go b/cmd/devp2p/internal/ethtest/suite.go index c23360bf821..fedb400ddd3 100644 --- a/cmd/devp2p/internal/ethtest/suite.go +++ b/cmd/devp2p/internal/ethtest/suite.go @@ -434,15 +434,16 @@ func (s *Suite) TestGetReceipts(t *utesting.T) { } // Create block bodies request. - req := ð.GetReceiptsPacket{ - RequestId: 66, - GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes), + req := ð.GetReceiptsPacket70{ + RequestId: 66, + GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes), + FirstBlockReceiptIndex: 0, } if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil { t.Fatalf("could not write to connection: %v", err) } // Wait for response. - resp := new(eth.ReceiptsPacket[*eth.ReceiptList69]) + resp := new(eth.ReceiptsPacket70) if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil { t.Fatalf("error reading block bodies msg: %v", err) } diff --git a/eth/downloader/fetchers_concurrent.go b/eth/downloader/fetchers_concurrent.go index 9d8cd114c12..0662200aefd 100644 --- a/eth/downloader/fetchers_concurrent.go +++ b/eth/downloader/fetchers_concurrent.go @@ -307,7 +307,15 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error { // reschedule the timeout timer. index, live := ordering[res.Req] if live { - timeouts.Remove(index) + req := timeouts.Remove(index) + delete(ordering, res.Req) + + if res.Partial { + ttl := d.peers.rates.TargetTimeout() + ordering[req] = timeouts.Size() + timeouts.Push(req, -time.Now().Add(ttl).UnixNano()) + } + if index == 0 { if !timeout.Stop() { <-timeout.C @@ -317,16 +325,17 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error { timeout.Reset(time.Until(time.Unix(0, -exp))) } } - delete(ordering, res.Req) } - // Delete the pending request (if it still exists) and mark the peer idle - delete(pending, res.Req.Peer) - delete(stales, res.Req.Peer) + if !res.Partial { + // Delete the pending request (if it still exists) and mark the peer idle + delete(pending, res.Req.Peer) + delete(stales, res.Req.Peer) + res.Req.Close() + } // Signal the dispatcher that the round trip is done. We'll drop the // peer if the data turns out to be junk. res.Done <- nil - res.Req.Close() // If the peer was previously banned and failed to deliver its pack // in a reasonable time frame, ignore its message. diff --git a/eth/downloader/fetchers_concurrent_receipts.go b/eth/downloader/fetchers_concurrent_receipts.go index dbea30e881e..42ee5cf8ce5 100644 --- a/eth/downloader/fetchers_concurrent_receipts.go +++ b/eth/downloader/fetchers_concurrent_receipts.go @@ -91,7 +91,7 @@ func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int, receipts := *packet.Res.(*eth.ReceiptsRLPResponse) hashes := packet.Meta.([]common.Hash) // {receipt hashes} - accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes) + accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes, packet.Partial, packet.From) switch { case err == nil && len(receipts) == 0: peer.log.Trace("Requested receipts delivered") diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 9fe169d5f74..4d8810b28b6 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -629,13 +629,13 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH result.SetBodyDone() } return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, - bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct) + bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct, false, 0) } // DeliverReceipts injects a receipt retrieval response into the results queue. // The method returns the number of transaction receipts accepted from the delivery // and also wakes any threads waiting for data delivery. -func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash) (int, error) { +func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash, incomplete bool, from int) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -650,7 +650,7 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptLi result.SetReceiptsDone() } return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, - receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct) + receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct, incomplete, from) } // deliver injects a data retrieval response into the results queue. @@ -662,14 +662,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest, reqTimer *metrics.Timer, resInMeter, resDropMeter *metrics.Meter, results int, validate func(index int, header *types.Header) error, - reconstruct func(index int, result *fetchResult)) (int, error) { + reconstruct func(index int, result *fetchResult), incomplete bool, from int) (int, error) { // Short circuit if the data was never requested request := pendPool[id] if request == nil { resDropMeter.Mark(int64(results)) return 0, errNoFetchesPending } - delete(pendPool, id) + if !incomplete { + delete(pendPool, id) + } reqTimer.UpdateSince(request.Time) resInMeter.Mark(int64(results)) @@ -687,7 +689,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, i int hashes []common.Hash ) - for _, header := range request.Headers { + for _, header := range request.Headers[from:] { // Short circuit assembly if no more fetch results are found if i >= results { break @@ -701,7 +703,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, i++ } - for _, header := range request.Headers[:i] { + for _, header := range request.Headers[from : from+i] { if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale { reconstruct(accepted, res) } else { @@ -718,8 +720,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, resDropMeter.Mark(int64(results - accepted)) // Return all failed or missing fetches to the queue - for _, header := range request.Headers[accepted:] { - taskQueue.Push(header, -int64(header.Number.Uint64())) + if incomplete { + for _, header := range request.Headers[from+accepted : from+results] { + taskQueue.Push(header, -int64(header.Number.Uint64())) + } + } else { + for _, header := range request.Headers[from+accepted:] { + taskQueue.Push(header, -int64(header.Number.Uint64())) + } } // Wake up Results if accepted > 0 { diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go index ca71a769de2..0e973bc84ce 100644 --- a/eth/downloader/queue_test.go +++ b/eth/downloader/queue_test.go @@ -32,32 +32,45 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" ) +type blockConfig struct { + txPeriod int + txCount int +} + +var emptyBlock = blockConfig{txPeriod: 0, txCount: 0} +var defaultBlock = blockConfig{txPeriod: 2, txCount: 1} + // makeChain creates a chain of n blocks starting at and including parent. // The returned hash chain is ordered head->parent. // If empty is false, every second block (i%2==0) contains one transaction. +// If config.txCount > 0, every config.txPeriod-th block contains config.txCount transactions. // No uncles are added. -func makeChain(n int, seed byte, parent *types.Block, empty bool) ([]*types.Block, []types.Receipts) { +func makeChain(n int, seed byte, parent *types.Block, config blockConfig) ([]*types.Block, []types.Receipts) { blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) { block.SetCoinbase(common.Address{seed}) - // Add one tx to every second block - if !empty && i%2 == 0 { - signer := types.MakeSigner(params.TestChainConfig, block.Number(), block.Timestamp()) - tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey) - if err != nil { - panic(err) + // Add transactions according to config + if config.txCount > 0 && i%config.txPeriod == 0 { + for range config.txCount { + signer := types.MakeSigner(params.TestChainConfig, block.Number(), block.Timestamp()) + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey) + if err != nil { + panic(err) + } + block.AddTx(tx) } - block.AddTx(tx) } }) return blocks, receipts } type chainData struct { - blocks []*types.Block - offset int + blocks []*types.Block + receipts []types.Receipts + offset int } var chain *chainData @@ -66,11 +79,11 @@ var emptyChain *chainData func init() { // Create a chain of blocks to import targetBlocks := 128 - blocks, _ := makeChain(targetBlocks, 0, testGenesis, false) - chain = &chainData{blocks, 0} + blocks, receipts := makeChain(targetBlocks, 0, testGenesis, defaultBlock) + chain = &chainData{blocks, receipts, 0} - blocks, _ = makeChain(targetBlocks, 0, testGenesis, true) - emptyChain = &chainData{blocks, 0} + blocks, receipts = makeChain(targetBlocks, 0, testGenesis, emptyBlock) + emptyChain = &chainData{blocks, receipts, 0} } func (chain *chainData) headers() []*types.Header { @@ -261,13 +274,149 @@ func TestEmptyBlocks(t *testing.T) { } } +// TestPartialReceiptDelivery checks two points: +// 1. Receipts that fail validation should be re-requested from other peers. +// 2. Partial delivery should not expire. +func TestPartialReceiptDelivery(t *testing.T) { + blocks, receipts := makeChain(64, 0, testGenesis, blockConfig{txPeriod: 1, txCount: 5}) + chain := chainData{blocks: blocks, receipts: receipts, offset: 0} + + numBlock := len(chain.blocks) + + q := newQueue(10, 10) + if !q.Idle() { + t.Errorf("new queue should be idle") + } + q.Prepare(1, SnapSync) + if res := q.Results(false); len(res) != 0 { + t.Fatal("new queue should have 0 results") + } + + // Schedule a batch of headers + headers := chain.headers() + hashes := make([]common.Hash, len(headers)) + for i, header := range headers { + hashes[i] = header.Hash() + } + q.Schedule(headers, hashes, 1) + + peer := dummyPeer("peer-1") + req, _, _ := q.ReserveReceipts(peer, numBlock) + + t.Logf("request: length %d", len(req.Headers)) + + // 1. Deliver a partial receipt: this must not clear the remaining receipts from the pending list + firstCutoff := len(req.Headers) / 3 + receiptRLP, rcHashes := getPartialReceiptsDelivery(0, firstCutoff, receipts) + accepted, err := q.DeliverReceipts(peer.id, receiptRLP, rcHashes, true, 0) + if err != nil || accepted != firstCutoff { + t.Fatalf("delivery failed: err %v, accepted %d\n", err, accepted) + } + + if pending := q.PendingReceipts(); pending != numBlock-len(req.Headers) { + t.Fatalf("wrong pending receipt length, got %d, exp %d", pending, numBlock-len(req.Headers)) + } + for i := range firstCutoff { + headerNumber := req.Headers[i].Number.Uint64() + res, _, _, _, err := q.resultCache.getFetchResult(headerNumber) + if err != nil { + t.Fatalf("fetch result get failed: err %v", err) + } + if res == nil { + t.Fatalf("fetch result is nil: header number %d", headerNumber) + } + if !res.Done(receiptType) { + t.Fatalf("wrong result, block %d receipt not done", headerNumber) + } + } + if flight := q.InFlightReceipts(); !flight { + t.Fatalf("there should be in flight receipts") + } + + // 2. Deliver a partial receipt containing an invalid entry: the invalid receipt should be removed from the pending list + secondCutoff := firstCutoff + len(req.Headers)/3 + receiptRLP, rcHashes = getPartialReceiptsDelivery(firstCutoff, secondCutoff, receipts) + // one invalid receipt + rcHashes[len(rcHashes)-1] = common.Hash{} + accepted, err = q.DeliverReceipts(peer.id, receiptRLP, rcHashes, true, firstCutoff) + if accepted != len(rcHashes)-1 { + t.Fatalf("wrong accepted, got %d, exp %d", accepted, len(rcHashes)-1) + } + if err == nil { + t.Fatalf("delivery should fail") + } + + // The invalid receipt should be returned to the pending pool + if pending := q.PendingReceipts(); pending != numBlock-len(req.Headers)+1 { + t.Fatalf("wrong pending receipt length, got %d, exp %d", pending, numBlock-len(req.Headers)) + } + for i := range len(rcHashes) - 1 { + headerNumber := req.Headers[firstCutoff+i].Number.Uint64() + res, _, _, _, err := q.resultCache.getFetchResult(headerNumber) + if err != nil { + t.Fatalf("fetch result get failed: err %v", err) + } + if res == nil { + t.Fatalf("fetch result is nil: header number %d", headerNumber) + } + if !res.Done(receiptType) { + t.Fatalf("wrong result, block %d receipt not done", headerNumber) + } + } + + // 3. Deliver the remaining receipts to complete the request + thirdCutoff := len(req.Headers) + receiptRLP, rcHashes = getPartialReceiptsDelivery(secondCutoff, thirdCutoff, receipts) + accepted, err = q.DeliverReceipts(peer.id, receiptRLP, rcHashes, false, secondCutoff) + if accepted != len(rcHashes) { + t.Fatalf("wrong accepted, got %d, exp %d", accepted, len(rcHashes)-1) + } + if err != nil || accepted != thirdCutoff-secondCutoff { + t.Fatalf("delivery failed: err %v, accepted %d\n", err, accepted) + } + + for i := range len(rcHashes) { + headerNumber := req.Headers[secondCutoff+i].Number.Uint64() + res, _, _, _, err := q.resultCache.getFetchResult(headerNumber) + if err != nil { + t.Fatalf("fetch result get failed: err %v", err) + } + if res == nil { + t.Fatalf("fetch result is nil: header number %d", headerNumber) + } + if !res.Done(receiptType) { + t.Fatalf("wrong result, block %d receipt not done", headerNumber) + } + } + if q.InFlightReceipts() { + t.Fatal("there shouldn't be any remaning in-flight receipts") + } +} + +func getPartialReceiptsDelivery(from int, to int, receipts []types.Receipts) ([]rlp.RawValue, []common.Hash) { + if from < 0 { + from = 0 + } + if to > len(receipts) { + to = len(receipts) + } + + hasher := trie.NewStackTrie(nil) + rcHashes := make([]common.Hash, to-from) + for i, rc := range receipts[from:to] { + rcHashes[i] = types.DeriveSha(rc, hasher) + } + + return types.EncodeBlockReceiptLists(receipts[from:to]), rcHashes +} + // XTestDelivery does some more extensive testing of events that happen, // blocks that become known and peers that make reservations and deliveries. // disabled since it's not really a unit-test, but can be executed to test // some more advanced scenarios func XTestDelivery(t *testing.T) { // the outside network, holding blocks - blo, rec := makeChain(128, 0, testGenesis, false) + blo, rec := makeChain(128, 0, testGenesis, defaultBlock) world := newNetwork() world.receipts = rec world.chain = blo @@ -368,7 +517,7 @@ func XTestDelivery(t *testing.T) { for i, receipt := range rcs { hashes[i] = types.DeriveSha(receipt, hasher) } - _, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes) + _, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes, false, 0) if err != nil { fmt.Printf("delivered %d receipts %v\n", len(rcs), err) } @@ -444,7 +593,7 @@ func (n *network) progress(numBlocks int) { n.lock.Lock() defer n.lock.Unlock() //fmt.Printf("progressing...\n") - newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], false) + newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], emptyBlock) n.chain = append(n.chain, newBlocks...) n.receipts = append(n.receipts, newR...) n.cond.Broadcast() diff --git a/eth/protocols/eth/dispatcher.go b/eth/protocols/eth/dispatcher.go index cba40596fcf..76546cdf67b 100644 --- a/eth/protocols/eth/dispatcher.go +++ b/eth/protocols/eth/dispatcher.go @@ -53,6 +53,9 @@ type Request struct { Peer string // Demultiplexer if cross-peer requests are batched together Sent time.Time // Timestamp when the request was sent + + reRequest bool + previous uint64 // id of previous index (to find sink) } // Close aborts an in-flight request. Although there's no way to notify the @@ -105,6 +108,9 @@ type Response struct { Meta interface{} // Metadata generated locally on the receiver thread Time time.Duration // Time it took for the request to be served Done chan error // Channel to signal message handling to the reader + + From int + Partial bool } // response is a wrapper around a remote Response that has an error channel to @@ -201,6 +207,16 @@ func (p *Peer) dispatcher() { reqOp.fail <- err if err == nil { + // reuse sink if it is re-request + if req.reRequest { + if _, ok := pending[req.previous]; ok { + req.sink = pending[req.previous].sink + } else { + reqOp.fail <- fmt.Errorf("Cannot find previous request index") + continue + } + delete(pending, req.previous) + } pending[req.id] = req } @@ -214,6 +230,11 @@ func (p *Peer) dispatcher() { } // Stop tracking the request delete(pending, cancelOp.id) + + // Not sure if the request is about the receipt, but removing it anyway + delete(p.receiptBuffer, cancelOp.id) + delete(p.requestedReceipts, cancelOp.id) + cancelOp.fail <- nil case resOp := <-p.resDispatch: @@ -245,7 +266,10 @@ func (p *Peer) dispatcher() { resOp.fail <- nil // Stop tracking the request, the response dispatcher will deliver - delete(pending, res.id) + // For partial response, pending should be removed after re-request + if res.Partial { + delete(pending, res.id) + } } case <-p.term: diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 2467e0c713b..04d35fdb04a 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -49,6 +49,8 @@ const ( // containing 200+ transactions nowadays, the practical limit will always // be softResponseLimit. maxReceiptsServe = 1024 + + maxPacketSize = 10 * 1024 * 1024 ) // Handler is a callback to invoke from an outside runner after the boilerplate @@ -176,7 +178,7 @@ var eth68 = map[uint64]msgHandler{ GetBlockBodiesMsg: handleGetBlockBodies, BlockBodiesMsg: handleBlockBodies, GetReceiptsMsg: handleGetReceipts68, - ReceiptsMsg: handleReceipts[*ReceiptList68], + ReceiptsMsg: handleReceipts69[*ReceiptList68], GetPooledTransactionsMsg: handleGetPooledTransactions, PooledTransactionsMsg: handlePooledTransactions, } @@ -189,7 +191,21 @@ var eth69 = map[uint64]msgHandler{ GetBlockBodiesMsg: handleGetBlockBodies, BlockBodiesMsg: handleBlockBodies, GetReceiptsMsg: handleGetReceipts69, - ReceiptsMsg: handleReceipts[*ReceiptList69], + ReceiptsMsg: handleReceipts69[*ReceiptList69], + GetPooledTransactionsMsg: handleGetPooledTransactions, + PooledTransactionsMsg: handlePooledTransactions, + BlockRangeUpdateMsg: handleBlockRangeUpdate, +} + +var eth70 = map[uint64]msgHandler{ + TransactionsMsg: handleTransactions, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + GetBlockHeadersMsg: handleGetBlockHeaders, + BlockHeadersMsg: handleBlockHeaders, + GetBlockBodiesMsg: handleGetBlockBodies, + BlockBodiesMsg: handleBlockBodies, + GetReceiptsMsg: handleGetReceipts70, + ReceiptsMsg: handleReceipts70, GetPooledTransactionsMsg: handleGetPooledTransactions, PooledTransactionsMsg: handlePooledTransactions, BlockRangeUpdateMsg: handleBlockRangeUpdate, @@ -209,11 +225,14 @@ func handleMessage(backend Backend, peer *Peer) error { defer msg.Discard() var handlers map[uint64]msgHandler - if peer.version == ETH68 { + switch peer.version { + case ETH68: handlers = eth68 - } else if peer.version == ETH69 { + case ETH69: handlers = eth69 - } else { + case ETH70: + handlers = eth70 + default: return fmt.Errorf("unknown eth protocol version: %v", peer.version) } diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index 65c491f815f..f934d394ffd 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -538,7 +538,7 @@ func testGetBlockReceipts(t *testing.T, protocol uint) { } // Send the hash request and verify the response - p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket{ + p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket69{ RequestId: 123, GetReceiptsRequest: hashes, }) @@ -550,6 +550,100 @@ func testGetBlockReceipts(t *testing.T, protocol uint) { } } +func TestGetBlockPartialReceipts(t *testing.T) { testGetBlockPartialReceipts(t, ETH70) } + +func testGetBlockPartialReceipts(t *testing.T, protocol int) { + // First, generate the chain and overwrite the receipts. + generator := func(_ int, block *core.BlockGen) { + for j := 0; j < 5; j++ { + tx, err := types.SignTx( + types.NewTransaction(block.TxNonce(testAddr), testAddr, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), + types.LatestSignerForChainID(params.TestChainConfig.ChainID), + testKey, + ) + if err != nil { + t.Fatalf("failed to sign tx: %v", err) + } + block.AddTx(tx) + } + } + backend := newTestBackendWithGenerator(4, true, false, generator) + defer backend.close() + + blockCutoff := 2 + receiptCutoff := 4 + + // Replace the receipts in the database with larger receipts. + targetBlock := backend.chain.GetBlockByNumber(uint64(blockCutoff)) + receipts := backend.chain.GetReceiptsByHash(targetBlock.Hash()) + receiptSize := params.MaxTxGas / params.LogDataGas // ~2MiB per receipt + for i := range receipts { + payload := make([]byte, receiptSize) + for j := range payload { + payload[j] = byte(i + j) + } + receipts[i].Logs = []*types.Log{ + { + Address: common.BytesToAddress([]byte{byte(i + 1)}), + Data: payload, + }, + } + } + + rawdb.WriteReceipts(backend.db, targetBlock.Hash(), targetBlock.NumberU64(), receipts) + + peer, _ := newTestPeer("peer", uint(protocol), backend) + defer peer.close() + + var ( + hashes []common.Hash + partialReceipt []*ReceiptList69 + ) + for i := uint64(0); i <= backend.chain.CurrentBlock().Number.Uint64(); i++ { + block := backend.chain.GetBlockByNumber(i) + hashes = append(hashes, block.Hash()) + } + for i := 0; i <= blockCutoff; i++ { + block := backend.chain.GetBlockByNumber(uint64(i)) + trs := backend.chain.GetReceiptsByHash(block.Hash()) + limit := len(trs) + if i == blockCutoff { + limit = receiptCutoff + } + partialReceipt = append(partialReceipt, NewReceiptList69(trs[:limit])) + } + + p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket70{ + RequestId: 123, + GetReceiptsRequest: hashes, + FirstBlockReceiptIndex: 0, + }) + if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket70{ + RequestId: 123, + List: partialReceipt, + LastBlockIncomplete: true, + }); err != nil { + t.Errorf("receipts mismatch: %v", err) + } + + // Simulate the continued request + partialReceipt = []*ReceiptList69{NewReceiptList69(receipts[receiptCutoff:])} + + p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket70{ + RequestId: 123, + GetReceiptsRequest: []common.Hash{hashes[blockCutoff]}, + FirstBlockReceiptIndex: uint64(receiptCutoff), + }) + + if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket70{ + RequestId: 123, + List: partialReceipt, + LastBlockIncomplete: false, + }); err != nil { + t.Errorf("receipts mismatch: %v", err) + } +} + type decoder struct { msg []byte } @@ -612,10 +706,10 @@ func setup() (*testBackend, *testPeer) { } func FuzzEthProtocolHandlers(f *testing.F) { - handlers := eth69 + handlers := eth70 backend, peer := setup() f.Fuzz(func(t *testing.T, code byte, msg []byte) { - handler := handlers[uint64(code)%protocolLengths[ETH69]] + handler := handlers[uint64(code)%protocolLengths[ETH70]] if handler == nil { return } diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index aad3353d88d..55423be64e3 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -17,6 +17,7 @@ package eth import ( + "bytes" "encoding/json" "errors" "fmt" @@ -250,22 +251,31 @@ func ServiceGetBlockBodiesQuery(chain *core.BlockChain, query GetBlockBodiesRequ func handleGetReceipts68(backend Backend, msg Decoder, peer *Peer) error { // Decode the block receipts retrieval message - var query GetReceiptsPacket + var query GetReceiptsPacket69 if err := msg.Decode(&query); err != nil { return err } response := ServiceGetReceiptsQuery68(backend.Chain(), query.GetReceiptsRequest) - return peer.ReplyReceiptsRLP(query.RequestId, response) + return peer.ReplyReceiptsRLP69(query.RequestId, response) } func handleGetReceipts69(backend Backend, msg Decoder, peer *Peer) error { // Decode the block receipts retrieval message - var query GetReceiptsPacket + var query GetReceiptsPacket69 if err := msg.Decode(&query); err != nil { return err } response := serviceGetReceiptsQuery69(backend.Chain(), query.GetReceiptsRequest) - return peer.ReplyReceiptsRLP(query.RequestId, response) + return peer.ReplyReceiptsRLP69(query.RequestId, response) +} + +func handleGetReceipts70(backend Backend, msg Decoder, peer *Peer) error { + var query GetReceiptsPacket70 + if err := msg.Decode(&query); err != nil { + return err + } + response, lastBlockIncomplete := serviceGetReceiptsQuery70(backend.Chain(), query.GetReceiptsRequest, query.FirstBlockReceiptIndex) + return peer.ReplyReceiptsRLP70(query.RequestId, response, lastBlockIncomplete) } // ServiceGetReceiptsQuery68 assembles the response to a receipt query. It is @@ -342,6 +352,86 @@ func serviceGetReceiptsQuery69(chain *core.BlockChain, query GetReceiptsRequest) return receipts } +// serviceGetReceiptsQuery70 assembles the response to a receipt query. +// If the receipts exceed 10 MiB, it trims them and sets the lastBlockIncomplete flag. +// Indices smaller than firstBlockReceiptIndex are omitted from the first block receipt list. +func serviceGetReceiptsQuery70(chain *core.BlockChain, query GetReceiptsRequest, firstBlockReceiptIndex uint64) ([]rlp.RawValue, bool) { + var ( + bytes int + receipts []rlp.RawValue + lastBlockIncomplete bool + ) + + for lookups, hash := range query { + if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe || + lookups >= 2*maxReceiptsServe { + break + } + + results := chain.GetReceiptsRLP(hash) + if results == nil { + if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { + continue + } + } else { + body := chain.GetBodyRLP(hash) + if body == nil { + continue + } + var err error + results, err = blockReceiptsToNetwork69(results, body) + if err != nil { + log.Error("Error in block receipts conversion", "hash", hash, "err", err) + continue + } + } + + // todo: buffer + if firstBlockReceiptIndex > 0 && lookups == 0 { + results, lastBlockIncomplete = trimReceiptsRLP(results, int(firstBlockReceiptIndex)) + } else if bytes+len(results) > maxPacketSize { + results, lastBlockIncomplete = trimReceiptsRLP(results, 0) + } + + receipts = append(receipts, results) + bytes += len(results) + } + + return receipts, lastBlockIncomplete +} + +// trimReceiptsRLP trims raw value from `from` index until it exceeds limit +func trimReceiptsRLP(receiptsRLP rlp.RawValue, from int) (rlp.RawValue, bool) { + var ( + out bytes.Buffer + buffer = rlp.NewEncoderBuffer(&out) + iter, _ = rlp.NewListIterator(receiptsRLP) + index int + bytes int + overflow bool + ) + + list := buffer.List() + for iter.Next() { + if index < from { + index++ + continue + } + receipt := iter.Value() + if bytes+len(receipt) > maxPacketSize { + overflow = true + break + } + buffer.Write(receipt) + bytes += len(receipt) + index++ + } + buffer.ListEnd(list) + buffer.Flush() + + return out.Bytes(), overflow +} + func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error { return errors.New("block announcements disallowed") // We dropped support for non-merge networks } @@ -399,7 +489,7 @@ func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error { }, metadata) } -func handleReceipts[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) error { +func handleReceipts69[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) error { // A batch of receipts arrived to one of our previous requests res := new(ReceiptsPacket[L]) if err := msg.Decode(res); err != nil { @@ -431,6 +521,54 @@ func handleReceipts[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) er }, metadata) } +func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error { + res := new(ReceiptsPacket70) + if err := msg.Decode(res); err != nil { + return err + } + + from, err := peer.ReconstructReceiptsPacket(res) + if err != nil { + return err + } + + if res.LastBlockIncomplete { + err := peer.RequestPartialReceipts(res.RequestId) + if err != nil { + return err + } + } + + // Assign buffers shared between list elements + buffers := new(receiptListBuffers) + for i := range res.List { + res.List[i].setBuffers(buffers) + } + + metadata := func() interface{} { + hasher := trie.NewStackTrie(nil) + hashes := make([]common.Hash, len(res.List)) + for i := range res.List { + hashes[i] = types.DeriveSha(res.List[i], hasher) + } + return hashes + } + + var enc ReceiptsRLPResponse + for i := range res.List { + enc = append(enc, res.List[i].EncodeForStorage()) + } + + return peer.dispatchResponse(&Response{ + id: res.RequestId, + code: ReceiptsMsg, + Res: &enc, + + From: from, + Partial: res.LastBlockIncomplete, + }, metadata) +} + func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error { // New transaction announcement arrived, make sure we have // a valid and fresh chain to handle them diff --git a/eth/protocols/eth/handshake.go b/eth/protocols/eth/handshake.go index bb3d1b8eb42..e7f9c2ad5e4 100644 --- a/eth/protocols/eth/handshake.go +++ b/eth/protocols/eth/handshake.go @@ -37,6 +37,8 @@ const ( // network IDs, difficulties, head and genesis blocks. func (p *Peer) Handshake(networkID uint64, chain forkid.Blockchain, rangeMsg BlockRangeUpdatePacket) error { switch p.version { + case ETH70: + return p.handshake69(networkID, chain, rangeMsg) case ETH69: return p.handshake69(networkID, chain, rangeMsg) case ETH68: diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 40c54a35705..4bb36511d8c 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -17,6 +17,7 @@ package eth import ( + "fmt" "math/rand" "sync/atomic" @@ -24,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" ) @@ -41,6 +43,12 @@ const ( maxQueuedTxAnns = 4096 ) +type partialReceipt struct { + idx int // position in original request + list *ReceiptList69 // list of partially collected receipts + size uint64 // log size of list +} + // Peer is a collection of relevant information we have about a `eth` peer. type Peer struct { id string // Unique ID for the peer, cached @@ -59,6 +67,9 @@ type Peer struct { reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them + requestedReceipts map[uint64][]common.Hash // requested receipts list + receiptBuffer map[uint64]*partialReceipt // requestId to receiptlist map + term chan struct{} // Termination channel to stop the broadcasters } @@ -66,18 +77,20 @@ type Peer struct { // version. func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer { peer := &Peer{ - id: p.ID().String(), - Peer: p, - rw: rw, - version: version, - knownTxs: newKnownCache(maxKnownTxs), - txBroadcast: make(chan []common.Hash), - txAnnounce: make(chan []common.Hash), - reqDispatch: make(chan *request), - reqCancel: make(chan *cancel), - resDispatch: make(chan *response), - txpool: txpool, - term: make(chan struct{}), + id: p.ID().String(), + Peer: p, + rw: rw, + version: version, + knownTxs: newKnownCache(maxKnownTxs), + txBroadcast: make(chan []common.Hash), + txAnnounce: make(chan []common.Hash), + reqDispatch: make(chan *request), + reqCancel: make(chan *cancel), + resDispatch: make(chan *response), + txpool: txpool, + requestedReceipts: make(map[uint64][]common.Hash), + receiptBuffer: make(map[uint64]*partialReceipt), + term: make(chan struct{}), } // Start up all the broadcasters go peer.broadcastTransactions() @@ -208,10 +221,19 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error { } // ReplyReceiptsRLP is the response to GetReceipts. -func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error { - return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsRLPPacket{ +func (p *Peer) ReplyReceiptsRLP69(id uint64, receipts []rlp.RawValue) error { + return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsRLPPacket69{ + RequestId: id, + ReceiptsRLPResponse: receipts, + }) +} + +// ReplyReceiptsRLP is the response to GetReceipts. +func (p *Peer) ReplyReceiptsRLP70(id uint64, receipts []rlp.RawValue, lastBlockIncomplete bool) error { + return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsRLPPacket70{ RequestId: id, ReceiptsRLPResponse: receipts, + LastBlockIncomplete: lastBlockIncomplete, }) } @@ -323,20 +345,152 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) id := rand.Uint64() + var req *Request + if p.version > ETH69 { + req = &Request{ + id: id, + sink: sink, + code: GetReceiptsMsg, + want: ReceiptsMsg, + data: &GetReceiptsPacket70{ + RequestId: id, + GetReceiptsRequest: hashes, + FirstBlockReceiptIndex: 0, + }, + } + } else { + req = &Request{ + id: id, + sink: sink, + code: GetReceiptsMsg, + want: ReceiptsMsg, + data: &GetReceiptsPacket69{ + RequestId: id, + GetReceiptsRequest: hashes, + }, + } + } + if err := p.dispatchRequest(req); err != nil { + return nil, err + } + + p.requestedReceipts[id] = hashes + return req, nil +} + +// HandlePartialReceipts re-request partial receipts +func (p *Peer) RequestPartialReceipts(previousId uint64) error { + split := p.receiptBuffer[previousId].idx + id := rand.Uint64() + req := &Request{ id: id, - sink: sink, + sink: nil, code: GetReceiptsMsg, want: ReceiptsMsg, - data: &GetReceiptsPacket{ - RequestId: id, - GetReceiptsRequest: hashes, + data: &GetReceiptsPacket70{ + RequestId: id, + GetReceiptsRequest: p.requestedReceipts[previousId][split:], + FirstBlockReceiptIndex: uint64(len(p.receiptBuffer[previousId].list.items)), }, + reRequest: true, + previous: previousId, } - if err := p.dispatchRequest(req); err != nil { - return nil, err + + p.receiptBuffer[id] = p.receiptBuffer[previousId] + p.requestedReceipts[id] = p.requestedReceipts[previousId] + + delete(p.receiptBuffer, previousId) + delete(p.requestedReceipts, previousId) + + return p.dispatchRequest(req) +} + +// ReconstructReceiptsPacket validates a receipt packet and checks whether a partial request is complete. +// It also mutates the packet in place, trimming the partial response or appending previously collected receipts. +func (p *Peer) ReconstructReceiptsPacket(packet *ReceiptsPacket70) (int, error) { + from := 0 + requestId := packet.RequestId + if len(packet.List) == 0 { + return 0, nil } - return req, nil + + // Process the first block + // If the request was partially collected earlier, append the buffered data so this response completes it. + firstReceipt := packet.List[0] + if firstReceipt == nil { + return 0, fmt.Errorf("nil first receipt") + } + if _, ok := p.receiptBuffer[requestId]; ok { + // complete packet (hash validation will be performed later) + firstReceipt.items = append(p.receiptBuffer[requestId].list.items, firstReceipt.items...) + from = p.receiptBuffer[requestId].idx + delete(p.receiptBuffer, requestId) + if !packet.LastBlockIncomplete { + delete(p.requestedReceipts, requestId) + } + } + + // Trim and buffer the last block when the response is incomplete. + if packet.LastBlockIncomplete { + lastReceipts := packet.List[len(packet.List)-1] + + logSize, err := p.validateLastBlockReceipt(lastReceipts, packet.RequestId) + if err != nil { + return 0, err + } + + // Update the buffered data and trim the packet to exclude the incomplete block. + if buffer, ok := p.receiptBuffer[requestId]; ok { + buffer.idx = buffer.idx + len(packet.List) - 1 + buffer.list.items = append(buffer.list.items, lastReceipts.items...) + buffer.size = buffer.size + logSize + } else { + p.receiptBuffer[requestId] = &partialReceipt{ + idx: len(packet.List) - 1, + list: lastReceipts, + size: logSize, + } + } + packet.List = packet.List[:len(packet.List)-1] + } + + return from, nil +} + +// validateLastBlockReceipt validates receipts and return log size of last block receipt +func (p *Peer) validateLastBlockReceipt(lastReceipts *ReceiptList69, id uint64) (uint64, error) { + if lastReceipts == nil { + return 0, fmt.Errorf("nil partial receipt") + } + + var previousTxs int + var previousLog uint64 + var log uint64 + if buffer, ok := p.receiptBuffer[id]; ok { + previousTxs = len(buffer.list.items) + previousLog = buffer.size + } + + // 1. Verify that the total number of transactions delivered is under the limit. + if uint64(previousTxs+len(lastReceipts.items)) > lastReceipts.items[0].GasUsed/21_000 { + // should be dropped, don't clear the buffer + return 0, fmt.Errorf("total number of tx exceeded limit") + } + + // 2. Verify the size of each receipt against the maximum transaction size. + for _, rc := range lastReceipts.items { + if uint64(len(rc.Logs)) > params.MaxTxGas/params.LogDataGas { + return 0, fmt.Errorf("total size of receipt exceeded limit") + } + log += uint64(len(rc.Logs)) + } + // 3. Verify that the overall downloaded receipt size does not exceed the block gas limit. + if previousLog+log > params.MaxGasLimit/params.LogDataGas { + return 0, fmt.Errorf("total download receipt size exceeded the limit") + } + + return log, nil } // RequestTxs fetches a batch of transactions from a remote node. diff --git a/eth/protocols/eth/peer_test.go b/eth/protocols/eth/peer_test.go index efbbbc6fff8..b3ca35a343e 100644 --- a/eth/protocols/eth/peer_test.go +++ b/eth/protocols/eth/peer_test.go @@ -22,10 +22,12 @@ package eth import ( "crypto/rand" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/rlp" ) // testPeer is a simulated peer to allow testing direct network calls. @@ -88,3 +90,131 @@ func TestPeerSet(t *testing.T) { t.Fatalf("bad size") } } + +func TestPartialReceipt(t *testing.T) { + app, net := p2p.MsgPipe() + var id enode.ID + if _, err := rand.Read(id[:]); err != nil { + t.Fatalf("failed to create random peer: %v", err) + } + + peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil) + + packetCh := make(chan *GetReceiptsPacket70, 1) + go func() { + for { + msg, err := app.ReadMsg() + if err != nil { + return + } + if msg.Code == GetReceiptsMsg { + var pkt GetReceiptsPacket70 + if err := msg.Decode(&pkt); err == nil { + select { + case packetCh <- &pkt: + default: + } + } + } + msg.Discard() + } + }() + + hashes := []common.Hash{ + common.HexToHash("0xaa"), + common.HexToHash("0xbb"), + common.HexToHash("0xcc"), + common.HexToHash("0xdd"), + } + + sink := make(chan *Response, 1) + req, err := peer.RequestReceipts(hashes, sink) + if err != nil { + t.Fatalf("RequestReceipts failed: %v", err) + } + select { + case _ = <-packetCh: + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for request packet") + } + + delivery := &ReceiptsPacket70{ + RequestId: req.id, + LastBlockIncomplete: true, + List: []*ReceiptList69{ + { + items: []Receipt{ + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, + }, + }, + { + items: []Receipt{ + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 2))}, + }, + }, + }, + } + if _, err := peer.ReconstructReceiptsPacket(delivery); err != nil { + t.Fatalf("first ReconstructReceiptsPacket failed: %v", err) + } + + if err := peer.RequestPartialReceipts(req.id); err != nil { + t.Fatalf("RequestPartialReceipts failed: %v", err) + } + + var rereq *GetReceiptsPacket70 + select { + case rereq = <-packetCh: + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for re-request packet") + } + + if _, ok := peer.receiptBuffer[req.id]; ok { + t.Fatalf("receiptBuffer has stale request id") + } + if _, ok := peer.requestedReceipts[req.id]; ok { + t.Fatalf("requestedReceipts has stale request id") + } + + buffer, ok := peer.receiptBuffer[rereq.RequestId] + if !ok { + t.Fatalf("receiptBuffer should buffer incomplete receipts") + } + if rereq.FirstBlockReceiptIndex != uint64(len(buffer.list.items)) { + t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, len(buffer.list.items)) + } + if _, ok := peer.requestedReceipts[rereq.RequestId]; !ok { + t.Fatalf("requestedReceipts should buffer receipt hashes") + } + + delivery = &ReceiptsPacket70{ + RequestId: rereq.RequestId, + LastBlockIncomplete: false, + List: []*ReceiptList69{ + { + items: []Receipt{ + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, + }, + }, + { + items: []Receipt{ + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, + }, + }, + { + items: []Receipt{ + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, + }, + }, + }, + } + if _, err := peer.ReconstructReceiptsPacket(delivery); err != nil { + t.Fatalf("second ReconstructReceiptsPacket failed: %v", err) + } + if _, ok := peer.receiptBuffer[rereq.RequestId]; ok { + t.Fatalf("receiptBuffer should be cleared after delivery") + } + if _, ok := peer.requestedReceipts[rereq.RequestId]; ok { + t.Fatalf("requestedReceipts should be cleared after delivery") + } +} diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 7c41e7a9963..7b7b396166c 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -32,6 +32,7 @@ import ( const ( ETH68 = 68 ETH69 = 69 + ETH70 = 70 ) // ProtocolName is the official short name of the `eth` protocol used during @@ -40,11 +41,11 @@ const ProtocolName = "eth" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH69, ETH68} +var ProtocolVersions = []uint{ETH69, ETH68, ETH70} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH68: 17, ETH69: 18} +var protocolLengths = map[uint]uint64{ETH68: 17, ETH69: 18, ETH70: 18} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024 @@ -259,11 +260,18 @@ func (p *BlockBodiesResponse) Unpack() ([][]*types.Transaction, [][]*types.Heade type GetReceiptsRequest []common.Hash // GetReceiptsPacket represents a block receipts query with request ID wrapping. -type GetReceiptsPacket struct { +type GetReceiptsPacket69 struct { RequestId uint64 GetReceiptsRequest } +// GetReceiptsPacket represents a block receipts query with request ID wrapping. +type GetReceiptsPacket70 struct { + RequestId uint64 + GetReceiptsRequest + FirstBlockReceiptIndex uint64 +} + // ReceiptsResponse is the network packet for block receipts distribution. type ReceiptsResponse []types.Receipts @@ -282,13 +290,25 @@ type ReceiptsPacket[L ReceiptsList] struct { List []L } +type ReceiptsPacket70 struct { + RequestId uint64 + List []*ReceiptList69 + LastBlockIncomplete bool +} + // ReceiptsRLPResponse is used for receipts, when we already have it encoded type ReceiptsRLPResponse []rlp.RawValue // ReceiptsRLPPacket is ReceiptsRLPResponse with request ID wrapping. -type ReceiptsRLPPacket struct { +type ReceiptsRLPPacket69 struct { + RequestId uint64 + ReceiptsRLPResponse +} + +type ReceiptsRLPPacket70 struct { RequestId uint64 ReceiptsRLPResponse + LastBlockIncomplete bool } // NewPooledTransactionHashesPacket represents a transaction announcement packet on eth/68 and newer. diff --git a/eth/protocols/eth/protocol_test.go b/eth/protocols/eth/protocol_test.go index 8a2559a6c50..19a09469ca3 100644 --- a/eth/protocols/eth/protocol_test.go +++ b/eth/protocols/eth/protocol_test.go @@ -84,7 +84,7 @@ func TestEmptyMessages(t *testing.T) { BlockBodiesPacket{1111, nil}, BlockBodiesRLPPacket{1111, nil}, // Receipts - GetReceiptsPacket{1111, nil}, + GetReceiptsPacket69{1111, nil}, // Transactions GetPooledTransactionsPacket{1111, nil}, PooledTransactionsPacket{1111, nil}, @@ -97,7 +97,7 @@ func TestEmptyMessages(t *testing.T) { BlockBodiesPacket{1111, BlockBodiesResponse([]*BlockBody{})}, BlockBodiesRLPPacket{1111, BlockBodiesRLPResponse([]rlp.RawValue{})}, // Receipts - GetReceiptsPacket{1111, GetReceiptsRequest([]common.Hash{})}, + GetReceiptsPacket69{1111, GetReceiptsRequest([]common.Hash{})}, ReceiptsPacket[*ReceiptList68]{1111, []*ReceiptList68{}}, ReceiptsPacket[*ReceiptList69]{1111, []*ReceiptList69{}}, // Transactions @@ -227,7 +227,7 @@ func TestMessages(t *testing.T) { common.FromHex("f902dc820457f902d6f902d3f8d2f867088504a817c8088302e2489435353535353535353535353535353535353535358202008025a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10f867098504a817c809830334509435353535353535353535353535353535353535358202d98025a052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afba052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afbf901fcf901f9a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000940000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000bae820d0582115c8215b3821a0a827788a00000000000000000000000000000000000000000000000000000000000000000880000000000000000"), }, { - GetReceiptsPacket{1111, GetReceiptsRequest(hashes)}, + GetReceiptsPacket69{1111, GetReceiptsRequest(hashes)}, common.FromHex("f847820457f842a000000000000000000000000000000000000000000000000000000000deadc0dea000000000000000000000000000000000000000000000000000000000feedbeef"), }, { @@ -236,7 +236,7 @@ func TestMessages(t *testing.T) { }, { // Identical to the eth/68 encoding above. - ReceiptsRLPPacket{1111, ReceiptsRLPResponse([]rlp.RawValue{receiptsRlp})}, + ReceiptsRLPPacket69{1111, ReceiptsRLPResponse([]rlp.RawValue{receiptsRlp})}, common.FromHex("f902e6820457f902e0f902ddf901688082014dbf85ff85d940000000000000000000000000000000000000011f842a0000000000000000000000000000000000000000000000000000000000000deada0000000000000000000000000000000000000000000000000000000000000beef830100ffb9016f01f9016b018201bcbf862f860940000000000000000000000000000000000000022f842a00000000000000000000000000000000000000000000000000000000000005668a0000000000000000000000000000000000000000000000000000000000000977386020f0f0f0608"), }, { diff --git a/rlp/rlpgen/testdata/pkgclash.in.txt b/rlp/rlpgen/testdata/pkgclash.in.txt index 1d407881ce4..a8c4092601a 100644 --- a/rlp/rlpgen/testdata/pkgclash.in.txt +++ b/rlp/rlpgen/testdata/pkgclash.in.txt @@ -9,5 +9,5 @@ import ( type Test struct { A eth1.MinerAPI - B eth2.GetReceiptsPacket + B eth2.GetReceiptsPacket69 } diff --git a/rlp/rlpgen/testdata/pkgclash.out.txt b/rlp/rlpgen/testdata/pkgclash.out.txt index d119639b997..302f1ec1cdf 100644 --- a/rlp/rlpgen/testdata/pkgclash.out.txt +++ b/rlp/rlpgen/testdata/pkgclash.out.txt @@ -41,7 +41,7 @@ func (obj *Test) DecodeRLP(dec *rlp.Stream) error { } _tmp0.A = _tmp1 // B: - var _tmp2 eth1.GetReceiptsPacket + var _tmp2 eth1.GetReceiptsPacket69 { if _, err := dec.List(); err != nil { return err