Skip to content

Commit 90cca82

Browse files
authored
fix: ensure that the upgrader applies timeouts to incoming dials (#3000)
Refactors transports to ensure that the upgrader applies the configured incoming upgrade timeout instead of the individual transports. Where a transport performs part of the upgrade themselves they can obtain an abort signal from the upgrader which will be preconfigured with the correct incoming upgrade timeout. Deprecates incoming upgrade timeout config keys where they exist for transports, and also the `outboundUpgradeTimeout` key for the connection manager since it's covered by the `dialTimeout` key.
1 parent 6f8cfea commit 90cca82

File tree

40 files changed

+342
-318
lines changed

40 files changed

+342
-318
lines changed

packages/integration-tests/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@
3939
"@libp2p/bootstrap": "^11.0.13",
4040
"@libp2p/circuit-relay-v2": "^3.1.3",
4141
"@libp2p/crypto": "^5.0.7",
42-
"@libp2p/daemon-client": "^9.0.4",
43-
"@libp2p/daemon-server": "^8.0.3",
42+
"@libp2p/daemon-client": "^9.0.5",
43+
"@libp2p/daemon-server": "^8.0.4",
4444
"@libp2p/dcutr": "^2.0.12",
4545
"@libp2p/echo": "^2.1.3",
4646
"@libp2p/fetch": "^3.0.0",

packages/integration-tests/test/connections.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ describe('connections', () => {
263263
})
264264
],
265265
connectionManager: {
266-
outboundUpgradeTimeout: 10
266+
dialTimeout: 10
267267
}
268268
}, {
269269
transports: [

packages/interface-compliance-tests/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@
119119
"@multiformats/multiaddr-matcher": "^1.6.0",
120120
"abortable-iterator": "^5.1.0",
121121
"aegir": "^45.1.1",
122+
"any-signal": "^4.1.1",
122123
"delay": "^6.0.0",
123124
"it-all": "^3.0.6",
124125
"it-byte-stream": "^1.1.0",

packages/interface-compliance-tests/src/mocks/upgrader.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import { setMaxListeners } from '@libp2p/interface'
2+
import { anySignal } from 'any-signal'
13
import { mockConnection } from './connection.js'
2-
import type { Libp2pEvents, Connection, MultiaddrConnection, TypedEventTarget, Upgrader, UpgraderOptions } from '@libp2p/interface'
4+
import type { Libp2pEvents, Connection, MultiaddrConnection, TypedEventTarget, Upgrader, UpgraderOptions, ClearableSignal } from '@libp2p/interface'
35
import type { Registrar } from '@libp2p/interface-internal'
46

57
export interface MockUpgraderInit {
@@ -16,7 +18,7 @@ class MockUpgrader implements Upgrader {
1618
this.events = init.events
1719
}
1820

19-
async upgradeOutbound (multiaddrConnection: MultiaddrConnection, opts: UpgraderOptions = {}): Promise<Connection> {
21+
async upgradeOutbound (multiaddrConnection: MultiaddrConnection, opts: UpgraderOptions): Promise<Connection> {
2022
const connection = mockConnection(multiaddrConnection, {
2123
direction: 'outbound',
2224
registrar: this.registrar,
@@ -28,7 +30,7 @@ class MockUpgrader implements Upgrader {
2830
return connection
2931
}
3032

31-
async upgradeInbound (multiaddrConnection: MultiaddrConnection, opts: UpgraderOptions = {}): Promise<void> {
33+
async upgradeInbound (multiaddrConnection: MultiaddrConnection, opts: UpgraderOptions): Promise<void> {
3234
const connection = mockConnection(multiaddrConnection, {
3335
direction: 'inbound',
3436
registrar: this.registrar,
@@ -37,6 +39,16 @@ class MockUpgrader implements Upgrader {
3739

3840
this.events?.safeDispatchEvent('connection:open', { detail: connection })
3941
}
42+
43+
createInboundAbortSignal (signal?: AbortSignal): ClearableSignal {
44+
const output = anySignal([
45+
AbortSignal.timeout(10_000),
46+
signal
47+
])
48+
setMaxListeners(Infinity, output)
49+
50+
return output
51+
}
4052
}
4153

4254
export function mockUpgrader (init: MockUpgraderInit = {}): Upgrader {

packages/interface/src/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,13 @@ export interface TraceOptions {
768768
trace?: any
769769
}
770770

771+
/**
772+
* A signal that needs to be cleared when no longer in use
773+
*/
774+
export interface ClearableSignal extends AbortSignal {
775+
clear(): void
776+
}
777+
771778
/**
772779
* When a routing operation involves reading values, these options allow
773780
* controlling where the values are read from. By default libp2p will check

packages/interface/src/transport.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Connection, ConnectionLimits, MultiaddrConnection } from './connection.js'
22
import type { TypedEventTarget } from './event-target.js'
3-
import type { AbortOptions } from './index.js'
3+
import type { AbortOptions, ClearableSignal } from './index.js'
44
import type { StreamMuxerFactory } from './stream-muxer.js'
55
import type { Multiaddr } from '@multiformats/multiaddr'
66
import type { ProgressOptions, ProgressEvent } from 'progress-events'
@@ -58,7 +58,7 @@ export interface CreateListenerOptions {
5858
upgrader: Upgrader
5959
}
6060

61-
export interface DialTransportOptions<DialEvents extends ProgressEvent = ProgressEvent> extends AbortOptions, ProgressOptions<DialEvents> {
61+
export interface DialTransportOptions<DialEvents extends ProgressEvent = ProgressEvent> extends Required<AbortOptions>, ProgressOptions<DialEvents> {
6262
/**
6363
* The upgrader turns a MultiaddrConnection into a Connection which should be
6464
* returned by the transport's dial method
@@ -122,7 +122,7 @@ export enum FaultTolerance {
122122
NO_FATAL
123123
}
124124

125-
export interface UpgraderOptions<ConnectionUpgradeEvents extends ProgressEvent = ProgressEvent> extends ProgressOptions<ConnectionUpgradeEvents>, AbortOptions {
125+
export interface UpgraderOptions<ConnectionUpgradeEvents extends ProgressEvent = ProgressEvent> extends ProgressOptions<ConnectionUpgradeEvents>, Required<AbortOptions> {
126126
skipEncryption?: boolean
127127
skipProtection?: boolean
128128
muxerFactory?: StreamMuxerFactory
@@ -149,4 +149,14 @@ export interface Upgrader {
149149
* notifies other libp2p components about the new connection
150150
*/
151151
upgradeInbound(maConn: MultiaddrConnection, opts?: UpgraderOptions<InboundConnectionUpgradeEvents>): Promise<void>
152+
153+
/**
154+
* Used by transports that perform part of the upgrade process themselves and
155+
* do some async work. This allows configuring inbound upgrade timeouts from a
156+
* single location.
157+
*
158+
* Regular transports should just pass the signal from their shutdown
159+
* controller to `upgradeInbound`.
160+
*/
161+
createInboundAbortSignal (signal: AbortSignal): ClearableSignal
152162
}

packages/libp2p/src/connection-manager/constants.defaults.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
/**
22
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#dialTimeout
33
*/
4-
export const DIAL_TIMEOUT = 5e3
4+
export const DIAL_TIMEOUT = 10_000
55

66
/**
7-
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#upgradeTimeout
7+
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#inboundUpgradeTimeout
88
*/
9-
export const UPGRADE_TIMEOUT = 3e3
9+
export const INBOUND_UPGRADE_TIMEOUT = 10_000
1010

1111
/**
1212
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#protocolNegotiationTimeout
1313
*/
14-
export const PROTOCOL_NEGOTIATION_TIMEOUT = 2e3
14+
export const PROTOCOL_NEGOTIATION_TIMEOUT = 10_000
1515

1616
/**
1717
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxPeerAddrsToDial

packages/libp2p/src/connection-manager/index.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,15 @@ export interface ConnectionManagerInit {
5959
* How long a dial attempt is allowed to take, including DNS resolution
6060
* of the multiaddr, opening a socket and upgrading it to a Connection.
6161
*
62-
* @default 5000
62+
* @default 10_000
6363
*/
6464
dialTimeout?: number
6565

6666
/**
6767
* When a new incoming connection is opened, the upgrade process (e.g.
6868
* protect, encrypt, multiplex etc) must complete within this number of ms.
6969
*
70-
* @default 3000
70+
* @default 10_000
7171
*/
7272
inboundUpgradeTimeout?: number
7373

@@ -77,7 +77,7 @@ export interface ConnectionManagerInit {
7777
*
7878
* Does not apply if an abort signal is passed to the `.dial` method.
7979
*
80-
* @default 3000
80+
* @deprecated This is handled by `dialTimeout`
8181
*/
8282
outboundUpgradeTimeout?: number
8383

@@ -92,16 +92,18 @@ export interface ConnectionManagerInit {
9292
/**
9393
* Outbound protocol negotiation must complete within this number of ms.
9494
*
95-
* Does not apply if an abort signal is passed to the `.dial` method.
95+
* Does not apply if an abort signal is passed to the `.dial` or
96+
* `.dialProtocol` method of the `ConnectionManager` or the `openStream`
97+
* method of the `Connection`.
9698
*
97-
* @default 2000
99+
* @default 10_000
98100
*/
99101
outboundStreamProtocolNegotiationTimeout?: number
100102

101103
/**
102104
* Inbound protocol negotiation must complete within this number of ms
103105
*
104-
* @default 2000
106+
* @default 10_000
105107
*/
106108
inboundStreamProtocolNegotiationTimeout?: number
107109

packages/libp2p/src/libp2p.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import { DefaultPeerRouting } from './peer-routing.js'
1818
import { RandomWalk } from './random-walk.js'
1919
import { Registrar } from './registrar.js'
2020
import { DefaultTransportManager } from './transport-manager.js'
21-
import { DefaultUpgrader } from './upgrader.js'
21+
import { Upgrader } from './upgrader.js'
2222
import { userAgent } from './user-agent.js'
2323
import * as pkg from './version.js'
2424
import type { Components } from './components.js'
@@ -113,11 +113,10 @@ export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter
113113
}
114114

115115
// Set up the Upgrader
116-
this.components.upgrader = new DefaultUpgrader(this.components, {
116+
this.components.upgrader = new Upgrader(this.components, {
117117
connectionEncrypters: (init.connectionEncrypters ?? []).map((fn, index) => this.configureComponent(`connection-encryption-${index}`, fn(this.components))),
118118
streamMuxers: (init.streamMuxers ?? []).map((fn, index) => this.configureComponent(`stream-muxers-${index}`, fn(this.components))),
119119
inboundUpgradeTimeout: init.connectionManager?.inboundUpgradeTimeout,
120-
outboundUpgradeTimeout: init.connectionManager?.outboundUpgradeTimeout,
121120
inboundStreamProtocolNegotiationTimeout: init.connectionManager?.inboundStreamProtocolNegotiationTimeout ?? init.connectionManager?.protocolNegotiationTimeout,
122121
outboundStreamProtocolNegotiationTimeout: init.connectionManager?.outboundStreamProtocolNegotiationTimeout ?? init.connectionManager?.protocolNegotiationTimeout
123122
})

packages/libp2p/src/upgrader.ts

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import { InvalidMultiaddrError, TooManyInboundProtocolStreamsError, TooManyOutboundProtocolStreamsError, LimitedConnectionError, setMaxListeners, InvalidPeerIdError } from '@libp2p/interface'
22
import * as mss from '@libp2p/multistream-select'
33
import { peerIdFromString } from '@libp2p/peer-id'
4+
import { anySignal } from 'any-signal'
45
import { CustomProgressEvent } from 'progress-events'
56
import { createConnection } from './connection/index.js'
6-
import { PROTOCOL_NEGOTIATION_TIMEOUT, UPGRADE_TIMEOUT } from './connection-manager/constants.js'
7+
import { PROTOCOL_NEGOTIATION_TIMEOUT, INBOUND_UPGRADE_TIMEOUT } from './connection-manager/constants.js'
78
import { ConnectionDeniedError, ConnectionInterceptedError, EncryptionFailedError, MuxerUnavailableError } from './errors.js'
89
import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js'
9-
import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions, ConnectionLimits, SecureConnectionOptions, CounterGroup } from '@libp2p/interface'
10+
import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader as UpgraderInterface, UpgraderOptions, ConnectionLimits, SecureConnectionOptions, CounterGroup, ClearableSignal } from '@libp2p/interface'
1011
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'
1112

1213
interface CreateConnectionOptions {
@@ -40,13 +41,6 @@ export interface UpgraderInit {
4041
*/
4142
inboundUpgradeTimeout?: number
4243

43-
/**
44-
* An amount of ms by which an outbound connection upgrade must complete
45-
*
46-
* @default 3000
47-
*/
48-
outboundUpgradeTimeout?: number
49-
5044
/**
5145
* When a new incoming stream is opened on a multiplexed connection, protocol
5246
* negotiation on that stream must complete within this many ms
@@ -120,12 +114,11 @@ export interface DefaultUpgraderComponents {
120114

121115
type ConnectionDeniedType = keyof Pick<ConnectionGater, 'denyOutboundConnection' | 'denyInboundEncryptedConnection' | 'denyOutboundEncryptedConnection' | 'denyInboundUpgradedConnection' | 'denyOutboundUpgradedConnection'>
122116

123-
export class DefaultUpgrader implements Upgrader {
117+
export class Upgrader implements UpgraderInterface {
124118
private readonly components: DefaultUpgraderComponents
125119
private readonly connectionEncrypters: Map<string, ConnectionEncrypter>
126120
private readonly streamMuxers: Map<string, StreamMuxerFactory>
127121
private readonly inboundUpgradeTimeout: number
128-
private readonly outboundUpgradeTimeout: number
129122
private readonly inboundStreamProtocolNegotiationTimeout: number
130123
private readonly outboundStreamProtocolNegotiationTimeout: number
131124
private readonly events: TypedEventTarget<Libp2pEvents>
@@ -148,8 +141,7 @@ export class DefaultUpgrader implements Upgrader {
148141
this.streamMuxers.set(muxer.protocol, muxer)
149142
})
150143

151-
this.inboundUpgradeTimeout = init.inboundUpgradeTimeout ?? UPGRADE_TIMEOUT
152-
this.outboundUpgradeTimeout = init.outboundUpgradeTimeout ?? UPGRADE_TIMEOUT
144+
this.inboundUpgradeTimeout = init.inboundUpgradeTimeout ?? INBOUND_UPGRADE_TIMEOUT
153145
this.inboundStreamProtocolNegotiationTimeout = init.inboundStreamProtocolNegotiationTimeout ?? PROTOCOL_NEGOTIATION_TIMEOUT
154146
this.outboundStreamProtocolNegotiationTimeout = init.outboundStreamProtocolNegotiationTimeout ?? PROTOCOL_NEGOTIATION_TIMEOUT
155147
this.events = components.events
@@ -177,12 +169,25 @@ export class DefaultUpgrader implements Upgrader {
177169
}
178170
}
179171

172+
createInboundAbortSignal (signal: AbortSignal): ClearableSignal {
173+
const output = anySignal([
174+
AbortSignal.timeout(this.inboundUpgradeTimeout),
175+
signal
176+
])
177+
setMaxListeners(Infinity, output)
178+
179+
return output
180+
}
181+
180182
/**
181183
* Upgrades an inbound connection
182184
*/
183-
async upgradeInbound (maConn: MultiaddrConnection, opts: UpgraderOptions = {}): Promise<void> {
185+
async upgradeInbound (maConn: MultiaddrConnection, opts: UpgraderOptions): Promise<void> {
184186
let accepted = false
185187

188+
// always apply upgrade timeout for incoming upgrades
189+
const signal = this.createInboundAbortSignal(opts.signal)
190+
186191
try {
187192
this.metrics.dials?.increment({
188193
inbound: true
@@ -196,14 +201,19 @@ export class DefaultUpgrader implements Upgrader {
196201

197202
await this.shouldBlockConnection('denyInboundConnection', maConn)
198203

199-
await this._performUpgrade(maConn, 'inbound', opts)
204+
await this._performUpgrade(maConn, 'inbound', {
205+
...opts,
206+
signal
207+
})
200208
} catch (err) {
201209
this.metrics.errors?.increment({
202210
inbound: true
203211
})
204212

205213
throw err
206214
} finally {
215+
signal.clear()
216+
207217
if (accepted) {
208218
this.components.connectionManager.afterUpgradeInbound()
209219
}
@@ -213,7 +223,7 @@ export class DefaultUpgrader implements Upgrader {
213223
/**
214224
* Upgrades an outbound connection
215225
*/
216-
async upgradeOutbound (maConn: MultiaddrConnection, opts: UpgraderOptions = {}): Promise<Connection> {
226+
async upgradeOutbound (maConn: MultiaddrConnection, opts: UpgraderOptions): Promise<Connection> {
217227
try {
218228
this.metrics.dials?.increment({
219229
outbound: true
@@ -251,14 +261,6 @@ export class DefaultUpgrader implements Upgrader {
251261
let muxerFactory: StreamMuxerFactory | undefined
252262
let cryptoProtocol
253263

254-
if (opts.signal == null) {
255-
maConn.log('no abort signal was passed while trying to upgrade connection, falling back to default timeout')
256-
257-
const upgradeTimeoutSignal = AbortSignal.timeout(direction === 'inbound' ? this.inboundUpgradeTimeout : this.outboundUpgradeTimeout)
258-
setMaxListeners(Infinity, upgradeTimeoutSignal)
259-
opts.signal = upgradeTimeoutSignal
260-
}
261-
262264
this.components.metrics?.trackMultiaddrConnection(maConn)
263265

264266
maConn.log.trace('starting the %s connection upgrade', direction)

0 commit comments

Comments
 (0)