Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

Commit c2a9238

Browse files
authored
feat!: specify stream limits (#348)
Adds two config params `maxInboundStreams` and `maxOutboundStreams` that allow us to configure how many protocol streams are allowed to be running in parallel. Initial default values are completely arbitrary, will need to tune these for typical workloads. BREAKING CHANGE: Updates to new registrar API
1 parent 1dfc744 commit c2a9238

File tree

8 files changed

+42
-18
lines changed

8 files changed

+42
-18
lines changed

package.json

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,22 +133,22 @@
133133
"release": "aegir release"
134134
},
135135
"dependencies": {
136-
"@libp2p/components": "^1.0.0",
136+
"@libp2p/components": "^2.0.0",
137137
"@libp2p/crypto": "^1.0.0",
138138
"@libp2p/interface-address-manager": "^1.0.1",
139-
"@libp2p/interface-connection": "^1.0.1",
139+
"@libp2p/interface-connection": "^2.0.0",
140140
"@libp2p/interface-connection-manager": "^1.0.0",
141141
"@libp2p/interface-dht": "^1.0.0",
142142
"@libp2p/interface-peer-discovery": "^1.0.0",
143143
"@libp2p/interface-peer-id": "^1.0.2",
144144
"@libp2p/interface-peer-info": "^1.0.1",
145145
"@libp2p/interface-peer-store": "^1.0.0",
146-
"@libp2p/interface-registrar": "^1.0.0",
146+
"@libp2p/interface-registrar": "^2.0.0",
147147
"@libp2p/interfaces": "^3.0.2",
148-
"@libp2p/logger": "^1.1.6",
148+
"@libp2p/logger": "^2.0.0",
149149
"@libp2p/peer-id": "^1.1.13",
150150
"@libp2p/record": "^2.0.0",
151-
"@libp2p/topology": "^2.0.0",
151+
"@libp2p/topology": "^3.0.0",
152152
"@multiformats/multiaddr": "^10.1.5",
153153
"abortable-iterator": "^4.0.2",
154154
"any-signal": "^3.0.0",
@@ -178,9 +178,9 @@
178178
"varint": "^6.0.0"
179179
},
180180
"devDependencies": {
181-
"@libp2p/interface-mocks": "^1.0.1",
181+
"@libp2p/interface-mocks": "^2.0.0",
182182
"@libp2p/peer-id-factory": "^1.0.9",
183-
"@libp2p/peer-store": "^2.0.0",
183+
"@libp2p/peer-store": "^3.0.0",
184184
"@types/lodash.random": "^3.2.6",
185185
"@types/lodash.range": "^3.2.6",
186186
"@types/varint": "^6.0.0",

src/index.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ export interface KadDHTInit {
4545
* be evicted from the routing table or not (default 10)
4646
*/
4747
pingConcurrency?: number
48+
49+
/**
50+
* How many parallel incoming streams to allow on the DHT protocol per-connection
51+
*/
52+
maxInboundStreams?: number
53+
54+
/**
55+
* How many parallel outgoing streams to allow on the DHT protocol per-connection
56+
*/
57+
maxOutboundStreams?: number
4858
}
4959

5060
export class KadDHT extends DualKadDHT {

src/kad-dht.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ import { selectors as recordSelectors } from '@libp2p/record/selectors'
2727
import { symbol } from '@libp2p/interface-peer-discovery'
2828
import { PROTOCOL_DHT, PROTOCOL_PREFIX, LAN_PREFIX } from './constants.js'
2929

30+
export const DEFAULT_MAX_INBOUND_STREAMS = 32
31+
export const DEFAULT_MAX_OUTBOUND_STREAMS = 64
32+
3033
export interface SingleKadDHTInit extends KadDHTInit {
3134
/**
3235
* Whether to start up in lan or wan mode
@@ -60,6 +63,8 @@ export class KadDHT extends EventEmitter<PeerDiscoveryEvents> implements DHT, In
6063
private readonly rpc: RPC
6164
private readonly topologyListener: TopologyListener
6265
private readonly querySelf: QuerySelf
66+
private readonly maxInboundStreams: number
67+
private readonly maxOutboundStreams: number
6368

6469
/**
6570
* Create a new KadDHT
@@ -76,7 +81,9 @@ export class KadDHT extends EventEmitter<PeerDiscoveryEvents> implements DHT, In
7681
lan,
7782
protocolPrefix,
7883
pingTimeout,
79-
pingConcurrency
84+
pingConcurrency,
85+
maxInboundStreams,
86+
maxOutboundStreams
8087
} = init
8188

8289
this.running = false
@@ -85,6 +92,8 @@ export class KadDHT extends EventEmitter<PeerDiscoveryEvents> implements DHT, In
8592
this.protocol = `${protocolPrefix ?? PROTOCOL_PREFIX}${lan === true ? LAN_PREFIX : ''}${PROTOCOL_DHT}`
8693
this.kBucketSize = kBucketSize ?? 20
8794
this.clientMode = clientMode ?? true
95+
this.maxInboundStreams = maxInboundStreams ?? DEFAULT_MAX_INBOUND_STREAMS
96+
this.maxOutboundStreams = maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
8897
this.routingTable = new RoutingTable({
8998
kBucketSize,
9099
lan: this.lan,
@@ -264,7 +273,10 @@ export class KadDHT extends EventEmitter<PeerDiscoveryEvents> implements DHT, In
264273
} else {
265274
this.log('enabling server mode')
266275
this.clientMode = false
267-
await this.components.getRegistrar().handle(this.protocol, this.rpc.onIncomingStream.bind(this.rpc))
276+
await this.components.getRegistrar().handle(this.protocol, this.rpc.onIncomingStream.bind(this.rpc), {
277+
maxInboundStreams: this.maxInboundStreams,
278+
maxOutboundStreams: this.maxOutboundStreams
279+
})
268280
}
269281
}
270282

src/network.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,7 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable, I
9797

9898
try {
9999
const connection = await this.components.getConnectionManager().openConnection(to, options)
100-
const streamData = await connection.newStream(this.protocol, options)
101-
stream = streamData.stream
100+
const stream = await connection.newStream(this.protocol, options)
102101

103102
const response = await this._writeReadMessage(stream, msg.serialize(), options)
104103

@@ -134,8 +133,7 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable, I
134133

135134
try {
136135
const connection = await this.components.getConnectionManager().openConnection(to, options)
137-
const data = await connection.newStream(this.protocol, options)
138-
stream = data.stream
136+
const stream = await connection.newStream(this.protocol, options)
139137

140138
await this._writeMessage(stream, msg.serialize(), options)
141139

src/routing-table/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ export class RoutingTable implements Startable, Initializable {
158158

159159
this.log('pinging old contact %p', oldContact.peer)
160160
const connection = await this.components.getConnectionManager().openConnection(oldContact.peer, options)
161-
const { stream } = await connection.newStream(this.protocol, options)
161+
const stream = await connection.newStream(this.protocol, options)
162162
stream.close()
163163
responded++
164164
} catch (err: any) {

test/network.spec.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,14 @@ describe('Network', () => {
8080
finish()
8181
}
8282

83+
const stream = mockStream({ source, sink })
84+
8385
return {
84-
protocol,
85-
stream: mockStream({ source, sink })
86+
...stream,
87+
stat: {
88+
...stream.stat,
89+
protocol
90+
}
8691
}
8792
}
8893
}

test/routing-table.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ describe('Routing Table', () => {
126126
table.kb.add(oldPeer)
127127

128128
// simulate connection succeeding
129-
const newStreamStub = sinon.stub().withArgs(PROTOCOL_DHT).resolves({ stream: { close: sinon.stub() } })
129+
const newStreamStub = sinon.stub().withArgs(PROTOCOL_DHT).resolves({ close: sinon.stub() })
130130
const openConnectionStub = sinon.stub().withArgs(oldPeer.peer).resolves({
131131
newStream: newStreamStub
132132
})

test/rpc/index.spec.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ describe('rpc', () => {
9393
}
9494

9595
await rpc.onIncomingStream({
96-
protocol: 'protocol',
9796
stream: mockStream(duplexStream),
9897
connection: await mockConnection(mockMultiaddrConnection(duplexStream, otherPeerId))
9998
})

0 commit comments

Comments
 (0)