Skip to content

Commit aa8de9f

Browse files
tabcatachingbrain
andauthored
fix(@libp2p/tcp): race condition in onSocket (#2763)
This fixes a race condition in the onSocket listener method. onSocket gets a net.Socket parameter that needs to be closed later before the tcp server can close. The net.Socket is used to create a MultiaddrConnection but the connection is not tracked with this.connections until after it has been upgraded. If the tcp listener.close is called while a the MultiaddrConnection is waiting to be upgraded, then the MultiaddrConnection socket cannot be closed as it does not exist in this.connections. Fixes #2760 --------- Co-authored-by: achingbrain <[email protected]>
1 parent 3bc9769 commit aa8de9f

File tree

4 files changed

+152
-62
lines changed

4 files changed

+152
-62
lines changed

packages/transport-tcp/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
"@multiformats/multiaddr": "^12.2.3",
6767
"@types/sinon": "^17.0.3",
6868
"p-defer": "^4.0.1",
69+
"p-event": "^6.0.1",
6970
"progress-events": "^1.0.0",
7071
"race-event": "^1.3.0",
7172
"stream-to-it": "^1.0.1"

packages/transport-tcp/src/listener.ts

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import net from 'net'
2-
import { AbortError, AlreadyStartedError, InvalidParametersError, NotStartedError, TypedEventEmitter } from '@libp2p/interface'
2+
import { AlreadyStartedError, InvalidParametersError, NotStartedError, TypedEventEmitter, setMaxListeners } from '@libp2p/interface'
3+
import { pEvent } from 'p-event'
34
import { CODE_P2P } from './constants.js'
45
import { toMultiaddrConnection } from './socket-to-conn.js'
56
import {
@@ -67,19 +68,23 @@ type Status = { code: TCPListenerStatusCode.INACTIVE } | {
6768

6869
export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Listener {
6970
private readonly server: net.Server
70-
/** Keep track of open connections to destroy in case of timeout */
71-
private readonly connections = new Set<MultiaddrConnection>()
71+
/** Keep track of open sockets to destroy in case of timeout */
72+
private readonly sockets = new Set<net.Socket>()
7273
private status: Status = { code: TCPListenerStatusCode.INACTIVE }
7374
private metrics?: TCPListenerMetrics
7475
private addr: string
7576
private readonly log: Logger
77+
private readonly shutdownController: AbortController
7678

7779
constructor (private readonly context: Context) {
7880
super()
7981

8082
context.keepAlive = context.keepAlive ?? true
8183
context.noDelay = context.noDelay ?? true
8284

85+
this.shutdownController = new AbortController()
86+
setMaxListeners(Infinity, this.shutdownController.signal)
87+
8388
this.log = context.logger.forComponent('libp2p:tcp:listener')
8489
this.addr = 'unknown'
8590
this.server = net.createServer(context, this.onSocket.bind(this))
@@ -119,7 +124,7 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
119124
help: 'Current active connections in TCP listener',
120125
calculate: () => {
121126
return {
122-
[this.addr]: this.connections.size
127+
[this.addr]: this.sockets.size
123128
}
124129
}
125130
})
@@ -195,18 +200,20 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
195200
}
196201

197202
this.log('new inbound connection %s', maConn.remoteAddr)
203+
this.sockets.add(socket)
198204

199-
this.context.upgrader.upgradeInbound(maConn)
205+
this.context.upgrader.upgradeInbound(maConn, {
206+
signal: this.shutdownController.signal
207+
})
200208
.then((conn) => {
201209
this.log('inbound connection upgraded %s', maConn.remoteAddr)
202-
this.connections.add(maConn)
203210

204211
socket.once('close', () => {
205-
this.connections.delete(maConn)
212+
this.sockets.delete(socket)
206213

207214
if (
208215
this.context.closeServerOnMaxConnections != null &&
209-
this.connections.size < this.context.closeServerOnMaxConnections.listenBelow
216+
this.sockets.size < this.context.closeServerOnMaxConnections.listenBelow
210217
) {
211218
// The most likely case of error is if the port taken by this
212219
// application is bound by another process during the time the
@@ -227,18 +234,17 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
227234

228235
if (
229236
this.context.closeServerOnMaxConnections != null &&
230-
this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove
237+
this.sockets.size >= this.context.closeServerOnMaxConnections.closeAbove
231238
) {
232-
this.pause(false).catch(e => {
233-
this.log.error('error attempting to close server once connection count over limit', e)
234-
})
239+
this.pause()
235240
}
236241

237242
this.safeDispatchEvent('connection', { detail: conn })
238243
})
239244
.catch(async err => {
240245
this.log.error('inbound connection upgrade failed', err)
241246
this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true })
247+
this.sockets.delete(socket)
242248
maConn.abort(err)
243249
})
244250
}
@@ -300,15 +306,28 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
300306
}
301307

302308
async close (): Promise<void> {
303-
const err = new AbortError('Listener is closing')
309+
const events: Array<Promise<void>> = []
304310

305-
// synchronously close each connection
306-
this.connections.forEach(conn => {
307-
conn.abort(err)
308-
})
311+
if (this.server.listening) {
312+
events.push(pEvent(this.server, 'close'))
313+
}
309314

310315
// shut down the server socket, permanently
311-
await this.pause(true)
316+
this.pause(true)
317+
318+
// stop any in-progress connection upgrades
319+
this.shutdownController.abort()
320+
321+
// synchronously close any open connections - should be done after closing
322+
// the server socket in case new sockets are opened during the shutdown
323+
this.sockets.forEach(socket => {
324+
if (socket.readable) {
325+
events.push(pEvent(socket, 'close'))
326+
socket.destroy()
327+
}
328+
})
329+
330+
await Promise.all(events)
312331
}
313332

314333
/**
@@ -332,7 +351,7 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
332351
this.log('listening on %s', this.server.address())
333352
}
334353

335-
private async pause (permanent: boolean): Promise<void> {
354+
private pause (permanent: boolean = false): void {
336355
if (!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) {
337356
this.status = { code: TCPListenerStatusCode.INACTIVE }
338357
return
@@ -361,15 +380,10 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
361380
// during the time the server is closing
362381
this.status = permanent ? { code: TCPListenerStatusCode.INACTIVE } : { ...this.status, code: TCPListenerStatusCode.PAUSED }
363382

364-
await new Promise<void>((resolve, reject) => {
365-
this.server.close(err => {
366-
if (err != null) {
367-
reject(err)
368-
return
369-
}
370-
371-
resolve()
372-
})
373-
})
383+
// stop accepting incoming connections - existing connections are maintained
384+
// - any callback passed here would be invoked after existing connections
385+
// close, we want to maintain them so no callback is passed otherwise his
386+
// method will never return
387+
this.server.close()
374388
}
375389
}

packages/transport-tcp/test/connection-limits.spec.ts

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import net from 'node:net'
22
import { promisify } from 'util'
3-
import { TypedEventEmitter } from '@libp2p/interface'
43
import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks'
54
import { defaultLogger } from '@libp2p/logger'
65
import { multiaddr } from '@multiformats/multiaddr'
@@ -64,38 +63,40 @@ async function assertServerConnections (listener: TCPListener, connections: numb
6463
// Expect server connections but allow time for sockets to connect or disconnect
6564
for (let i = 0; i < 100; i++) {
6665
// eslint-disable-next-line @typescript-eslint/dot-notation
67-
if (listener['connections'].size === connections) {
66+
if (listener['sockets'].size === connections) {
6867
return
6968
} else {
7069
await promisify(setTimeout)(10)
7170
}
7271
}
7372
// eslint-disable-next-line @typescript-eslint/dot-notation
74-
expect(listener['connections'].size).equals(connections, 'invalid amount of server connections')
73+
expect(listener['sockets'].size).equals(connections, 'invalid amount of server connections')
7574
}
7675

7776
describe('closeAbove/listenBelow', () => {
78-
const afterEachCallbacks: Array<() => Promise<any> | any> = []
77+
let afterEachCallbacks: Array<() => Promise<any> | any> = []
78+
79+
beforeEach(() => {
80+
afterEachCallbacks = []
81+
})
82+
7983
afterEach(async () => {
8084
await Promise.all(afterEachCallbacks.map(fn => fn()))
81-
afterEachCallbacks.length = 0
8285
})
8386

8487
it('reject dial of connection above closeAbove', async () => {
8588
const listenBelow = 2
8689
const closeAbove = 3
8790
const port = 9900
8891

89-
const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
92+
const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
9093
logger: defaultLogger()
9194
})
9295

93-
const upgrader = mockUpgrader({
94-
events: new TypedEventEmitter()
95-
})
96-
const listener = trasnport.createListener({ upgrader }) as TCPListener
97-
// eslint-disable-next-line @typescript-eslint/promise-function-async
98-
afterEachCallbacks.push(() => listener.close())
96+
const upgrader = mockUpgrader()
97+
const listener = transport.createListener({ upgrader }) as TCPListener
98+
afterEachCallbacks.push(async () => listener.close())
99+
99100
await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`))
100101
const { assertConnectedSocket, assertRefusedSocket } = buildSocketAssertions(port, afterEachCallbacks)
101102

@@ -115,16 +116,14 @@ describe('closeAbove/listenBelow', () => {
115116
const closeAbove = 3
116117
const port = 9900
117118

118-
const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
119+
const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
119120
logger: defaultLogger()
120121
})
121122

122-
const upgrader = mockUpgrader({
123-
events: new TypedEventEmitter()
124-
})
125-
const listener = trasnport.createListener({ upgrader }) as TCPListener
126-
// eslint-disable-next-line @typescript-eslint/promise-function-async
127-
afterEachCallbacks.push(() => listener.close())
123+
const upgrader = mockUpgrader()
124+
const listener = transport.createListener({ upgrader }) as TCPListener
125+
afterEachCallbacks.push(async () => listener.close())
126+
128127
await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`))
129128
const { assertConnectedSocket } = buildSocketAssertions(port, afterEachCallbacks)
130129

@@ -152,16 +151,13 @@ describe('closeAbove/listenBelow', () => {
152151
const closeAbove = 3
153152
const port = 9900
154153

155-
const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
154+
const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
156155
logger: defaultLogger()
157156
})
158157

159-
const upgrader = mockUpgrader({
160-
events: new TypedEventEmitter()
161-
})
162-
const listener = trasnport.createListener({ upgrader }) as TCPListener
163-
// eslint-disable-next-line @typescript-eslint/promise-function-async
164-
afterEachCallbacks.push(() => listener.close())
158+
const upgrader = mockUpgrader()
159+
const listener = transport.createListener({ upgrader }) as TCPListener
160+
afterEachCallbacks.push(async () => listener.close())
165161

166162
let closeEventCallCount = 0
167163
listener.addEventListener('close', () => {
@@ -185,16 +181,13 @@ describe('closeAbove/listenBelow', () => {
185181
const closeAbove = 3
186182
const port = 9900
187183

188-
const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
184+
const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
189185
logger: defaultLogger()
190186
})
191187

192-
const upgrader = mockUpgrader({
193-
events: new TypedEventEmitter()
194-
})
195-
const listener = trasnport.createListener({ upgrader }) as TCPListener
196-
// eslint-disable-next-line @typescript-eslint/promise-function-async
197-
afterEachCallbacks.push(() => listener.close())
188+
const upgrader = mockUpgrader()
189+
const listener = transport.createListener({ upgrader }) as TCPListener
190+
afterEachCallbacks.push(async () => listener.close())
198191

199192
let listeningEventCallCount = 0
200193
listener.addEventListener('listening', () => {

packages/transport-tcp/test/listen-dial.spec.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,4 +394,86 @@ describe('dial', () => {
394394

395395
await listener.close()
396396
})
397+
398+
it('should close before connection upgrade is completed', async () => {
399+
// create a Promise that resolves when the upgrade starts
400+
const upgradeStarted = pDefer()
401+
402+
// create a listener with the handler
403+
const listener = transport.createListener({
404+
upgrader: {
405+
async upgradeInbound () {
406+
upgradeStarted.resolve()
407+
408+
return new Promise(() => {})
409+
},
410+
async upgradeOutbound () {
411+
return new Promise(() => {})
412+
}
413+
}
414+
})
415+
416+
// listen on a multiaddr
417+
await listener.listen(multiaddr('/ip4/127.0.0.1/tcp/0'))
418+
419+
const localAddrs = listener.getAddrs()
420+
expect(localAddrs.length).to.equal(1)
421+
422+
// dial the listener address
423+
transport.dial(localAddrs[0], {
424+
upgrader
425+
}).catch(() => {})
426+
427+
// wait for the upgrade to start
428+
await upgradeStarted.promise
429+
430+
// close the listener, process should exit normally
431+
await listener.close()
432+
})
433+
434+
it('should abort inbound upgrade on close', async () => {
435+
// create a Promise that resolves when the upgrade starts
436+
const upgradeStarted = pDefer()
437+
const abortedUpgrade = pDefer()
438+
439+
// create a listener with the handler
440+
const listener = transport.createListener({
441+
upgrader: {
442+
async upgradeInbound (maConn, opts) {
443+
upgradeStarted.resolve()
444+
445+
opts?.signal?.addEventListener('abort', () => {
446+
abortedUpgrade.resolve()
447+
}, {
448+
once: true
449+
})
450+
451+
return new Promise(() => {})
452+
},
453+
async upgradeOutbound () {
454+
return new Promise(() => {})
455+
}
456+
}
457+
})
458+
459+
// listen on a multiaddr
460+
await listener.listen(multiaddr('/ip4/127.0.0.1/tcp/0'))
461+
462+
const localAddrs = listener.getAddrs()
463+
expect(localAddrs.length).to.equal(1)
464+
465+
// dial the listener address
466+
transport.dial(localAddrs[0], {
467+
upgrader
468+
}).catch(() => {})
469+
470+
// wait for the upgrade to start
471+
await upgradeStarted.promise
472+
473+
// close the listener
474+
await listener.close()
475+
476+
// should abort the upgrade
477+
await abortedUpgrade.promise
478+
})
397479
})

0 commit comments

Comments
 (0)