@@ -4,7 +4,8 @@ import { toMultiaddrConnection } from './socket-to-conn.js'
44import { CODE_P2P } from './constants.js'
55import {
66 getMultiaddrs ,
7- multiaddrToNetConfig
7+ multiaddrToNetConfig ,
8+ NetConfig
89} from './utils.js'
910import { EventEmitter , CustomEvent } from '@libp2p/interfaces/events'
1011import type { MultiaddrConnection , Connection } from '@libp2p/interface-connection'
@@ -26,13 +27,22 @@ async function attemptClose (maConn: MultiaddrConnection) {
2627 }
2728}
2829
30+ export interface CloseServerOnMaxConnectionsOpts {
31+ /** Server listens once connection count is less than `listenBelow` */
32+ listenBelow : number
33+ /** Close server once connection count is greater than or equal to `closeAbove` */
34+ closeAbove : number
35+ onListenError ?: ( err : Error ) => void
36+ }
37+
2938interface Context extends TCPCreateListenerOptions {
3039 handler ?: ( conn : Connection ) => void
3140 upgrader : Upgrader
3241 socketInactivityTimeout ?: number
3342 socketCloseTimeout ?: number
3443 maxConnections ?: number
3544 metrics ?: Metrics
45+ closeServerOnMaxConnections ?: CloseServerOnMaxConnectionsOpts
3646}
3747
3848const SERVER_STATUS_UP = 1
@@ -44,7 +54,12 @@ export interface TCPListenerMetrics {
4454 events : CounterGroup
4555}
4656
47- type Status = { started : false } | { started : true , listeningAddr : Multiaddr , peerId : string | null }
57+ type Status = { started : false } | {
58+ started : true
59+ listeningAddr : Multiaddr
60+ peerId : string | null
61+ netConfig : NetConfig
62+ }
4863
4964export class TCPListener extends EventEmitter < ListenerEvents > implements Listener {
5065 private readonly server : net . Server
@@ -69,6 +84,13 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
6984 this . server . maxConnections = context . maxConnections
7085 }
7186
87+ if ( context . closeServerOnMaxConnections != null ) {
88+ // Sanity check options
89+ if ( context . closeServerOnMaxConnections . closeAbove < context . closeServerOnMaxConnections . listenBelow ) {
90+ throw Error ( 'closeAbove must be >= listenBelow' )
91+ }
92+ }
93+
7294 this . server
7395 . on ( 'listening' , ( ) => {
7496 if ( context . metrics != null ) {
@@ -159,12 +181,33 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
159181
160182 socket . once ( 'close' , ( ) => {
161183 this . connections . delete ( maConn )
184+
185+ if (
186+ this . context . closeServerOnMaxConnections != null &&
187+ this . connections . size < this . context . closeServerOnMaxConnections . listenBelow
188+ ) {
189+ // The most likely case of error is if the port taken by this application is binded by
190+ // another process during the time the server if closed. In that case there's not much
191+ // we can do. netListen() will be called again every time a connection is dropped, which
192+ // acts as an eventual retry mechanism. onListenError allows the consumer act on this.
193+ this . netListen ( ) . catch ( e => {
194+ log . error ( 'error attempting to listen server once connection count under limit' , e )
195+ this . context . closeServerOnMaxConnections ?. onListenError ?.( e as Error )
196+ } )
197+ }
162198 } )
163199
164200 if ( this . context . handler != null ) {
165201 this . context . handler ( conn )
166202 }
167203
204+ if (
205+ this . context . closeServerOnMaxConnections != null &&
206+ this . connections . size >= this . context . closeServerOnMaxConnections . closeAbove
207+ ) {
208+ this . netClose ( )
209+ }
210+
168211 this . dispatchEvent ( new CustomEvent < Connection > ( 'connection' , { detail : conn } ) )
169212 } )
170213 . catch ( async err => {
@@ -220,34 +263,70 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
220263 }
221264
222265 async listen ( ma : Multiaddr ) {
266+ if ( this . status . started ) {
267+ throw Error ( 'server is already listening' )
268+ }
269+
223270 const peerId = ma . getPeerId ( )
224271 const listeningAddr = peerId == null ? ma . decapsulateCode ( CODE_P2P ) : ma
225272
226- this . status = { started : true , listeningAddr, peerId }
273+ this . status = {
274+ started : true ,
275+ listeningAddr,
276+ peerId,
277+ netConfig : multiaddrToNetConfig ( listeningAddr )
278+ }
227279
228- return await new Promise < void > ( ( resolve , reject ) => {
229- const options = multiaddrToNetConfig ( listeningAddr )
230- this . server . on ( 'error' , ( err ) => {
231- reject ( err )
232- } )
233- this . server . listen ( options , ( ) => {
234- log ( 'Listening on %s' , this . server . address ( ) )
235- resolve ( )
236- } )
237- } )
280+ await this . netListen ( )
238281 }
239282
240283 async close ( ) {
241- if ( ! this . server . listening ) {
242- return
243- }
244-
245284 await Promise . all (
246285 Array . from ( this . connections . values ( ) ) . map ( async maConn => await attemptClose ( maConn ) )
247286 )
248287
288+ // netClose already checks if server.listening
289+ this . netClose ( )
290+ }
291+
292+ private async netListen ( ) : Promise < void > {
293+ if ( ! this . status . started || this . server . listening ) {
294+ return
295+ }
296+
297+ const netConfig = this . status . netConfig
298+
249299 await new Promise < void > ( ( resolve , reject ) => {
250- this . server . close ( err => ( err != null ) ? reject ( err ) : resolve ( ) )
300+ // NOTE: 'listening' event is only fired on success. Any error such as port already binded, is emitted via 'error'
301+ this . server . once ( 'error' , reject )
302+ this . server . listen ( netConfig , resolve )
251303 } )
304+
305+ log ( 'Listening on %s' , this . server . address ( ) )
306+ }
307+
308+ private netClose ( ) : void {
309+ if ( ! this . status . started || ! this . server . listening ) {
310+ return
311+ }
312+
313+ log ( 'Closing server on %s' , this . server . address ( ) )
314+
315+ // NodeJS implementation tracks listening status with `this._handle` property.
316+ // - Server.close() sets this._handle to null immediately. If this._handle is null, ERR_SERVER_NOT_RUNNING is thrown
317+ // - Server.listening returns `this._handle !== null` https://github.com/nodejs/node/blob/386d761943bb1b217fba27d6b80b658c23009e60/lib/net.js#L1675
318+ // - Server.listen() if `this._handle !== null` throws ERR_SERVER_ALREADY_LISTEN
319+ //
320+ // NOTE: Both listen and close are technically not async actions, so it's not necessary to track
321+ // states 'pending-close' or 'pending-listen'
322+
323+ // From docs https://nodejs.org/api/net.html#serverclosecallback
324+ // Stops the server from accepting new connections and keeps existing connections.
325+ // 'close' event is emitted only emitted when all connections are ended.
326+ // The optional callback will be called once the 'close' event occurs.
327+ //
328+ // NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary
329+ // to pass a callback to close.
330+ this . server . close ( )
252331 }
253332}
0 commit comments