Skip to content

Commit c258b35

Browse files
authored
fix: simplify connection upgrade (#2719)
- Unifies inbound/outbound upgrade - Adds default timeouts for inbound/outbound upgrades and stream protocol negotation - Ensure tcp listener closes sockets when upgrade times out - Passes abort signal to connection encrypter - Uses simple promise rejection to abort tcp connections
1 parent 33f464e commit c258b35

File tree

10 files changed

+232
-288
lines changed

10 files changed

+232
-288
lines changed

packages/interface-compliance-tests/src/transport/listen-test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ export default (common: TestSetup<TransportTestFixtures>): void => {
106106
expect(upgradeSpy.callCount).to.equal(2)
107107
})
108108

109-
it('should not handle connection if upgradeInbound throws', async () => {
110-
sinon.stub(upgrader, 'upgradeInbound').throws()
109+
it('should not handle connection if upgradeInbound rejects', async () => {
110+
sinon.stub(upgrader, 'upgradeInbound').rejects()
111111

112112
const listen = listener.createListener({
113113
upgrader

packages/interface-internal/src/connection-manager/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ export interface ConnectionManager {
7171
acceptIncomingConnection(maConn: MultiaddrConnection): Promise<boolean>
7272

7373
/**
74-
* Invoked after upgrading a multiaddr connection has finished
74+
* Invoked after upgrading an inbound multiaddr connection has finished
7575
*/
7676
afterUpgradeInbound(): void
7777

packages/interface/src/connection/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ export interface ConnectionProtector {
354354
* between its two peers from the PSK the Protector instance was
355355
* created with.
356356
*/
357-
protect(connection: MultiaddrConnection): Promise<MultiaddrConnection>
357+
protect(connection: MultiaddrConnection, options?: AbortOptions): Promise<MultiaddrConnection>
358358
}
359359

360360
export interface MultiaddrConnectionTimeline {

packages/libp2p/src/connection-manager/constants.defaults.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@
44
export const DIAL_TIMEOUT = 5e3
55

66
/**
7-
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#inboundUpgradeTimeout
7+
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#upgradeTimeout
88
*/
9-
export const INBOUND_UPGRADE_TIMEOUT = 2e3
9+
export const UPGRADE_TIMEOUT = 3e3
10+
11+
/**
12+
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#protocolNegotiationTimeout
13+
*/
14+
export const PROTOCOL_NEGOTIATION_TIMEOUT = 2e3
1015

1116
/**
1217
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxPeerAddrsToDial

packages/libp2p/src/connection-manager/index.ts

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,34 @@ export interface ConnectionManagerInit {
5757
/**
5858
* How long a dial attempt is allowed to take, including DNS resolution
5959
* of the multiaddr, opening a socket and upgrading it to a Connection.
60+
*
61+
* @default 5000
6062
*/
6163
dialTimeout?: number
6264

6365
/**
64-
* When a new inbound connection is opened, the upgrade process (e.g. protect,
65-
* encrypt, multiplex etc) must complete within this number of ms.
66+
* When a new incoming connection is opened, the upgrade process (e.g.
67+
* protect, encrypt, multiplex etc) must complete within this number of ms.
6668
*
67-
* @default 30000
69+
* @default 3000
6870
*/
6971
inboundUpgradeTimeout?: number
7072

73+
/**
74+
* When a new outbound connection is opened, the upgrade process (e.g.
75+
* protect, encrypt, multiplex etc) must complete within this number of ms.
76+
*
77+
* @default 3000
78+
*/
79+
outboundUpgradeTimeout?: number
80+
81+
/**
82+
* Protocol negotiation must complete within this number of ms
83+
*
84+
* @default 2000
85+
*/
86+
protocolNegotiationTimeout?: number
87+
7188
/**
7289
* Multiaddr resolvers to use when dialling
7390
*/
@@ -164,7 +181,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
164181
private readonly deny: Multiaddr[]
165182
private readonly maxIncomingPendingConnections: number
166183
private incomingPendingConnections: number
167-
private outboundPendingConnections: number
168184
private readonly maxConnections: number
169185

170186
public readonly dialQueue: DialQueue
@@ -203,7 +219,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
203219
this.allow = (init.allow ?? []).map(ma => multiaddr(ma))
204220
this.deny = (init.deny ?? []).map(ma => multiaddr(ma))
205221

206-
this.outboundPendingConnections = 0
207222
this.incomingPendingConnections = 0
208223
this.maxIncomingPendingConnections = init.maxIncomingPendingConnections ?? defaultOptions.maxIncomingPendingConnections
209224

@@ -266,8 +281,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
266281
const metric = {
267282
inbound: 0,
268283
'inbound pending': this.incomingPendingConnections,
269-
outbound: 0,
270-
'outbound pending': this.outboundPendingConnections
284+
outbound: 0
271285
}
272286

273287
for (const conns of this.connections.values()) {
@@ -468,54 +482,48 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
468482

469483
options.signal?.throwIfAborted()
470484

471-
try {
472-
this.outboundPendingConnections++
485+
const { peerId } = getPeerAddress(peerIdOrMultiaddr)
473486

474-
const { peerId } = getPeerAddress(peerIdOrMultiaddr)
487+
if (peerId != null && options.force !== true) {
488+
this.log('dial %p', peerId)
489+
const existingConnection = this.getConnections(peerId)
490+
.find(conn => conn.limits == null)
475491

476-
if (peerId != null && options.force !== true) {
477-
this.log('dial %p', peerId)
478-
const existingConnection = this.getConnections(peerId)
479-
.find(conn => conn.limits == null)
492+
if (existingConnection != null) {
493+
this.log('had an existing non-limited connection to %p', peerId)
480494

481-
if (existingConnection != null) {
482-
this.log('had an existing non-limited connection to %p', peerId)
483-
484-
options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
485-
return existingConnection
486-
}
495+
options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
496+
return existingConnection
487497
}
498+
}
488499

489-
const connection = await this.dialQueue.dial(peerIdOrMultiaddr, {
490-
...options,
491-
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
492-
})
493-
let peerConnections = this.connections.get(connection.remotePeer)
494-
495-
if (peerConnections == null) {
496-
peerConnections = []
497-
this.connections.set(connection.remotePeer, peerConnections)
498-
}
500+
const connection = await this.dialQueue.dial(peerIdOrMultiaddr, {
501+
...options,
502+
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
503+
})
504+
let peerConnections = this.connections.get(connection.remotePeer)
499505

500-
// we get notified of connections via the Upgrader emitting "connection"
501-
// events, double check we aren't already tracking this connection before
502-
// storing it
503-
let trackedConnection = false
506+
if (peerConnections == null) {
507+
peerConnections = []
508+
this.connections.set(connection.remotePeer, peerConnections)
509+
}
504510

505-
for (const conn of peerConnections) {
506-
if (conn.id === connection.id) {
507-
trackedConnection = true
508-
}
509-
}
511+
// we get notified of connections via the Upgrader emitting "connection"
512+
// events, double check we aren't already tracking this connection before
513+
// storing it
514+
let trackedConnection = false
510515

511-
if (!trackedConnection) {
512-
peerConnections.push(connection)
516+
for (const conn of peerConnections) {
517+
if (conn.id === connection.id) {
518+
trackedConnection = true
513519
}
520+
}
514521

515-
return connection
516-
} finally {
517-
this.outboundPendingConnections--
522+
if (!trackedConnection) {
523+
peerConnections.push(connection)
518524
}
525+
526+
return connection
519527
}
520528

521529
async closeConnections (peerId: PeerId, options: AbortOptions = {}): Promise<void> {

packages/libp2p/src/libp2p.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter
110110
this.components.upgrader = new DefaultUpgrader(this.components, {
111111
connectionEncrypters: (init.connectionEncrypters ?? []).map((fn, index) => this.configureComponent(`connection-encryption-${index}`, fn(this.components))),
112112
streamMuxers: (init.streamMuxers ?? []).map((fn, index) => this.configureComponent(`stream-muxers-${index}`, fn(this.components))),
113-
inboundUpgradeTimeout: init.connectionManager?.inboundUpgradeTimeout
113+
inboundUpgradeTimeout: init.connectionManager?.inboundUpgradeTimeout,
114+
outboundUpgradeTimeout: init.connectionManager?.outboundUpgradeTimeout
114115
})
115116

116117
// Setup the transport manager

0 commit comments

Comments
 (0)