Skip to content

Commit 24fa1d5

Browse files
authored
fix: refactor connection opening and closing (#2735)
Simplifies connection opening/closing, catches some instances where we were not destroying sockets for incoming and outgoing connections which caused a file descriptor leak.
1 parent 58784ab commit 24fa1d5

File tree

8 files changed

+195
-171
lines changed

8 files changed

+195
-171
lines changed

packages/integration-tests/test/circuit-relay.node.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -602,8 +602,8 @@ describe('circuit-relay', () => {
602602
await deferred.promise
603603

604604
// should have closed connections to remote and to relay
605-
expect(events[0].detail.remotePeer.toString()).to.equal(remote.peerId.toString())
606-
expect(events[1].detail.remotePeer.toString()).to.equal(relay1.peerId.toString())
605+
expect(events[0].detail.remotePeer.toString()).to.equal(relay1.peerId.toString())
606+
expect(events[1].detail.remotePeer.toString()).to.equal(remote.peerId.toString())
607607
})
608608

609609
it('should remove the relay event listener when the relay stops', async () => {

packages/transport-tcp/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@
6565
"@multiformats/mafmt": "^12.1.6",
6666
"@multiformats/multiaddr": "^12.2.3",
6767
"@types/sinon": "^17.0.3",
68+
"p-defer": "^4.0.1",
6869
"progress-events": "^1.0.0",
70+
"race-event": "^1.3.0",
6971
"stream-to-it": "^1.0.1"
7072
},
7173
"devDependencies": {
@@ -74,7 +76,6 @@
7476
"aegir": "^44.0.1",
7577
"it-all": "^3.0.6",
7678
"it-pipe": "^3.0.1",
77-
"p-defer": "^4.0.1",
7879
"sinon": "^18.0.0",
7980
"uint8arrays": "^5.1.0",
8081
"wherearewe": "^2.0.1"

packages/transport-tcp/src/constants.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@ export const CODE_UNIX = 400
77
export const CLOSE_TIMEOUT = 500
88

99
// Close the socket if there is no activity after this long in ms
10-
export const SOCKET_TIMEOUT = 5 * 60000 // 5 mins
10+
export const SOCKET_TIMEOUT = 2 * 60000 // 2 mins

packages/transport-tcp/src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ export interface TCPComponents {
123123
}
124124

125125
export interface TCPMetrics {
126-
dialerEvents: CounterGroup<'error' | 'timeout' | 'connect' | 'abort'>
126+
events: CounterGroup<'error' | 'timeout' | 'connect' | 'abort'>
127+
errors: CounterGroup<'outbound_to_connection' | 'outbound_upgrade'>
127128
}
128129

129130
export function tcp (init: TCPOptions = {}): (components: TCPComponents) => Transport {

packages/transport-tcp/src/listener.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -163,20 +163,19 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
163163
this.safeDispatchEvent('close')
164164
}
165165
})
166+
.on('drop', () => {
167+
this.metrics?.events.increment({ [`${this.addr} drop`]: true })
168+
})
166169
}
167170

