Skip to content

Commit 40a3856

Browse files
tailingchenkaralabe
authored andcommitted
eth/fetcher: check the origin of filter tasks (#14975)
* eth/fetcher: check the origin of filter task * eth/fetcher: add some details to fetcher logs
1 parent 89860f4 commit 40a3856

File tree

3 files changed

+56
-45
lines changed

3 files changed

+56
-45
lines changed

eth/fetcher/fetcher.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,15 @@ type announce struct {
8383

8484
// headerFilterTask represents a batch of headers needing fetcher filtering.
8585
type headerFilterTask struct {
86+
peer string // The source peer of block headers
8687
headers []*types.Header // Collection of headers to filter
8788
time time.Time // Arrival time of the headers
8889
}
8990

9091
// headerFilterTask represents a batch of block bodies (transactions and uncles)
9192
// needing fetcher filtering.
9293
type bodyFilterTask struct {
94+
peer string // The source peer of block bodies
9395
transactions [][]*types.Transaction // Collection of transactions per block bodies
9496
uncles [][]*types.Header // Collection of uncles per block bodies
9597
time time.Time // Arrival time of the blocks' contents
@@ -218,8 +220,8 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
218220

219221
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
220222
// returning those that should be handled differently.
221-
func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header {
222-
log.Trace("Filtering headers", "headers", len(headers))
223+
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
224+
log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
223225

224226
// Send the filter channel to the fetcher
225227
filter := make(chan *headerFilterTask)
@@ -231,7 +233,7 @@ func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*type
231233
}
232234
// Request the filtering of the header list
233235
select {
234-
case filter <- &headerFilterTask{headers: headers, time: time}:
236+
case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
235237
case <-f.quit:
236238
return nil
237239
}
@@ -246,8 +248,8 @@ func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*type
246248

247249
// FilterBodies extracts all the block bodies that were explicitly requested by
248250
// the fetcher, returning those that should be handled differently.
249-
func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
250-
log.Trace("Filtering bodies", "txs", len(transactions), "uncles", len(uncles))
251+
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
252+
log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
251253

252254
// Send the filter channel to the fetcher
253255
filter := make(chan *bodyFilterTask)
@@ -259,7 +261,7 @@ func (f *Fetcher) FilterBodies(transactions [][]*types.Transaction, uncles [][]*
259261
}
260262
// Request the filtering of the body list
261263
select {
262-
case filter <- &bodyFilterTask{transactions: transactions, uncles: uncles, time: time}:
264+
case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
263265
case <-f.quit:
264266
return nil, nil
265267
}
@@ -444,7 +446,7 @@ func (f *Fetcher) loop() {
444446
hash := header.Hash()
445447

446448
// Filter fetcher-requested headers from other synchronisation algorithms
447-
if announce := f.fetching[hash]; announce != nil && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
449+
if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
448450
// If the delivered header does not match the promised number, drop the announcer
449451
if header.Number.Uint64() != announce.number {
450452
log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
@@ -523,7 +525,7 @@ func (f *Fetcher) loop() {
523525
txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
524526
uncleHash := types.CalcUncleHash(task.uncles[i])
525527

526-
if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash {
528+
if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
527529
// Mark the body matched, reassemble if still unknown
528530
matched = true
529531

eth/fetcher/fetcher_test.go

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func (f *fetcherTester) dropPeer(peer string) {
153153
}
154154

155155
// makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
156-
func (f *fetcherTester) makeHeaderFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
156+
func (f *fetcherTester) makeHeaderFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
157157
closure := make(map[common.Hash]*types.Block)
158158
for hash, block := range blocks {
159159
closure[hash] = block
@@ -166,14 +166,14 @@ func (f *fetcherTester) makeHeaderFetcher(blocks map[common.Hash]*types.Block, d
166166
headers = append(headers, block.Header())
167167
}
168168
// Return on a new thread
169-
go f.fetcher.FilterHeaders(headers, time.Now().Add(drift))
169+
go f.fetcher.FilterHeaders(peer, headers, time.Now().Add(drift))
170170

171171
return nil
172172
}
173173
}
174174

175175
// makeBodyFetcher retrieves a block body fetcher associated with a simulated peer.
176-
func (f *fetcherTester) makeBodyFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
176+
func (f *fetcherTester) makeBodyFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
177177
closure := make(map[common.Hash]*types.Block)
178178
for hash, block := range blocks {
179179
closure[hash] = block
@@ -191,7 +191,7 @@ func (f *fetcherTester) makeBodyFetcher(blocks map[common.Hash]*types.Block, dri
191191
}
192192
}
193193
// Return on a new thread
194-
go f.fetcher.FilterBodies(transactions, uncles, time.Now().Add(drift))
194+
go f.fetcher.FilterBodies(peer, transactions, uncles, time.Now().Add(drift))
195195

196196
return nil
197197
}
@@ -282,8 +282,8 @@ func testSequentialAnnouncements(t *testing.T, protocol int) {
282282
hashes, blocks := makeChain(targetBlocks, 0, genesis)
283283

284284
tester := newTester()
285-
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
286-
bodyFetcher := tester.makeBodyFetcher(blocks, 0)
285+
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
286+
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
287287

288288
// Iteratively announce blocks until all are imported
289289
imported := make(chan *types.Block)
@@ -309,22 +309,28 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) {
309309

310310
// Assemble a tester with a built in counter for the requests
311311
tester := newTester()
312-
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
313-
bodyFetcher := tester.makeBodyFetcher(blocks, 0)
312+
firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack)
313+
firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0)
314+
secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack)
315+
secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0)
314316

315317
counter := uint32(0)
316-
headerWrapper := func(hash common.Hash) error {
318+
firstHeaderWrapper := func(hash common.Hash) error {
319+
atomic.AddUint32(&counter, 1)
320+
return firstHeaderFetcher(hash)
321+
}
322+
secondHeaderWrapper := func(hash common.Hash) error {
317323
atomic.AddUint32(&counter, 1)
318-
return headerFetcher(hash)
324+
return secondHeaderFetcher(hash)
319325
}
320326
// Iteratively announce blocks until all are imported
321327
imported := make(chan *types.Block)
322328
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
323329

324330
for i := len(hashes) - 2; i >= 0; i-- {
325-
tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
326-
tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), headerWrapper, bodyFetcher)
327-
tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), headerWrapper, bodyFetcher)
331+
tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher)
332+
tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
333+
tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
328334
verifyImportEvent(t, imported, true)
329335
}
330336
verifyImportDone(t, imported)
@@ -347,8 +353,8 @@ func testOverlappingAnnouncements(t *testing.T, protocol int) {
347353
hashes, blocks := makeChain(targetBlocks, 0, genesis)
348354

349355
tester := newTester()
350-
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
351-
bodyFetcher := tester.makeBodyFetcher(blocks, 0)
356+
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
357+
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
352358

353359
// Iteratively announce blocks, but overlap them continuously
354360
overlap := 16
@@ -381,8 +387,8 @@ func testPendingDeduplication(t *testing.T, protocol int) {
381387

382388
// Assemble a tester with a built in counter and delayed fetcher
383389
tester := newTester()
384-
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
385-
bodyFetcher := tester.makeBodyFetcher(blocks, 0)
390+
headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack)
391+
bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0)
386392

