1
1
import os from 'os'
2
- import { TypedEventEmitter , CustomEvent } from '@libp2p/interface'
2
+ import { TypedEventEmitter } from '@libp2p/interface'
3
3
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
4
4
import { multiaddr , protocols } from '@multiformats/multiaddr'
5
5
import { createServer } from 'it-ws/server'
6
6
import { socketToMaConn } from './socket-to-conn.js'
7
- import type { ComponentLogger , Logger , Connection , Listener , ListenerEvents , CreateListenerOptions } from '@libp2p/interface'
7
+ import type { ComponentLogger , Logger , Connection , Listener , ListenerEvents , CreateListenerOptions , CounterGroup , MetricGroup , Metrics } from '@libp2p/interface'
8
8
import type { Multiaddr } from '@multiformats/multiaddr'
9
9
import type { Server } from 'http'
10
10
import type { DuplexWebSocket } from 'it-ws/duplex'
11
11
import type { WebSocketServer } from 'it-ws/server'
12
12
13
13
export interface WebSocketListenerComponents {
14
14
logger : ComponentLogger
15
+ metrics ?: Metrics
15
16
}
16
17
17
18
export interface WebSocketListenerInit extends CreateListenerOptions {
18
19
server ?: Server
19
20
}
20
21
22
+ export interface WebSocketListenerMetrics {
23
+ status : MetricGroup
24
+ errors : CounterGroup
25
+ events : CounterGroup
26
+ }
27
+
21
28
class WebSocketListener extends TypedEventEmitter < ListenerEvents > implements Listener {
22
29
private readonly connections : Set < DuplexWebSocket >
23
30
private listeningMultiaddr ?: Multiaddr
24
31
private readonly server : WebSocketServer
25
32
private readonly log : Logger
33
+ private metrics ?: WebSocketListenerMetrics
34
+ private addr : string
26
35
27
36
constructor ( components : WebSocketListenerComponents , init : WebSocketListenerInit ) {
28
37
super ( )
29
38
30
39
this . log = components . logger . forComponent ( 'libp2p:websockets:listener' )
40
+ const metrics = components . metrics
31
41
// Keep track of open connections to destroy when the listener is closed
32
42
this . connections = new Set < DuplexWebSocket > ( )
33
43
34
44
const self = this // eslint-disable-line @typescript-eslint/no-this-alias
35
45
46
+ this . addr = 'unknown'
47
+
36
48
this . server = createServer ( {
37
49
...init ,
38
50
onConnection : ( stream : DuplexWebSocket ) => {
39
51
const maConn = socketToMaConn ( stream , toMultiaddr ( stream . remoteAddress ?? '' , stream . remotePort ?? 0 ) , {
40
- logger : components . logger
52
+ logger : components . logger ,
53
+ metrics : this . metrics ?. events ,
54
+ metricPrefix : `${ this . addr } `
41
55
} )
42
56
this . log ( 'new inbound connection %s' , maConn . remoteAddr )
43
57
@@ -62,6 +76,7 @@ class WebSocketListener extends TypedEventEmitter<ListenerEvents> implements Lis
62
76
} )
63
77
. catch ( async err => {
64
78
this . log . error ( 'inbound connection failed to upgrade' , err )
79
+ this . metrics ?. errors . increment ( { [ `${ this . addr } inbound_upgrade` ] : true } )
65
80
66
81
await maConn . close ( ) . catch ( err => {
67
82
this . log . error ( 'inbound connection failed to close after upgrade failed' , err )
@@ -71,15 +86,46 @@ class WebSocketListener extends TypedEventEmitter<ListenerEvents> implements Lis
71
86
this . log . error ( 'inbound connection failed to upgrade' , err )
72
87
maConn . close ( ) . catch ( err => {
73
88
this . log . error ( 'inbound connection failed to close after upgrade failed' , err )
89
+ this . metrics ?. errors . increment ( { [ `${ this . addr } inbound_closing_failed` ] : true } )
74
90
} )
75
91
}
76
92
}
77
93
} )
78
94
79
95
this . server . on ( 'listening' , ( ) => {
96
+ if ( metrics != null ) {
97
+ const { host, port } = this . listeningMultiaddr ?. toOptions ( ) ?? { }
98
+ this . addr = `${ host } :${ port } `
99
+
100
+ metrics . registerMetricGroup ( 'libp2p_websockets_inbound_connections_total' , {
101
+ label : 'address' ,
102
+ help : 'Current active connections in WebSocket listener' ,
103
+ calculate : ( ) => {
104
+ return {
105
+ [ this . addr ] : this . connections . size
106
+ }
107
+ }
108
+ } )
109
+
110
+ this . metrics = {
111
+ status : metrics ?. registerMetricGroup ( 'libp2p_websockets_listener_status_info' , {
112
+ label : 'address' ,
113
+ help : 'Current status of the WebSocket listener socket'
114
+ } ) ,
115
+ errors : metrics ?. registerMetricGroup ( 'libp2p_websockets_listener_errors_total' , {
116
+ label : 'address' ,
117
+ help : 'Total count of WebSocket listener errors by type'
118
+ } ) ,
119
+ events : metrics ?. registerMetricGroup ( 'libp2p_websockets_listener_events_total' , {
120
+ label : 'address' ,
121
+ help : 'Total count of WebSocket listener events by type'
122
+ } )
123
+ }
124
+ }
80
125
this . dispatchEvent ( new CustomEvent ( 'listening' ) )
81
126
} )
82
127
this . server . on ( 'error' , ( err : Error ) => {
128
+ this . metrics ?. errors . increment ( { [ `${ this . addr } listen_error` ] : true } )
83
129
this . dispatchEvent ( new CustomEvent ( 'error' , {
84
130
detail : err
85
131
} ) )
0 commit comments