Skip to content

Commit a62ec06

Browse files
committed
fix: remove partial sink
1 parent 1f32d89 commit a62ec06

File tree

8 files changed

+94
-300
lines changed

8 files changed

+94
-300
lines changed

eth/downloader/fetchers_concurrent.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -307,15 +307,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
307307
// reschedule the timeout timer.
308308
index, live := ordering[res.Req]
309309
if live {
310-
req := timeouts.Remove(index)
311-
delete(ordering, res.Req)
312-
313-
if res.Partial {
314-
ttl := d.peers.rates.TargetTimeout()
315-
ordering[req] = timeouts.Size()
316-
timeouts.Push(req, -time.Now().Add(ttl).UnixNano())
317-
}
318-
310+
timeouts.Remove(index)
319311
if index == 0 {
320312
if !timeout.Stop() {
321313
<-timeout.C
@@ -325,17 +317,16 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
325317
timeout.Reset(time.Until(time.Unix(0, -exp)))
326318
}
327319
}
320+
delete(ordering, res.Req)
328321
}
329-
if !res.Partial {
330-
// Delete the pending request (if it still exists) and mark the peer idle
331-
delete(pending, res.Req.Peer)
332-
delete(stales, res.Req.Peer)
322+
// Delete the pending request (if it still exists) and mark the peer idle
323+
delete(pending, res.Req.Peer)
324+
delete(stales, res.Req.Peer)
333325

334-
res.Req.Close()
335-
}
336326
// Signal the dispatcher that the round trip is done. We'll drop the
337327
// peer if the data turns out to be junk.
338328
res.Done <- nil
329+
res.Req.Close()
339330

340331
// If the peer was previously banned and failed to deliver its pack
341332
// in a reasonable time frame, ignore its message.

eth/downloader/fetchers_concurrent_receipts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int,
9191
receipts := *packet.Res.(*eth.ReceiptsRLPResponse)
9292
hashes := packet.Meta.([]common.Hash) // {receipt hashes}
9393

94-
accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes, packet.Partial, packet.From)
94+
accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes)
9595
switch {
9696
case err == nil && len(receipts) == 0:
9797
peer.log.Trace("Requested receipts delivered")

eth/downloader/queue.go

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -629,13 +629,13 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH
629629
result.SetBodyDone()
630630
}
631631
return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool,
632-
bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct, false, 0)
632+
bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct)
633633
}
634634

635635
// DeliverReceipts injects a receipt retrieval response into the results queue.
636636
// The method returns the number of transaction receipts accepted from the delivery
637637
// and also wakes any threads waiting for data delivery.
638-
func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash, incomplete bool, from int) (int, error) {
638+
func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash) (int, error) {
639639
q.lock.Lock()
640640
defer q.lock.Unlock()
641641

@@ -650,7 +650,7 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptLi
650650
result.SetReceiptsDone()
651651
}
652652
return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool,
653-
receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct, incomplete, from)
653+
receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct)
654654
}
655655