387393
delay := 50 * time.Millisecond
388394
counter := uint32(0)
@@ -425,8 +431,8 @@ func testRandomArrivalImport(t *testing.T, protocol int) {
425431
skip := targetBlocks / 2
426432

427433
tester := newTester()
428-
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
429-
bodyFetcher := tester.makeBodyFetcher(blocks, 0)
434+
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
435+
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
430436

431437
// Iteratively announce blocks, skipping one entry
432438
imported := make(chan *types.Block, len(hashes)-1)
@@ -456,8 +462,8 @@ func testQueueGapFill(t *testing.T, protocol int) {
456462
skip := targetBlocks / 2
457463

458464
tester := newTester()
459-
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
460-
bodyFetcher := tester.makeBodyFetcher(blocks, 0)
465+
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
466+
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
461467

462468
// Iteratively announce blocks, skipping one entry
463469
imported := make(chan *types.Block, len(hashes)-1)
@@ -486,8 +492,8 @@ func testImportDeduplication(t *testing.T, protocol int) {
486492

487493
// Create the tester and wrap the importer with a counter
488494
tester := newTester()
489-
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
490-
bodyFetcher := tester.makeBodyFetcher(blocks, 0)
495+
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
496+
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
491497

492498
counter := uint32(0)
493499
tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
@@ -570,8 +576,8 @@ func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
570576
tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
571577
tester.lock.Unlock()
572578

573-
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
574-
bodyFetcher := tester.makeBodyFetcher(blocks, 0)
579+
headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack)
580+
bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0)
575581

576582
fetching := make(chan struct{}, 2)
577583
tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
@@ -603,14 +609,14 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
603609
hashes, blocks := makeChain(1, 0, genesis)
604610

605611
tester := newTester()
606-
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
607-
bodyFetcher := tester.makeBodyFetcher(blocks, 0)
612+
badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack)
613+
badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
608614

