Skip to content

Commit 816e0bc

Browse files
authored
Merge pull request #1036 from ethereumjs/network-improvements
Network improvements
2 parents 5ac5bc7 + c110b70 commit 816e0bc

File tree

8 files changed

+69
-25
lines changed

8 files changed

+69
-25
lines changed

packages/client/lib/sync/fullsync.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,17 @@ export class FullSynchronizer extends Synchronizer {
7171
let oldHead = Buffer.alloc(0)
7272
const newHeadBlock = await this.vm.blockchain.getHead()
7373
let newHead = newHeadBlock.hash()
74-
const firstHeadBlock = newHeadBlock
74+
let firstHeadBlock = newHeadBlock
7575
let lastHeadBlock = newHeadBlock
7676
while (!newHead.equals(oldHead) && !this.stopSyncing) {
7777
oldHead = newHead
7878
this.vmPromise = this.vm.runBlockchain(this.vm.blockchain, 1)
7979
await this.vmPromise
8080
const headBlock = await this.vm.blockchain.getHead()
8181
newHead = headBlock.hash()
82+
if (blockCounter === 0) {
83+
firstHeadBlock = headBlock
84+
}
8285
// check if we did run a new block:
8386
if (!newHead.equals(oldHead)) {
8487
blockCounter += 1

packages/devp2p/src/dpt/dpt.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ export class DPT extends EventEmitter {
6868
this._kbucket = new KBucket(this._id)
6969
this._kbucket.on('added', (peer: PeerInfo) => this.emit('peer:added', peer))
7070
this._kbucket.on('removed', (peer: PeerInfo) => this.emit('peer:removed', peer))
71-
this._kbucket.on('ping', this._onKBucketPing)
71+
this._kbucket.on('ping', this._onKBucketPing.bind(this))
7272

7373
this._server = new DPTServer(this, this.privateKey, {
7474
timeout: options.timeout,
@@ -108,15 +108,23 @@ export class DPT extends EventEmitter {
108108
})
109109
.then(() => {
110110
if (++count < oldPeers.length) return
111-
112111
if (err === null) this.banlist.add(newPeer, ms('5m'))
113112
else this._kbucket.add(newPeer)
114113
})
115114
}
116115
}
117116

118-
_onServerPeers(peers: any[]): void {
119-
for (const peer of peers) this.addPeer(peer).catch(() => {})
117+
_onServerPeers(peers: PeerInfo[]): void {
118+
const DIFF_TIME_MS = 200
119+
let ms = 0
120+
for (const peer of peers) {
121+
setTimeout(() => {
122+
this.addPeer(peer).catch((error) => {
123+
this.emit('error', error)
124+
})
125+
}, ms)
126+
ms += DIFF_TIME_MS
127+
}
120128
}
121129

122130
async bootstrap(peer: PeerInfo): Promise<void> {
@@ -173,7 +181,9 @@ export class DPT extends EventEmitter {
173181
this._refreshIntervalSelectionCounter = (this._refreshIntervalSelectionCounter + 1) % 10
174182

175183
const peers = this.getPeers()
176-
debug(`call .refresh (${peers.length} peers in table)`)
184+
debug(
185+
`call .refresh() (selector ${this._refreshIntervalSelectionCounter}) (${peers.length} peers in table)`
186+
)
177187

178188
for (const peer of peers) {
179189
// Randomly distributed selector based on peer ID

packages/devp2p/src/dpt/kbucket.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ export class KBucket extends EventEmitter {
3232
this.emit('removed', peer)
3333
})
3434

35-
this._kbucket.on('ping', (oldPeers: PeerInfo[], newPeer: PeerInfo) => {
36-
this.emit('ping', { oldPeers, newPeer })
35+
this._kbucket.on('ping', (oldPeers: PeerInfo[], newPeer: PeerInfo | undefined) => {
36+
this.emit('ping', oldPeers, newPeer)
3737
})
3838
}
3939

packages/devp2p/src/rlpx/peer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,7 @@ export class Peer extends EventEmitter {
529529
break
530530
}
531531
} catch (err) {
532+
this.disconnect(DISCONNECT_REASONS.SUBPROTOCOL_ERROR)
532533
debug(`Error on peer socket data handling: ${err}`)
533534
this.emit('error', err)
534535
}

packages/devp2p/src/rlpx/rlpx.ts

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import LRUCache from 'lru-cache'
88
import Common from '@ethereumjs/common'
99
// note: relative path only valid in .js file in dist
1010
const { version: pVersion } = require('../../package.json')
11-
import { pk2id, createDeferred, formatLogId } from '../util'
11+
import { pk2id, createDeferred, formatLogId, buffer2int } from '../util'
1212
import { Peer, DISCONNECT_REASONS, Capabilities } from './peer'
1313
import { DPT, PeerInfo } from '../dpt'
1414

@@ -31,6 +31,7 @@ export interface RLPxOptions {
3131
export class RLPx extends EventEmitter {
3232
_privateKey: Buffer
3333
_id: Buffer
34+
3435
_timeout: number
3536
_maxPeers: number
3637
_clientId: Buffer
@@ -39,11 +40,14 @@ export class RLPx extends EventEmitter {
3940
_common: Common
4041
_listenPort: number | null
4142
_dpt: DPT | null
43+
4244
_peersLRU: LRUCache<string, boolean>
4345
_peersQueue: { peer: PeerInfo; ts: number }[]
4446
_server: net.Server | null
4547
_peers: Map<string, net.Socket | Peer>
48+
4649
_refillIntervalId: NodeJS.Timeout
50+
_refillIntervalSelectionCounter: number = 0
4751

4852
constructor(privateKey: Buffer, options: RLPxOptions) {
4953
super()
@@ -80,9 +84,11 @@ export class RLPx extends EventEmitter {
8084
if (this._getOpenSlots() > 0) return this._connectToPeer(peer)
8185
this._peersQueue.push({ peer: peer, ts: 0 }) // save to queue
8286
})
83-
this._dpt.on('peer:removed', (peer: any) => {
87+
this._dpt.on('peer:removed', (peer: PeerInfo) => {
8488
// remove from queue
85-
this._peersQueue = this._peersQueue.filter((item: any) => !item.peer.id.equals(peer.id))
89+
this._peersQueue = this._peersQueue.filter(
90+
(item) => !(item.peer.id! as Buffer).equals(peer.id as Buffer)
91+
)
8692
})
8793
}
8894

@@ -96,7 +102,9 @@ export class RLPx extends EventEmitter {
96102
this._peers = new Map()
97103
this._peersQueue = []
98104
this._peersLRU = new LRUCache({ max: 25000 })
99-
this._refillIntervalId = setInterval(() => this._refillConnections(), ms('10s'))
105+
const REFILL_INTERVALL = ms('10s')
106+
const refillIntervalSubdivided = Math.floor(REFILL_INTERVALL / 10)
107+
this._refillIntervalId = setInterval(() => this._refillConnections(), refillIntervalSubdivided)
100108
}
101109

102110
listen(...args: any[]) {
@@ -259,17 +267,27 @@ export class RLPx extends EventEmitter {
259267
_refillConnections() {
260268
if (!this._isAlive()) return
261269
debug(
262-
`refill connections.. queue size: ${this._peersQueue.length}, peers: ${
270+
`refill connections.. (selector ${this._refillIntervalSelectionCounter}) peers: ${
263271
this._peers.size
264-
}, open slots: ${this._getOpenSlots()}`
272+
}, queue size: ${this._peersQueue.length}, open slots: ${this._getOpenSlots()}`
265273
)
274+
// Rotating selection counter going in loop from 0..9
275+
this._refillIntervalSelectionCounter = (this._refillIntervalSelectionCounter + 1) % 10
266276

267-
this._peersQueue = this._peersQueue.filter((item: any) => {
277+
this._peersQueue = this._peersQueue.filter((item) => {
268278
if (this._getOpenSlots() === 0) return true
269279
if (item.ts > Date.now()) return true
270280

271-
this._connectToPeer(item.peer)
272-
return false
281+
// Randomly distributed selector based on peer ID
282+
// to decide on subdivided execution
283+
const selector = buffer2int((item.peer.id! as Buffer).slice(0, 1)) % 10
284+
if (selector === this._refillIntervalSelectionCounter) {
285+
this._connectToPeer(item.peer)
286+
return false
287+
} else {
288+
// Still keep peer in queue
289+
return true
290+
}
273291
})
274292
}
275293
}

packages/devp2p/src/util.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import { randomBytes } from 'crypto'
33
import { privateKeyVerify, publicKeyConvert } from 'secp256k1'
44
import createKeccakHash from 'keccak'
55
import { decode } from 'rlp'
6+
import { ETH } from './eth'
7+
import { LES } from './les'
68

79
export function keccak256(...buffers: Buffer[]) {
810
const buffer = Buffer.concat(buffers)
@@ -53,7 +55,14 @@ export function xor(a: Buffer, b: any): Buffer {
5355
return buffer
5456
}
5557

56-
export function assertEq(expected: any, actual: any, msg: string, debug: any): void {
58+
type assertInput = Buffer | Buffer[] | ETH.StatusMsg | LES.Status | number | null
59+
60+
export function assertEq(
61+
expected: assertInput,
62+
actual: assertInput,
63+
msg: string,
64+
debug: Function
65+
): void {
5766
let message
5867
if (Buffer.isBuffer(expected) && Buffer.isBuffer(actual)) {
5968
if (expected.equals(actual)) return

packages/devp2p/test/integration/dpt-simulator.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ test('DPT: remove node', async (t) => {
3232
async.series(
3333
[
3434
function (cb) {
35-
dpts[0].on('peer:added', function (peer: any) {
35+
dpts[0].on('peer:added', function (peer) {
3636
dpts[0].removePeer(peer)
3737
cb(null)
3838
})
@@ -64,13 +64,13 @@ test('DPT: ban node', async (t) => {
6464
async.series(
6565
[
6666
function (cb) {
67-
dpts[0].on('peer:added', function (peer: any) {
67+
dpts[0].on('peer:added', function (peer) {
6868
dpts[0].banPeer(peer)
6969
cb(null)
7070
})
7171
},
7272
function (cb) {
73-
dpts[0].on('peer:removed', function (peer: any) {
73+
dpts[0].on('peer:removed', function (peer) {
7474
t.equal(dpts[0].banlist.has(peer), true, 'ban-list should contain peer')
7575
t.equal(
7676
dpts[0].getPeers().length,
@@ -150,11 +150,14 @@ test('DPT: simulate bootstrap', async (t) => {
150150
}
151151

152152
await delay(250)
153-
util.destroyDPTs(dpts)
154153

155154
// dpts.forEach((dpt, i) => console.log(`${i}:${dpt.getPeers().length}`))
156-
for (const dpt of dpts)
155+
for (const dpt of dpts) {
157156
t.equal(dpt.getPeers().length, numDPTs, 'Peers should be distributed to all DPTs')
157+
}
158+
await delay(1000)
159+
160+
util.destroyDPTs(dpts)
158161

159162
t.end()
160163
})

packages/devp2p/test/integration/util.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import Common from '@ethereumjs/common'
55
export const localhost = '127.0.0.1'
66
export const basePort = 30306
77

8-
export function getTestDPTs(numDPTs: any) {
8+
export function getTestDPTs(numDPTs: number) {
99
const dpts = []
1010

1111
for (let i = 0; i < numDPTs; ++i) {
@@ -30,7 +30,7 @@ export function initTwoPeerDPTSetup() {
3030
return dpts
3131
}
3232

33-
export function destroyDPTs(dpts: any) {
33+
export function destroyDPTs(dpts: DPT[]) {
3434
for (const dpt of dpts) dpt.destroy()
3535
}
3636

0 commit comments

Comments
 (0)