@@ -5,28 +5,11 @@ import { multiaddr } from '@multiformats/multiaddr'
5
5
import { pEvent } from 'p-event'
6
6
import { toMultiaddrConnection } from './socket-to-conn.js'
7
7
import { multiaddrToNetConfig } from './utils.js'
8
- import type { TCPCreateListenerOptions } from './index.js'
8
+ import type { CloseServerOnMaxConnectionsOpts , TCPCreateListenerOptions } from './index.js'
9
9
import type { NetConfig } from './utils.js'
10
10
import type { ComponentLogger , Logger , MultiaddrConnection , CounterGroup , MetricGroup , Metrics , Listener , ListenerEvents , Upgrader } from '@libp2p/interface'
11
11
import type { Multiaddr } from '@multiformats/multiaddr'
12
12
13
- export interface CloseServerOnMaxConnectionsOpts {
14
- /**
15
- * Server listens once connection count is less than `listenBelow`
16
- */
17
- listenBelow : number
18
-
19
- /**
20
- * Close server once connection count is greater than or equal to `closeAbove`
21
- */
22
- closeAbove : number
23
-
24
- /**
25
- * Invoked when there was an error listening on a socket
26
- */
27
- onListenError ?( err : Error ) : void
28
- }
29
-
30
13
interface Context extends TCPCreateListenerOptions {
31
14
upgrader : Upgrader
32
15
socketInactivityTimeout ?: number
@@ -38,10 +21,10 @@ interface Context extends TCPCreateListenerOptions {
38
21
logger : ComponentLogger
39
22
}
40
23
41
- export interface TCPListenerMetrics {
42
- status : MetricGroup
43
- errors : CounterGroup
44
- events : CounterGroup
24
+ interface TCPListenerMetrics {
25
+ status ? : MetricGroup
26
+ errors ? : CounterGroup
27
+ events ? : CounterGroup
45
28
}
46
29
47
30
enum TCPListenerStatusCode {
@@ -67,7 +50,7 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
67
50
/** Keep track of open sockets to destroy in case of timeout */
68
51
private readonly sockets = new Set < net . Socket > ( )
69
52
private status : Status = { code : TCPListenerStatusCode . INACTIVE }
70
- private metrics ? : TCPListenerMetrics
53
+ private metrics : TCPListenerMetrics
71
54
private addr : string
72
55
private readonly log : Logger
73
56
private readonly shutdownController : AbortController
@@ -100,59 +83,57 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
100
83
}
101
84
}
102
85
103
- this . server
104
- . on ( 'listening' , ( ) => {
105
- if ( context . metrics != null ) {
106
- // we are listening, register metrics for our port
107
- const address = this . server . address ( )
108
-
109
- if ( address == null ) {
110
- this . addr = 'unknown'
111
- } else if ( typeof address === 'string' ) {
112
- // unix socket
113
- this . addr = address
114
- } else {
115
- this . addr = `${ address . address } :${ address . port } `
116
- }
86
+ context . metrics ?. registerMetricGroup ( 'libp2p_tcp_inbound_connections_total' , {
87
+ label : 'address' ,
88
+ help : 'Current active connections in TCP listener' ,
89
+ calculate : ( ) => {
90
+ return {
91
+ [ this . addr ] : this . sockets . size
92
+ }
93
+ }
94
+ } )
117
95
118
- context . metrics ?. registerMetricGroup ( 'libp2p_tcp_inbound_connections_total' , {
119
- label : 'address' ,
120
- help : 'Current active connections in TCP listener' ,
121
- calculate : ( ) => {
122
- return {
123
- [ this . addr ] : this . sockets . size
124
- }
125
- }
126
- } )
127
-
128
- this . metrics = {
129
- status : context . metrics . registerMetricGroup ( 'libp2p_tcp_listener_status_info' , {
130
- label : 'address' ,
131
- help : 'Current status of the TCP listener socket'
132
- } ) ,
133
- errors : context . metrics . registerMetricGroup ( 'libp2p_tcp_listener_errors_total' , {
134
- label : 'address' ,
135
- help : 'Total count of TCP listener errors by type'
136
- } ) ,
137
- events : context . metrics . registerMetricGroup ( 'libp2p_tcp_listener_events_total' , {
138
- label : 'address' ,
139
- help : 'Total count of TCP listener events by type'
140
- } )
141
- }
96
+ this . metrics = {
97
+ status : context . metrics ?. registerMetricGroup ( 'libp2p_tcp_listener_status_info' , {
98
+ label : 'address' ,
99
+ help : 'Current status of the TCP listener socket'
100
+ } ) ,
101
+ errors : context . metrics ?. registerMetricGroup ( 'libp2p_tcp_listener_errors_total' , {
102
+ label : 'address' ,
103
+ help : 'Total count of TCP listener errors by type'
104
+ } ) ,
105
+ events : context . metrics ?. registerMetricGroup ( 'libp2p_tcp_listener_events_total' , {
106
+ label : 'address' ,
107
+ help : 'Total count of TCP listener events by type'
108
+ } )
109
+ }
142
110
143
- this . metrics ?. status . update ( {
144
- [ this . addr ] : TCPListenerStatusCode . ACTIVE
145
- } )
111
+ this . server
112
+ . on ( 'listening' , ( ) => {
113
+ // we are listening, register metrics for our port
114
+ const address = this . server . address ( )
115
+
116
+ if ( address == null ) {
117
+ this . addr = 'unknown'
118
+ } else if ( typeof address === 'string' ) {
119
+ // unix socket
120
+ this . addr = address
121
+ } else {
122
+ this . addr = `${ address . address } :${ address . port } `
146
123
}
147
124
125
+ this . metrics . status ?. update ( {
126
+ [ this . addr ] : TCPListenerStatusCode . ACTIVE
127
+ } )
128
+
148
129
this . safeDispatchEvent ( 'listening' )
149
130
} )
150
131
. on ( 'error' , err => {
151
- this . metrics ? .errors . increment ( { [ `${ this . addr } listen_error` ] : true } )
132
+ this . metrics . errors ? .increment ( { [ `${ this . addr } listen_error` ] : true } )
152
133
this . safeDispatchEvent ( 'error' , { detail : err } )
153
134
} )
154
135
. on ( 'close' , ( ) => {
155
- this . metrics ? .status . update ( {
136
+ this . metrics . status ? .update ( {
156
137
[ this . addr ] : this . status . code
157
138
} )
158
139
@@ -165,12 +146,12 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
165
146
}
166
147
} )
167
148
. on ( 'drop' , ( ) => {
168
- this . metrics ? .events . increment ( { [ `${ this . addr } drop` ] : true } )
149
+ this . metrics . events ? .increment ( { [ `${ this . addr } drop` ] : true } )
169
150
} )
170
151
}
171
152
172
153
private onSocket ( socket : net . Socket ) : void {
173
- this . metrics ? .events . increment ( { [ `${ this . addr } connection` ] : true } )
154
+ this . metrics . events ? .increment ( { [ `${ this . addr } connection` ] : true } )
174
155
175
156
if ( this . status . code !== TCPListenerStatusCode . ACTIVE ) {
176
157
socket . destroy ( )
@@ -190,7 +171,7 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
190
171
} )
191
172
} catch ( err : any ) {
192
173
this . log . error ( 'inbound connection failed' , err )
193
- this . metrics ? .errors . increment ( { [ `${ this . addr } inbound_to_connection` ] : true } )
174
+ this . metrics . errors ? .increment ( { [ `${ this . addr } inbound_to_connection` ] : true } )
194
175
socket . destroy ( )
195
176
return
196
177
}
@@ -233,7 +214,7 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
233
214
} )
234
215
. catch ( async err => {
235
216
this . log . error ( 'inbound connection upgrade failed' , err )
236
- this . metrics ? .errors . increment ( { [ `${ this . addr } inbound_upgrade` ] : true } )
217
+ this . metrics . errors ? .increment ( { [ `${ this . addr } inbound_upgrade` ] : true } )
237
218
this . sockets . delete ( socket )
238
219
maConn . abort ( err )
239
220
} )
0 commit comments