609615
imported := make(chan *types.Block)
610616
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
611617

612618
// Announce a block with a bad number, check for immediate drop
613-
tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
619+
tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
614620
verifyImportEvent(t, imported, false)
615621

616622
tester.lock.RLock()
@@ -620,8 +626,11 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
620626
if !dropped {
621627
t.Fatalf("peer with invalid numbered announcement not dropped")
622628
}
629+
630+
goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack)
631+
goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0)
623632
// Make sure a good announcement passes without a drop
624-
tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
633+
tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher)
625634
verifyImportEvent(t, imported, true)
626635

627636
tester.lock.RLock()
@@ -645,8 +654,8 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
645654
hashes, blocks := makeChain(32, 0, genesis)
646655

647656
tester := newTester()
648-
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
649-
bodyFetcher := tester.makeBodyFetcher(blocks, 0)
657+
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
658+
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
650659

651660
// Add a monitoring hook for all internal events
652661
fetching := make(chan []common.Hash)
@@ -697,12 +706,12 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
697706
// Create a valid chain and an infinite junk chain
698707
targetBlocks := hashLimit + 2*maxQueueDist
699708
hashes, blocks := makeChain(targetBlocks, 0, genesis)
700-
validHeaderFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
701-
validBodyFetcher := tester.makeBodyFetcher(blocks, 0)
709+
validHeaderFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
710+
validBodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
702711

703712
attack, _ := makeChain(targetBlocks, 0, unknownBlock)
704-
attackerHeaderFetcher := tester.makeHeaderFetcher(nil, -gatherSlack)
705-
attackerBodyFetcher := tester.makeBodyFetcher(nil, 0)
713+
attackerHeaderFetcher := tester.makeHeaderFetcher("attacker", nil, -gatherSlack)
714+
attackerBodyFetcher := tester.makeBodyFetcher("attacker", nil, 0)
706715

707716
// Feed the tester a huge hashset from the attacker, and a limited from the valid peer
708717
for i := 0; i < len(attack); i++ {

eth/handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
450450
return nil
451451
}
452452
// Irrelevant of the fork checks, send the header to the fetcher just in case
453-
headers = pm.fetcher.FilterHeaders(headers, time.Now())
453+
headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
454454
}
455455
if len(headers) > 0 || !filter {
456456
err := pm.downloader.DeliverHeaders(p.id, headers)
@@ -503,7 +503,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
503503
// Filter out any explicitly requested bodies, deliver the rest to the downloader
504504
filter := len(trasactions) > 0 || len(uncles) > 0
505505
if filter {
506-
trasactions, uncles = pm.fetcher.FilterBodies(trasactions, uncles, time.Now())
506+
trasactions, uncles = pm.fetcher.FilterBodies(p.id, trasactions, uncles, time.Now())
507507
}
508508
if len(trasactions) > 0 || len(uncles) > 0 || !filter {
509509
err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)

0 commit comments

Comments
 (0)