656656
// deliver injects a data retrieval response into the results queue.
@@ -662,16 +662,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
662662
taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest,
663663
reqTimer *metrics.Timer, resInMeter, resDropMeter *metrics.Meter,
664664
results int, validate func(index int, header *types.Header) error,
665-
reconstruct func(index int, result *fetchResult), incomplete bool, from int) (int, error) {
665+
reconstruct func(index int, result *fetchResult)) (int, error) {
666666
// Short circuit if the data was never requested
667667
request := pendPool[id]
668668
if request == nil {
669669
resDropMeter.Mark(int64(results))
670670
return 0, errNoFetchesPending
671671
}
672-
if !incomplete {
673-
delete(pendPool, id)
674-
}
672+
delete(pendPool, id)
675673

676674
reqTimer.UpdateSince(request.Time)
677675
resInMeter.Mark(int64(results))
@@ -689,7 +687,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
689687
i int
690688
hashes []common.Hash
691689
)
692-
for _, header := range request.Headers[from:] {
690+
for _, header := range request.Headers {
693691
// Short circuit assembly if no more fetch results are found
694692
if i >= results {
695693
break
@@ -703,7 +701,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
703701
i++
704702
}
705703

706-
for _, header := range request.Headers[from : from+i] {
704+
for _, header := range request.Headers[:i] {
707705
if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale {
708706
reconstruct(accepted, res)
709707
} else {
@@ -720,14 +718,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
720718
resDropMeter.Mark(int64(results - accepted))
721719

722720
// Return all failed or missing fetches to the queue
723-
if incomplete {
724-
for _, header := range request.Headers[from+accepted : from+results] {
725-
taskQueue.Push(header, -int64(header.Number.Uint64()))
726-
}
727-
} else {
728-
for _, header := range request.Headers[from+accepted:] {
729-
taskQueue.Push(header, -int64(header.Number.Uint64()))
730-
}
721+
for _, header := range request.Headers[accepted:] {
722+
taskQueue.Push(header, -int64(header.Number.Uint64()))
731723
}
732724
// Wake up Results
733725
if accepted > 0 {

eth/downloader/queue_test.go

Lines changed: 17 additions & 166 deletions
Original file line numberDiff line numberDiff line change
@@ -32,45 +32,32 @@ import (
3232
"github.com/ethereum/go-ethereum/core/types"
3333
"github.com/ethereum/go-ethereum/log"
3434
"github.com/ethereum/go-ethereum/params"
35-
"github.com/ethereum/go-ethereum/rlp"
3635
"github.com/ethereum/go-ethereum/trie"
3736
)
3837

39-
type blockConfig struct {
40-
txPeriod int
41-
txCount int
42-
}
43-
44-
var emptyBlock = blockConfig{txPeriod: 0, txCount: 0}
45-
var defaultBlock = blockConfig{txPeriod: 2, txCount: 1}
46-
4738
// makeChain creates a chain of n blocks starting at and including parent.
4839
// The returned hash chain is ordered head->parent.
4940
// If empty is false, every second block (i%2==0) contains one transaction.
50-
// If config.txCount > 0, every config.txPeriod-th block contains config.txCount transactions.
5141
// No uncles are added.
52-
func makeChain(n int, seed byte, parent *types.Block, config blockConfig) ([]*types.Block, []types.Receipts) {
42+
func makeChain(n int, seed byte, parent *types.Block, empty bool) ([]*types.Block, []types.Receipts) {
5343
blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) {
5444
block.SetCoinbase(common.Address{seed})
55-
// Add transactions according to config
56-
if config.txCount > 0 && i%config.txPeriod == 0 {
57-
for range config.txCount {
58-
signer := types.MakeSigner(params.TestChainConfig, block.Number(), block.Timestamp())
59-
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey)
60-
if err != nil {
61-
panic(err)
62-
}
63-
block.AddTx(tx)
45+
// Add one tx to every second block
46+
if !empty && i%2 == 0 {
47+
signer := types.MakeSigner(params.TestChainConfig, block.Number(), block.Timestamp())
48+
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey)
49+
if err != nil {
50+
panic(err)
6451
}
52+
block.AddTx(tx)
6553
}
6654
})
6755
return blocks, receipts
6856
}
6957

7058
type chainData struct {
71-
blocks []*types.Block
72-
receipts []types.Receipts
73-
offset int
59+
blocks []*types.Block
60+
offset int
7461
}
7562

7663
var chain *chainData
@@ -79,11 +66,11 @@ var emptyChain *chainData
7966
func init() {
8067
// Create a chain of blocks to import
8168
targetBlocks := 128
82-
blocks, receipts := makeChain(targetBlocks, 0, testGenesis, defaultBlock)
83-
chain = &chainData{blocks, receipts, 0}
69+
blocks, _ := makeChain(targetBlocks, 0, testGenesis, false)
70+
chain = &chainData{blocks, 0}
8471

85-
blocks, receipts = makeChain(targetBlocks, 0, testGenesis, emptyBlock)
86-
emptyChain = &chainData{blocks, receipts, 0}
72+
blocks, _ = makeChain(targetBlocks, 0, testGenesis, true)
73+
emptyChain = &chainData{blocks, 0}
8774
}
8875

8976
func (chain *chainData) headers() []*types.Header {
@@ -274,149 +261,13 @@ func TestEmptyBlocks(t *testing.T) {
274261
}
275262
}
276263

277-
// TestPartialReceiptDelivery checks two points:
278-
// 1. Receipts that fail validation should be re-requested from other peers.
279-
// 2. Partial delivery should not expire.
280-
func TestPartialReceiptDelivery(t *testing.T) {
281-
blocks, receipts := makeChain(64, 0, testGenesis, blockConfig{txPeriod: 1, txCount: 5})
282-
chain := chainData{blocks: blocks, receipts: receipts, offset: 0}
283-
284-
numBlock := len(chain.blocks)
285-
286-
q := newQueue(10, 10)
287-
if !q.Idle() {
288-
t.Errorf("new queue should be idle")
289-
}
290-
q.Prepare(1, SnapSync)
291-
if res := q.Results(false); len(res) != 0 {
292-
t.Fatal("new queue should have 0 results")
293-
}
294-
295-
// Schedule a batch of headers
296-
headers := chain.headers()
297-
hashes := make([]common.Hash, len(headers))
298-
for i, header := range headers {
299-
hashes[i] = header.Hash()
300-
}
301-
q.Schedule(headers, hashes, 1)
302-
303-
peer := dummyPeer("peer-1")
304-
req, _, _ := q.ReserveReceipts(peer, numBlock)
305-
306-
t.Logf("request: length %d", len(req.Headers))
307-
308-
// 1. Deliver a partial receipt: this must not clear the remaining receipts from the pending list
309-
firstCutoff := len(req.Headers) / 3
310-
receiptRLP, rcHashes := getPartialReceiptsDelivery(0, firstCutoff, receipts)
311-
accepted, err := q.DeliverReceipts(peer.id, receiptRLP, rcHashes, true, 0)
312-
if err != nil || accepted != firstCutoff {
313-
t.Fatalf("delivery failed: err %v, accepted %d\n", err, accepted)
314-
}
315-
316-
if pending := q.PendingReceipts(); pending != numBlock-len(req.Headers) {
317-
t.Fatalf("wrong pending receipt length, got %d, exp %d", pending, numBlock-len(req.Headers))
318-
}
319-
for i := range firstCutoff {
320-
headerNumber := req.Headers[i].Number.Uint64()
321-
res, _, _, _, err := q.resultCache.getFetchResult(headerNumber)
322-
if err != nil {
323-
t.Fatalf("fetch result get failed: err %v", err)
324-
}
325-
if res == nil {
326-
t.Fatalf("fetch result is nil: header number %d", headerNumber)
327-
}
328-
if !res.Done(receiptType) {
329-
t.Fatalf("wrong result, block %d receipt not done", headerNumber)
330-
}
331-
}
332-
if flight := q.InFlightReceipts(); !flight {
333-
t.Fatalf("there should be in flight receipts")
334-
}
335-
336-
// 2. Deliver a partial receipt containing an invalid entry: the invalid receipt should be removed from the pending list
337-
secondCutoff := firstCutoff + len(req.Headers)/3
338-
receiptRLP, rcHashes = getPartialReceiptsDelivery(firstCutoff, secondCutoff, receipts)
339-
// one invalid receipt
340-
rcHashes[len(rcHashes)-1] = common.Hash{}
341-
accepted, err = q.DeliverReceipts(peer.id, receiptRLP, rcHashes, true, firstCutoff)
342-
if accepted != len(rcHashes)-1 {
343-
t.Fatalf("wrong accepted, got %d, exp %d", accepted, len(rcHashes)-1)
344-
}
345-
if err == nil {
346-
t.Fatalf("delivery should fail")
347-
}
348-
349-
// The invalid receipt should be returned to the pending pool
350-
if pending := q.PendingReceipts(); pending != numBlock-len(req.Headers)+1 {
351-
t.Fatalf("wrong pending receipt length, got %d, exp %d", pending, numBlock-len(req.Headers))
352-
}
353-
for i := range len(rcHashes) - 1 {
354-
headerNumber := req.Headers[firstCutoff+i].Number.Uint64()
355-
res, _, _, _, err := q.resultCache.getFetchResult(headerNumber)
356-
if err != nil {
357-
t.Fatalf("fetch result get failed: err %v", err)
358-
}
359-
if res == nil {
360-
t.Fatalf("fetch result is nil: header number %d", headerNumber)
361-
}
362-
if !res.Done(receiptType) {
363-
t.Fatalf("wrong result, block %d receipt not done", headerNumber)
364-
}
365-
}
366-
367-
// 3. Deliver the remaining receipts to complete the request
368-
thirdCutoff := len(req.Headers)
369-
receiptRLP, rcHashes = getPartialReceiptsDelivery(secondCutoff, thirdCutoff, receipts)
370-
accepted, err = q.DeliverReceipts(peer.id, receiptRLP, rcHashes, false, secondCutoff)
371-
if accepted != len(rcHashes) {
372-
t.Fatalf("wrong accepted, got %d, exp %d", accepted, len(rcHashes)-1)
373-
}
374-
if err != nil || accepted != thirdCutoff-secondCutoff {
375-
t.Fatalf("delivery failed: err %v, accepted %d\n", err, accepted)
376-
}
377-
378-
for i := range len(rcHashes) {
379-
headerNumber := req.Headers[secondCutoff+i].Number.Uint64()
380-
res, _, _, _, err := q.resultCache.getFetchResult(headerNumber)
381-
if err != nil {
382-
t.Fatalf("fetch result get failed: err %v", err)
383-
}
384-
if res == nil {
385-
t.Fatalf("fetch result is nil: header number %d", headerNumber)
386-
}
387-
if !res.Done(receiptType) {
388-
t.Fatalf("wrong result, block %d receipt not done", headerNumber)
389-
}
390-
}
391-
if q.InFlightReceipts() {
392-
t.Fatal("there shouldn't be any remaning in-flight receipts")
393-
}
394-
}
395-
396-
func getPartialReceiptsDelivery(from int, to int, receipts []types.Receipts) ([]rlp.RawValue, []common.Hash) {
397-
if from < 0 {
398-
from = 0
399-
}
400-
if to > len(receipts) {
401-
to = len(receipts)
402-
}
403-
404-
hasher := trie.NewStackTrie(nil)
405-
rcHashes := make([]common.Hash, to-from)
406-
for i, rc := range receipts[from:to] {
407-
rcHashes[i] = types.DeriveSha(rc, hasher)
408-
}
409-
410-
return types.EncodeBlockReceiptLists(receipts[from:to]), rcHashes
411-
}
412-
413264
// XTestDelivery does some more extensive testing of events that happen,
414265
// blocks that become known and peers that make reservations and deliveries.
415266
// disabled since it's not really a unit-test, but can be executed to test
416267
// some more advanced scenarios
417268
func XTestDelivery(t *testing.T) {
418269
// the outside network, holding blocks
419-
blo, rec := makeChain(128, 0, testGenesis, defaultBlock)
270+
blo, rec := makeChain(128, 0, testGenesis, false)
420271
world := newNetwork()
421272
world.receipts = rec
422273
world.chain = blo
@@ -517,7 +368,7 @@ func XTestDelivery(t *testing.T) {
517368
for i, receipt := range rcs {
518369
hashes[i] = types.DeriveSha(receipt, hasher)
519370
}
520-
_, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes, false, 0)
371+
_, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes)
521372
if err != nil {
522373
fmt.Printf("delivered %d receipts %v\n", len(rcs), err)
523374
}
@@ -593,7 +444,7 @@ func (n *network) progress(numBlocks int) {
593444
n.lock.Lock()
594445
defer n.lock.Unlock()
595446
//fmt.Printf("progressing...\n")
596-
newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], emptyBlock)
447+
newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], false)
597448
n.chain = append(n.chain, newBlocks...)
598449
n.receipts = append(n.receipts, newR...)
599450
n.cond.Broadcast()

0 commit comments

Comments
 (0)