6
6
*/
7
7
package org .gridsuite .study .notification .server ;
8
8
9
- import java .io .UnsupportedEncodingException ;
10
- import java .net .URLDecoder ;
11
- import java .nio .charset .StandardCharsets ;
12
- import java .time .Duration ;
13
- import java .util .HashMap ;
14
- import java .util .Map ;
15
- import java .util .function .Consumer ;
16
- import java .util .logging .Level ;
17
-
18
9
import com .fasterxml .jackson .core .JsonProcessingException ;
19
10
import com .fasterxml .jackson .databind .ObjectMapper ;
20
-
11
+ import io .micrometer .core .instrument .MeterRegistry ;
12
+ import io .micrometer .core .instrument .MultiGauge ;
13
+ import io .micrometer .core .instrument .Tags ;
21
14
import org .gridsuite .study .notification .server .dto .Filters ;
22
15
import org .gridsuite .study .notification .server .dto .FiltersToAdd ;
23
16
import org .gridsuite .study .notification .server .dto .FiltersToRemove ;
24
- import org .gridsuite .study .notification .server .dto .NetworkImpactsInfos ;
25
17
import org .slf4j .Logger ;
26
18
import org .slf4j .LoggerFactory ;
27
19
import org .springframework .beans .factory .annotation .Value ;
37
29
import reactor .core .publisher .Flux ;
38
30
import reactor .core .publisher .Mono ;
39
31
32
+ import java .io .UnsupportedEncodingException ;
33
+ import java .net .URLDecoder ;
34
+ import java .nio .charset .StandardCharsets ;
35
+ import java .time .Duration ;
36
+ import java .util .HashMap ;
37
+ import java .util .Map ;
38
+ import java .util .concurrent .ConcurrentHashMap ;
39
+ import java .util .function .Consumer ;
40
+ import java .util .logging .Level ;
41
+
42
+ import static java .util .stream .Collectors .toList ;
43
+
40
44
/**
41
45
* A WebSocketHandler that sends messages from a broker to websockets opened by clients, interleaving with pings to keep connections open.
42
46
* <p>
@@ -63,36 +67,44 @@ public class NotificationWebSocketHandler implements WebSocketHandler {
63
67
static final String HEADER_TIMESTAMP = "timestamp" ;
64
68
static final String HEADER_ERROR = "error" ;
65
69
static final String HEADER_SUBSTATIONS_IDS = "substationsIds" ;
66
- static final String HEADER_DELETED_EQUIPMENTS = "deletedEquipments" ;
67
70
static final String HEADER_NODE = "node" ;
68
71
static final String HEADER_NODES = "nodes" ;
69
72
static final String HEADER_PARENT_NODE = "parentNode" ;
70
73
static final String HEADER_NEW_NODE = "newNode" ;
71
74
static final String HEADER_MOVED_NODE = "movedNode" ;
72
75
static final String HEADER_REMOVE_CHILDREN = "removeChildren" ;
73
76
static final String HEADER_INSERT_MODE = "insertMode" ;
77
+ static final String HEADER_REFERENCE_NODE_UUID = "referenceNodeUuid" ;
78
+ static final String HEADER_INDEXATION_STATUS = "indexation_status" ;
79
+ static final String USERS_METER_NAME = "app.users" ;
80
+ static final String USER_TAG = "user" ;
81
+
82
+ private final ObjectMapper jacksonObjectMapper ;
74
83
public static final String HEADER_PARAMS_NAME = "paramsName" ;
75
84
76
- private ObjectMapper jacksonObjectMapper ;
85
+ private final int heartbeatInterval ;
77
86
78
- private int heartbeatInterval ;
87
+ private final Map < String , Integer > userConnections = new ConcurrentHashMap <>() ;
79
88
80
- public NotificationWebSocketHandler (ObjectMapper jacksonObjectMapper , @ Value ("${notification.websocket.heartbeat.interval:30}" ) int heartbeatInterval ) {
89
+ private final MultiGauge multiGauge ;
90
+
91
+ public NotificationWebSocketHandler (ObjectMapper jacksonObjectMapper , MeterRegistry meterRegistry , @ Value ("${notification.websocket.heartbeat.interval:30}" ) int heartbeatInterval ) {
81
92
this .jacksonObjectMapper = jacksonObjectMapper ;
82
93
this .heartbeatInterval = heartbeatInterval ;
94
+ this .multiGauge = MultiGauge .builder (USERS_METER_NAME ).description ("The current number of connections per user" ).register (meterRegistry );
83
95
}
84
96
85
- Flux <Message <NetworkImpactsInfos >> flux ;
97
+ Flux <Message <String >> flux ;
86
98
87
99
@ Bean
88
- public Consumer <Flux <Message <NetworkImpactsInfos >>> consumeNotification () {
100
+ public Consumer <Flux <Message <String >>> consumeNotification () {
89
101
return f -> {
90
- ConnectableFlux <Message <NetworkImpactsInfos >> c = f .log (CATEGORY_BROKER_INPUT , Level .FINE ).publish ();
102
+ ConnectableFlux <Message <String >> c = f .log (CATEGORY_BROKER_INPUT , Level .FINE ).publish ();
91
103
this .flux = c ;
92
104
c .connect ();
93
105
// Force connect 1 fake subscriber to consumme messages as they come.
94
106
// Otherwise, reactorcore buffers some messages (not until the connectable flux had
95
- // at least one subscriber. Is there a better way ?
107
+ // at least one subscriber) . Is there a better way ?
96
108
c .subscribe ();
97
109
};
98
110
}
@@ -134,6 +146,8 @@ private static Map<String, Object> toResultHeader(Map<String, Object> messageHea
134
146
passHeader (messageHeader , resHeader , HEADER_NEW_NODE );
135
147
passHeader (messageHeader , resHeader , HEADER_MOVED_NODE );
136
148
passHeader (messageHeader , resHeader , HEADER_USER_ID ); // to filter the display of error messages in the front end
149
+ passHeader (messageHeader , resHeader , HEADER_REFERENCE_NODE_UUID );
150
+ passHeader (messageHeader , resHeader , HEADER_INDEXATION_STATUS );
137
151
passHeader (messageHeader , resHeader , HEADER_PARAMS_NAME );
138
152
139
153
return resHeader ;
@@ -210,10 +224,30 @@ public Mono<Void> handle(WebSocketSession webSocketSession) {
210
224
webSocketSession .getAttributes ().put (FILTER_UPDATE_TYPE , filterUpdateType );
211
225
}
212
226
213
- LOGGER .debug ("New websocket connection for studyUuid={}, updateType={}" , filterStudyUuid , filterUpdateType );
214
227
return webSocketSession
215
- .send (notificationFlux (webSocketSession )
216
- .mergeWith (heartbeatFlux (webSocketSession )))
217
- .and (receive (webSocketSession ));
228
+ .send (notificationFlux (webSocketSession ).mergeWith (heartbeatFlux (webSocketSession )))
229
+ .and (receive (webSocketSession ))
230
+ .doFirst (() -> updateConnectionMetrics (webSocketSession ))
231
+ .doFinally (s -> updateDisconnectionMetrics (webSocketSession ));
232
+ }
233
+
234
+ private void updateConnectionMetrics (WebSocketSession webSocketSession ) {
235
+ var userId = webSocketSession .getHandshakeInfo ().getHeaders ().getFirst (HEADER_USER_ID );
236
+ LOGGER .info ("New websocket connection id={} for user={} studyUuid={}, updateType={}" , webSocketSession .getId (), userId ,
237
+ webSocketSession .getAttributes ().get (FILTER_STUDY_UUID ), webSocketSession .getAttributes ().get (FILTER_UPDATE_TYPE ));
238
+ userConnections .compute (userId , (k , v ) -> (v == null ) ? 1 : v + 1 );
239
+ updateConnectionMetricsRegistry ();
240
+ }
241
+
242
+ private void updateDisconnectionMetrics (WebSocketSession webSocketSession ) {
243
+ var userId = webSocketSession .getHandshakeInfo ().getHeaders ().getFirst (HEADER_USER_ID );
244
+ LOGGER .info ("Websocket disconnection id={} for user={}" , webSocketSession .getId (), userId );
245
+ userConnections .computeIfPresent (userId , (k , v ) -> v > 1 ? v - 1 : null );
246
+ updateConnectionMetricsRegistry ();
247
+ }
248
+
249
+ private void updateConnectionMetricsRegistry () {
250
+ multiGauge .register (userConnections .entrySet ().stream ().map (e -> MultiGauge .Row .of (Tags .of (USER_TAG , e .getKey ()), e .getValue ()))
251
+ .collect (toList ()), true );
218
252
}
219
253
}
0 commit comments