168171
private onSocket (socket: net.Socket): void {
172+
this.metrics?.events.increment({ [`${this.addr} connection`]: true })
173+
169174
if (this.status.code !== TCPListenerStatusCode.ACTIVE) {
170175
socket.destroy()
171176
throw new NotStartedError('Server is not listening yet')
172177
}
173178

174-
// Avoid uncaught errors caused by unstable connections
175-
socket.on('error', err => {
176-
this.log('socket error', err)
177-
this.metrics?.events.increment({ [`${this.addr} error`]: true })
178-
})
179-
180179
let maConn: MultiaddrConnection
181180
try {
182181
maConn = toMultiaddrConnection(socket, {
@@ -185,11 +184,13 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
185184
socketCloseTimeout: this.context.socketCloseTimeout,
186185
metrics: this.metrics?.events,
187186
metricPrefix: `${this.addr} `,
188-
logger: this.context.logger
187+
logger: this.context.logger,
188+
direction: 'inbound'
189189
})
190-
} catch (err) {
190+
} catch (err: any) {
191191
this.log.error('inbound connection failed', err)
192192
this.metrics?.errors.increment({ [`${this.addr} inbound_to_connection`]: true })
193+
socket.destroy()
193194
return
194195
}
195196

Lines changed: 81 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
import { AbortError, InvalidParametersError, TimeoutError } from '@libp2p/interface'
1+
import { InvalidParametersError, TimeoutError } from '@libp2p/interface'
22
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
3+
import pDefer from 'p-defer'
4+
import { raceEvent } from 'race-event'
35
import { duplex } from 'stream-to-it'
46
import { CLOSE_TIMEOUT, SOCKET_TIMEOUT } from './constants.js'
57
import { multiaddrToNetConfig } from './utils.js'
68
import type { ComponentLogger, MultiaddrConnection, CounterGroup } from '@libp2p/interface'
79
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
810
import type { Socket } from 'net'
11+
import type { DeferredPromise } from 'p-defer'
912

1013
interface ToConnectionOptions {
1114
listeningAddr?: Multiaddr
@@ -16,19 +19,23 @@ interface ToConnectionOptions {
1619
metrics?: CounterGroup
1720
metricPrefix?: string
1821
logger: ComponentLogger
22+
direction: 'inbound' | 'outbound'
1923
}
2024

2125
/**
2226
* Convert a socket into a MultiaddrConnection
2327
* https://github.com/libp2p/interface-transport#multiaddrconnection
2428
*/
2529
export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions): MultiaddrConnection => {
26-
let closePromise: Promise<void> | null = null
30+
let closePromise: DeferredPromise<void>
2731
const log = options.logger.forComponent('libp2p:tcp:socket')
32+
const direction = options.direction
2833
const metrics = options.metrics
2934
const metricPrefix = options.metricPrefix ?? ''
3035
const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT
3136
const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT
37+
let timedout = false
38+
let errored = false
3239

3340
// Check if we are connected on a unix path
3441
if (options.listeningAddr?.getPath() != null) {
@@ -39,6 +46,19 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
3946
options.localAddr = options.remoteAddr
4047
}
4148

49+
// handle socket errors
50+
socket.on('error', err => {
51+
errored = true
52+
53+
if (!timedout) {
54+
log.error('%s socket error - %e', direction, err)
55+
metrics?.increment({ [`${metricPrefix}error`]: true })
56+
}
57+
58+
socket.destroy()
59+
maConn.timeline.close = Date.now()
60+
})
61+
4262
let remoteAddr: Multiaddr
4363

4464
if (options.remoteAddr != null) {
@@ -59,37 +79,37 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
5979

6080
// by default there is no timeout
6181
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
62-
socket.setTimeout(inactivityTimeout, () => {
63-
log('%s socket read timeout', lOptsStr)
64-
metrics?.increment({ [`${metricPrefix}timeout`]: true })
82+
socket.setTimeout(inactivityTimeout)
6583

66-
// only destroy with an error if the remote has not sent the FIN message
67-
let err: Error | undefined
68-
if (socket.readable) {
69-
err = new TimeoutError('Socket read timeout')
70-
}
84+
socket.once('timeout', () => {
85+
timedout = true
86+
log('%s %s socket read timeout', direction, lOptsStr)
87+
metrics?.increment({ [`${metricPrefix}timeout`]: true })
7188

7289
// if the socket times out due to inactivity we must manually close the connection
7390
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-timeout
74-
socket.destroy(err)
91+
socket.destroy(new TimeoutError())
92+
maConn.timeline.close = Date.now()
7593
})
7694

7795
socket.once('close', () => {
78-
log('%s socket close', lOptsStr)
79-
metrics?.increment({ [`${metricPrefix}close`]: true })
96+
// record metric for clean exit
97+
if (!timedout && !errored) {
98+
log('%s %s socket close', direction, lOptsStr)
99+
metrics?.increment({ [`${metricPrefix}close`]: true })
100+
}
80101

81102
// In instances where `close` was not explicitly called,
82103
// such as an iterable stream ending, ensure we have set the close
83104
// timeline
84-
if (maConn.timeline.close == null) {
85-
maConn.timeline.close = Date.now()
86-
}
105+
socket.destroy()
106+
maConn.timeline.close = Date.now()
87107
})
88108

89109
socket.once('end', () => {
90110
// the remote sent a FIN packet which means no more data will be sent
91111
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
92-
log('%s socket end', lOptsStr)
112+
log('%s %s socket end', direction, lOptsStr)
93113
metrics?.increment({ [`${metricPrefix}end`]: true })
94114
})
95115

@@ -111,7 +131,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
111131
// If the source errored the socket will already have been destroyed by
112132
// duplex(). If the socket errored it will already be
113133
// destroyed. There's nothing to do here except log the error & return.
114-
log.error('%s error in sink', lOptsStr, err)
134+
log.error('%s %s error in sink - %e', direction, lOptsStr, err)
115135
}
116136
}
117137

@@ -128,100 +148,84 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
128148

129149
async close (options: AbortOptions = {}) {
130150
if (socket.closed) {
131-
log('The %s socket is already closed', lOptsStr)
151+
log('the %s %s socket is already closed', direction, lOptsStr)
132152
return
133153
}
134154

135155
if (socket.destroyed) {
136-
log('The %s socket is already destroyed', lOptsStr)
156+
log('the %s %s socket is already destroyed', direction, lOptsStr)
137157
return
138158
}
139159

140-
const abortSignalListener = (): void => {
141-
socket.destroy(new AbortError('Destroying socket after timeout'))
160+
if (closePromise != null) {
161+
return closePromise.promise
142162
}
143163

144164
try {
145-
if (closePromise != null) {
146-
log('The %s socket is already closing', lOptsStr)
147-
await closePromise
148-
return
149-
}
165+
closePromise = pDefer()
150166

151-
if (options.signal == null) {
152-
const signal = AbortSignal.timeout(closeTimeout)
167+
// close writable end of socket
168+
socket.end()
153169

154-
options = {
155-
...options,
156-
signal
157-
}
158-
}
170+
// convert EventEmitter to EventTarget
171+
const eventTarget = socketToEventTarget(socket)
159172

160-
options.signal?.addEventListener('abort', abortSignalListener)
173+
// don't wait forever to close
174+
const signal = options.signal ?? AbortSignal.timeout(closeTimeout)
161175

162-
log('%s closing socket', lOptsStr)
163-
closePromise = new Promise<void>((resolve, reject) => {
164-
socket.once('close', () => {
165-
// socket completely closed
166-
log('%s socket closed', lOptsStr)
167-
resolve()
176+
// wait for any unsent data to be sent
177+
if (socket.writableLength > 0) {
178+
log('%s %s draining socket', direction, lOptsStr)
179+
await raceEvent(eventTarget, 'drain', signal, {
180+
errorEvent: 'error'
168181
})
169-
socket.once('error', (err: Error) => {
170-
log('%s socket error', lOptsStr, err)
171-
172-
if (!socket.destroyed) {
173-
reject(err)
174-
}
175-
// if socket is destroyed, 'closed' event will be emitted later to resolve the promise
176-
})
177-
178-
// shorten inactivity timeout
179-
socket.setTimeout(closeTimeout)
180-
181-
// close writable end of the socket
182-
socket.end()
183-
184-
if (socket.writableLength > 0) {
185-
// there are outgoing bytes waiting to be sent
186-
socket.once('drain', () => {
187-
log('%s socket drained', lOptsStr)
182+
log('%s %s socket drained', direction, lOptsStr)
183+
}
188184

189-
// all bytes have been sent we can destroy the socket (maybe) before the timeout
190-
socket.destroy()
191-
})
192-
} else {
193-
// nothing to send, destroy immediately, no need for the timeout
194-
socket.destroy()
195-
}
196-
})
185+
await Promise.all([
186+
raceEvent(eventTarget, 'close', signal, {
187+
errorEvent: 'error'
188+
}),
197189

198-
await closePromise
190+
// all bytes have been sent we can destroy the socket
191+
socket.destroy()
192+
])
199193
} catch (err: any) {
200194
this.abort(err)
201195
} finally {
202-
options.signal?.removeEventListener('abort', abortSignalListener)
196+
closePromise.resolve()
203197
}
204198
},
205199

206200
abort: (err: Error) => {
207-
log('%s socket abort due to error', lOptsStr, err)
201+
log('%s %s socket abort due to error - %e', direction, lOptsStr, err)
208202

209203
// the abortSignalListener may already destroyed the socket with an error
210-
if (!socket.destroyed) {
211-
socket.destroy(err)
212-
}
204+
socket.destroy()
213205

214206
// closing a socket is always asynchronous (must wait for "close" event)
215207
// but the tests expect this to be a synchronous operation so we have to
216208
// set the close time here. the tests should be refactored to reflect
217209
// reality.
218-
if (maConn.timeline.close == null) {
219-
maConn.timeline.close = Date.now()
220-
}
210+
maConn.timeline.close = Date.now()
221211
},
222212

223213
log
224214
}
225215

226216
return maConn
227217
}
218+
219+
function socketToEventTarget (obj?: any): EventTarget {
220+
const eventTarget = {
221+
addEventListener: (type: any, cb: any) => {
222+
obj.addListener(type, cb)
223+
},
224+
removeEventListener: (type: any, cb: any) => {
225+
obj.removeListener(type, cb)
226+
}
227+
}
228+
229+
// @ts-expect-error partial implementation
230+
return eventTarget
231+
}

0 commit comments

Comments
 (0)