Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func (s *Suite) TestGetReceipts(t *utesting.T) {
}

// Create block bodies request.
req := &eth.GetReceiptsPacket{
req := &eth.GetReceiptsPacket69{
RequestId: 66,
GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
}
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ func (d *Downloader) commitSnapSyncData(results []*fetchResult, stateSync *state
receipts := make([]rlp.RawValue, len(results))
for i, result := range results {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.body())
receipts[i] = result.Receipts
receipts[i] = types.EncodeBlockReceiptLists([]types.Receipts{result.Receipts})[0]
}
if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
Expand All @@ -1063,7 +1063,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
log.Debug("Committing snap sync pivot as new head", "number", block.Number(), "hash", block.Hash())

// Commit the pivot block as the new head, will require full sync from here on
if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []rlp.RawValue{result.Receipts}, d.ancientLimit); err != nil {
if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, types.EncodeBlockReceiptLists([]types.Receipts{result.Receipts}), d.ancientLimit); err != nil {
return err
}
if err := d.blockchain.SnapSyncCommitHead(block.Hash()); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
// RequestReceipts constructs a getReceipts method associated with a particular
// peer in the download tester. The returned function can be used to retrieve
// batches of block receipts from the particularly requested peer.
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) {
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, _ uint64, sink chan *eth.Response) (*eth.Request, error) {
blobs := eth.ServiceGetReceiptsQuery68(dlp.chain, hashes)

receipts := make([]types.Receipts, len(blobs))
Expand Down
21 changes: 18 additions & 3 deletions eth/downloader/fetchers_concurrent_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)

// receiptQueue implements typedQueue and is a type adapter between the generic
Expand Down Expand Up @@ -82,16 +84,29 @@ func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh ch
for _, header := range req.Headers {
hashes = append(hashes, header.Hash())
}
return peer.peer.RequestReceipts(hashes, resCh)
return peer.peer.RequestReceipts(hashes, req.From, resCh)
}

// deliver is responsible for taking a generic response packet from the concurrent
// fetcher, unpacking the receipt data and delivering it to the downloader's queue.
func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
receipts := *packet.Res.(*eth.ReceiptsRLPResponse)
receiptsRLP := *packet.Res.(*eth.ReceiptsRLPResponse)
hashes := packet.Meta.([]common.Hash) // {receipt hashes}

accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes)
receipts := make([][]*types.Receipt, len(receiptsRLP))
for i, rc := range receiptsRLP {
var rcStorage []*types.ReceiptForStorage
if err := rlp.DecodeBytes(rc, &rcStorage); err != nil {
peer.log.Error("Failed to decode retreived receipts", "err", err)
return 0, err
}
receipts[i] = make([]*types.Receipt, len(rcStorage))
for j := range rcStorage {
receipts[i][j] = (*types.Receipt)(rcStorage[j])
}
}

accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes, false)
switch {
case err == nil && len(receipts) == 0:
peer.log.Trace("Requested receipts delivered")
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Peer interface {
RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error)

RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error)
RequestReceipts([]common.Hash, chan *eth.Response) (*eth.Request, error)
RequestReceipts([]common.Hash, uint64, chan *eth.Response) (*eth.Request, error)
}

// newPeerConnection creates a new downloader peer.
Expand Down
Loading
Loading