@@ -8,7 +8,7 @@ import LRUCache from 'lru-cache'
8
8
import Common from '@ethereumjs/common'
9
9
// note: relative path only valid in .js file in dist
10
10
const { version : pVersion } = require ( '../../package.json' )
11
- import { pk2id , createDeferred , formatLogId } from '../util'
11
+ import { pk2id , createDeferred , formatLogId , buffer2int } from '../util'
12
12
import { Peer , DISCONNECT_REASONS , Capabilities } from './peer'
13
13
import { DPT , PeerInfo } from '../dpt'
14
14
@@ -31,6 +31,7 @@ export interface RLPxOptions {
31
31
export class RLPx extends EventEmitter {
32
32
_privateKey : Buffer
33
33
_id : Buffer
34
+
34
35
_timeout : number
35
36
_maxPeers : number
36
37
_clientId : Buffer
@@ -39,11 +40,14 @@ export class RLPx extends EventEmitter {
39
40
_common : Common
40
41
_listenPort : number | null
41
42
_dpt : DPT | null
43
+
42
44
_peersLRU : LRUCache < string , boolean >
43
45
_peersQueue : { peer : PeerInfo ; ts : number } [ ]
44
46
_server : net . Server | null
45
47
_peers : Map < string , net . Socket | Peer >
48
+
46
49
_refillIntervalId : NodeJS . Timeout
50
+ _refillIntervalSelectionCounter : number = 0
47
51
48
52
constructor ( privateKey : Buffer , options : RLPxOptions ) {
49
53
super ( )
@@ -98,7 +102,9 @@ export class RLPx extends EventEmitter {
98
102
this . _peers = new Map ( )
99
103
this . _peersQueue = [ ]
100
104
this . _peersLRU = new LRUCache ( { max : 25000 } )
101
- this . _refillIntervalId = setInterval ( ( ) => this . _refillConnections ( ) , ms ( '10s' ) )
105
+ const REFILL_INTERVALL = ms ( '10s' )
106
+ const refillIntervalSubdivided = Math . floor ( REFILL_INTERVALL / 10 )
107
+ this . _refillIntervalId = setInterval ( ( ) => this . _refillConnections ( ) , refillIntervalSubdivided )
102
108
}
103
109
104
110
listen ( ...args : any [ ] ) {
@@ -261,17 +267,27 @@ export class RLPx extends EventEmitter {
261
267
_refillConnections ( ) {
262
268
if ( ! this . _isAlive ( ) ) return
263
269
debug (
264
- `refill connections.. peers: ${ this . _peers . size } , queue size : ${
265
- this . _peersQueue . length
266
- } , open slots: ${ this . _getOpenSlots ( ) } `
270
+ `refill connections.. (selector ${ this . _refillIntervalSelectionCounter } ) peers : ${
271
+ this . _peers . size
272
+ } , queue size: ${ this . _peersQueue . length } , open slots: ${ this . _getOpenSlots ( ) } `
267
273
)
274
+ // Rotating selection counter going in loop from 0..9
275
+ this . _refillIntervalSelectionCounter = ( this . _refillIntervalSelectionCounter + 1 ) % 10
268
276
269
277
this . _peersQueue = this . _peersQueue . filter ( ( item ) => {
270
278
if ( this . _getOpenSlots ( ) === 0 ) return true
271
279
if ( item . ts > Date . now ( ) ) return true
272
280
273
- this . _connectToPeer ( item . peer )
274
- return false
281
+ // Randomly distributed selector based on peer ID
282
+ // to decide on subdivided execution
283
+ const selector = buffer2int ( ( item . peer . id ! as Buffer ) . slice ( 0 , 1 ) ) % 10
284
+ if ( selector === this . _refillIntervalSelectionCounter ) {
285
+ this . _connectToPeer ( item . peer )
286
+ return false
287
+ } else {
288
+ // Still keep peer in queue
289
+ return true
290
+ }
275
291
} )
276
292
}
277
293
}
0 commit comments