1
1
import net from 'net'
2
2
import { AlreadyStartedError , InvalidParametersError , NotStartedError , TypedEventEmitter , setMaxListeners } from '@libp2p/interface'
3
+ import { anySignal } from 'any-signal'
3
4
import { pEvent } from 'p-event'
4
- import { CODE_P2P } from './constants.js'
5
+ import { CODE_P2P , INBOUND_UPGRADE_TIMEOUT } from './constants.js'
5
6
import { toMultiaddrConnection } from './socket-to-conn.js'
6
7
import {
7
8
getMultiaddrs ,
@@ -31,6 +32,7 @@ export interface CloseServerOnMaxConnectionsOpts {
31
32
32
33
interface Context extends TCPCreateListenerOptions {
33
34
upgrader : Upgrader
35
+ inboundUpgradeTimeout ?: number
34
36
socketInactivityTimeout ?: number
35
37
socketCloseTimeout ?: number
36
38
maxConnections ?: number
@@ -74,6 +76,7 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
74
76
private addr : string
75
77
private readonly log : Logger
76
78
private readonly shutdownController : AbortController
79
+ private readonly inboundUpgradeTimeout : number
77
80
78
81
constructor ( private readonly context : Context ) {
79
82
super ( )
@@ -85,6 +88,7 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
85
88
setMaxListeners ( Infinity , this . shutdownController . signal )
86
89
87
90
this . log = context . logger . forComponent ( 'libp2p:tcp:listener' )
91
+ this . inboundUpgradeTimeout = context . inboundUpgradeTimeout ?? INBOUND_UPGRADE_TIMEOUT
88
92
this . addr = 'unknown'
89
93
this . server = net . createServer ( context , this . onSocket . bind ( this ) )
90
94
@@ -201,8 +205,14 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
201
205
this . log ( 'new inbound connection %s' , maConn . remoteAddr )
202
206
this . sockets . add ( socket )
203
207
208
+ const signal = anySignal ( [
209
+ this . shutdownController . signal ,
210
+ AbortSignal . timeout ( this . inboundUpgradeTimeout )
211
+ ] )
212
+ setMaxListeners ( Infinity , signal )
213
+
204
214
this . context . upgrader . upgradeInbound ( maConn , {
205
- signal : this . shutdownController . signal
215
+ signal
206
216
} )
207
217
. then ( ( ) => {
208
218
this . log ( 'inbound connection upgraded %s' , maConn . remoteAddr )
@@ -240,6 +250,9 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
240
250
this . sockets . delete ( socket )
241
251
maConn . abort ( err )
242
252
} )
253
+ . finally ( ( ) => {
254
+ signal . clear ( )
255
+ } )
243
256
}
244
257
245
258
getAddrs ( ) : Multiaddr [ ] {
0 commit comments