cmd/devp2p/internal/ethtest/conn.go (4 changes: 2 additions & 2 deletions)

@@ -66,9 +66,9 @@ func (s *Suite) dialAs(key *ecdsa.PrivateKey) (*Conn, error) {
 		return nil, err
 	}
 	conn.caps = []p2p.Cap{
-		{Name: "eth", Version: 69},
+		{Name: "eth", Version: 70},
 	}
-	conn.ourHighestProtoVersion = 69
+	conn.ourHighestProtoVersion = 70
 	return &conn, nil
 }
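With this bump the test dialer advertises only eth/70 and treats 70 as its highest protocol version, so a handshake against a peer capped at eth/69 will not negotiate the new messages. As a reminder of the usual devp2p rule (run the highest version common to both capability lists), here is a small self-contained sketch; `Cap` and `highestShared` are illustrative stand-ins, not the p2p package's API.

```go
package main

import "fmt"

// Cap mirrors the {Name, Version} shape advertised during the handshake.
type Cap struct {
	Name    string
	Version uint
}

// highestShared returns the highest version of the named protocol that
// appears in both capability lists, or false if there is no overlap.
func highestShared(name string, ours, theirs []Cap) (uint, bool) {
	best, found := uint(0), false
	for _, a := range ours {
		if a.Name != name {
			continue
		}
		for _, b := range theirs {
			if b.Name == name && b.Version == a.Version && a.Version >= best {
				best, found = a.Version, true
			}
		}
	}
	return best, found
}

func main() {
	ours := []Cap{{Name: "eth", Version: 70}}
	theirs := []Cap{{Name: "eth", Version: 69}, {Name: "eth", Version: 70}}
	if v, ok := highestShared("eth", ours, theirs); ok {
		fmt.Println("negotiated eth version:", v) // negotiated eth version: 70
	}
}
```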
cmd/devp2p/internal/ethtest/suite.go (9 changes: 5 additions & 4 deletions)

@@ -434,15 +434,16 @@ func (s *Suite) TestGetReceipts(t *utesting.T) {
 	}
 
 	// Create block bodies request.
-	req := &eth.GetReceiptsPacket{
-		RequestId:          66,
-		GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
+	req := &eth.GetReceiptsPacket70{
+		RequestId:              66,
+		GetReceiptsRequest:     (eth.GetReceiptsRequest)(hashes),
+		FirstBlockReceiptIndex: 0,
 	}
 	if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
 		t.Fatalf("could not write to connection: %v", err)
 	}
 	// Wait for response.
-	resp := new(eth.ReceiptsPacket[*eth.ReceiptList69])
+	resp := new(eth.ReceiptsPacket70)
 	if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
 		t.Fatalf("error reading block bodies msg: %v", err)
 	}
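TestGetReceipts now issues the eth/70 form of the receipts query, which carries a FirstBlockReceiptIndex alongside the block hashes. The packet and field names come from the diff above; the resumption logic sketched below (re-requesting only the blocks a truncated answer did not finish) is an assumption about the field's intent, and the types are toy mirrors rather than the real eth package structs.

```go
package main

import "fmt"

// Toy mirror of the eth/70 request shape; FirstBlockReceiptIndex is assumed
// to mean "skip this many receipts of the first requested block".
type getReceiptsPacket70 struct {
	RequestId              uint64
	Hashes                 []string // stand-in for []common.Hash
	FirstBlockReceiptIndex uint64
}

// followUp builds the next request after a partial answer: fullBlocks blocks
// were answered completely, and partialReceipts receipts of the next block
// arrived before the response was cut off.
func followUp(prev getReceiptsPacket70, fullBlocks int, partialReceipts uint64) getReceiptsPacket70 {
	return getReceiptsPacket70{
		RequestId:              prev.RequestId + 1,
		Hashes:                 prev.Hashes[fullBlocks:], // keep the partially answered block
		FirstBlockReceiptIndex: partialReceipts,
	}
}

func main() {
	req := getReceiptsPacket70{RequestId: 66, Hashes: []string{"a", "b", "c"}}
	// Suppose the response fully covered block "a" and 17 receipts of "b".
	next := followUp(req, 1, 17)
	fmt.Println(next.Hashes, next.FirstBlockReceiptIndex) // [b c] 17
}
```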
eth/downloader/fetchers_concurrent.go (21 changes: 15 additions & 6 deletions)

@@ -307,7 +307,15 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
 			// reschedule the timeout timer.
 			index, live := ordering[res.Req]
 			if live {
-				timeouts.Remove(index)
+				req := timeouts.Remove(index)
+				delete(ordering, res.Req)
+
+				if res.Partial {
+					ttl := d.peers.rates.TargetTimeout()
+					ordering[req] = timeouts.Size()
+					timeouts.Push(req, -time.Now().Add(ttl).UnixNano())
+				}
+
 				if index == 0 {
 					if !timeout.Stop() {
 						<-timeout.C
@@ -317,16 +325,17 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
 					timeout.Reset(time.Until(time.Unix(0, -exp)))
 				}
 			}
-				delete(ordering, res.Req)
 			}
-			// Delete the pending request (if it still exists) and mark the peer idle
-			delete(pending, res.Req.Peer)
-			delete(stales, res.Req.Peer)
+			if !res.Partial {
+				// Delete the pending request (if it still exists) and mark the peer idle
+				delete(pending, res.Req.Peer)
+				delete(stales, res.Req.Peer)
 
+				res.Req.Close()
+			}
 			// Signal the dispatcher that the round trip is done. We'll drop the
 			// peer if the data turns out to be junk.
 			res.Done <- nil
-			res.Req.Close()
 
 			// If the peer was previously banned and failed to deliver its pack
 			// in a reasonable time frame, ignore its message.
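The downloader change above keeps a request alive when only part of it was answered: the request is pulled out of the timeout queue and, if the response is flagged Partial, pushed back with a fresh deadline instead of being closed and having its peer marked idle. The snippet below is a self-contained sketch of that re-arm pattern using a plain map of deadlines instead of geth's deadline priority queue; the names (`tracker`, `deliver`) are illustrative, not the downloader's API.

```go
package main

import (
	"fmt"
	"time"
)

// tracker is a toy stand-in for the downloader's timeout bookkeeping:
// one deadline per in-flight request ID.
type tracker struct {
	deadlines map[uint64]time.Time
	ttl       time.Duration
}

// deliver handles a response: a complete answer retires the request,
// a partial one re-arms its deadline so the remainder can still arrive.
func (t *tracker) deliver(id uint64, partial bool) {
	delete(t.deadlines, id)
	if partial {
		t.deadlines[id] = time.Now().Add(t.ttl) // fresh deadline, same request
	}
}

func main() {
	t := &tracker{
		deadlines: map[uint64]time.Time{42: time.Now().Add(time.Second)},
		ttl:       2 * time.Second,
	}
	t.deliver(42, true) // partial: request stays tracked with a new deadline
	fmt.Println(len(t.deadlines)) // 1
	t.deliver(42, false) // complete: request retired
	fmt.Println(len(t.deadlines)) // 0
}
```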
eth/downloader/fetchers_concurrent_receipts.go (2 changes: 1 addition & 1 deletion)

@@ -91,7 +91,7 @@ func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int,
 	receipts := *packet.Res.(*eth.ReceiptsRLPResponse)
 	hashes := packet.Meta.([]common.Hash) // {receipt hashes}
 
-	accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes)
+	accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes, packet.Partial, packet.From)
 	switch {
 	case err == nil && len(receipts) == 0:
 		peer.log.Trace("Requested receipts delivered")
eth/downloader/queue.go (26 changes: 17 additions & 9 deletions)

@@ -629,13 +629,13 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH
 		result.SetBodyDone()
 	}
 	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool,
-		bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct)
+		bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct, false, 0)
 }
 
 // DeliverReceipts injects a receipt retrieval response into the results queue.
 // The method returns the number of transaction receipts accepted from the delivery
 // and also wakes any threads waiting for data delivery.
-func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash) (int, error) {
+func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash, incomplete bool, from int) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -650,7 +650,7 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptLi
 		result.SetReceiptsDone()
 	}
 	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool,
-		receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct)
+		receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct, incomplete, from)
 }
 
 // deliver injects a data retrieval response into the results queue.
@@ -662,14 +662,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
 	taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest,
 	reqTimer *metrics.Timer, resInMeter, resDropMeter *metrics.Meter,
 	results int, validate func(index int, header *types.Header) error,
-	reconstruct func(index int, result *fetchResult)) (int, error) {
+	reconstruct func(index int, result *fetchResult), incomplete bool, from int) (int, error) {
 	// Short circuit if the data was never requested
 	request := pendPool[id]
 	if request == nil {
 		resDropMeter.Mark(int64(results))
 		return 0, errNoFetchesPending
 	}
-	delete(pendPool, id)
+	if !incomplete {
+		delete(pendPool, id)
+	}
 
 	reqTimer.UpdateSince(request.Time)
 	resInMeter.Mark(int64(results))
@@ -687,7 +689,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
 		i      int
 		hashes []common.Hash
 	)
-	for _, header := range request.Headers {
+	for _, header := range request.Headers[from:] {
 		// Short circuit assembly if no more fetch results are found
 		if i >= results {
 			break
@@ -701,7 +703,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
 		i++
 	}
 
-	for _, header := range request.Headers[:i] {
+	for _, header := range request.Headers[from : from+i] {
 		if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale {
 			reconstruct(accepted, res)
 		} else {
@@ -718,8 +720,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
 	resDropMeter.Mark(int64(results - accepted))
 
 	// Return all failed or missing fetches to the queue
-	for _, header := range request.Headers[accepted:] {
-		taskQueue.Push(header, -int64(header.Number.Uint64()))
+	if incomplete {
+		for _, header := range request.Headers[from+accepted : from+results] {
+			taskQueue.Push(header, -int64(header.Number.Uint64()))
+		}
+	} else {
+		for _, header := range request.Headers[from+accepted:] {
+			taskQueue.Push(header, -int64(header.Number.Uint64()))
+		}
 	}
 	// Wake up Results
 	if accepted > 0 {
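With partial deliveries, deliver no longer assumes a response starts at the first header of the request: from offsets every slice of request.Headers, and for an incomplete delivery only the failed part of the answered window ([from+accepted, from+results)) is pushed back to the task queue, since the unanswered tail is still covered by the pending request. The sketch below replays that index arithmetic on a toy header list; splitDelivery is a hypothetical helper, not a function in the queue.

```go
package main

import "fmt"

// splitDelivery mirrors the slicing in queue.deliver: headers is the full
// request, from is where this response starts, results is how many items it
// contained and accepted how many of those were usable.
//
// It returns the headers considered delivered and the ones to re-queue.
func splitDelivery(headers []uint64, from, results, accepted int, incomplete bool) (done, requeue []uint64) {
	done = headers[from : from+accepted]
	if incomplete {
		// Only the failed part of the answered window goes back; the tail
		// beyond from+results stays with the still-pending request.
		requeue = headers[from+accepted : from+results]
	} else {
		// A complete (final) delivery releases the request, so everything
		// after the accepted items must be rescheduled.
		requeue = headers[from+accepted:]
	}
	return done, requeue
}

func main() {
	headers := []uint64{100, 101, 102, 103, 104, 105}
	done, requeue := splitDelivery(headers, 2, 3, 2, true)
	fmt.Println(done, requeue) // [102 103] [104]
}
```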