Skip to content

Commit 2f6ea53

Browse files
authored
Add connection metrics per username (#75)
1 parent b7e5ecc commit 2f6ea53

File tree

2 files changed

+56
-27
lines changed

2 files changed

+56
-27
lines changed

src/main/java/org/gridsuite/study/notification/server/NotificationWebSocketHandler.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88

99
import com.fasterxml.jackson.core.JsonProcessingException;
1010
import com.fasterxml.jackson.databind.ObjectMapper;
11-
import io.micrometer.core.instrument.Gauge;
1211
import io.micrometer.core.instrument.MeterRegistry;
12+
import io.micrometer.core.instrument.MultiGauge;
13+
import io.micrometer.core.instrument.Tags;
1314
import org.gridsuite.study.notification.server.dto.Filters;
1415
import org.gridsuite.study.notification.server.dto.FiltersToAdd;
1516
import org.gridsuite.study.notification.server.dto.FiltersToRemove;
@@ -38,6 +39,8 @@
3839
import java.util.function.Consumer;
3940
import java.util.logging.Level;
4041

42+
import static java.util.stream.Collectors.toList;
43+
4144
/**
4245
* A WebSocketHandler that sends messages from a broker to websockets opened by clients, interleaving with pings to keep connections open.
4346
* <p>
@@ -73,25 +76,21 @@ public class NotificationWebSocketHandler implements WebSocketHandler {
7376
static final String HEADER_INSERT_MODE = "insertMode";
7477
static final String HEADER_REFERENCE_NODE_UUID = "referenceNodeUuid";
7578
static final String HEADER_INDEXATION_STATUS = "indexation_status";
76-
7779
static final String USERS_METER_NAME = "app.users";
78-
static final String CONNECTIONS_METER_NAME = "app.connections";
80+
static final String USER_TAG = "user";
7981

8082
private final ObjectMapper jacksonObjectMapper;
8183

8284
private final int heartbeatInterval;
8385

8486
private final Map<String, Integer> userConnections = new ConcurrentHashMap<>();
8587

88+
private final MeterRegistry meterRegistry;
89+
8690
public NotificationWebSocketHandler(ObjectMapper jacksonObjectMapper, MeterRegistry meterRegistry, @Value("${notification.websocket.heartbeat.interval:30}") int heartbeatInterval) {
8791
this.jacksonObjectMapper = jacksonObjectMapper;
8892
this.heartbeatInterval = heartbeatInterval;
89-
initMetrics(meterRegistry);
90-
}
91-
92-
private void initMetrics(MeterRegistry meterRegistry) {
93-
Gauge.builder(USERS_METER_NAME, userConnections::size).register(meterRegistry);
94-
Gauge.builder(CONNECTIONS_METER_NAME, () -> userConnections.values().stream().mapToInt(Integer::intValue).sum()).register(meterRegistry);
93+
this.meterRegistry = meterRegistry;
9594
}
9695

9796
Flux<Message<String>> flux;
@@ -235,11 +234,22 @@ private void updateConnectionMetrics(WebSocketSession webSocketSession) {
235234
LOGGER.info("New websocket connection id={} for user={} studyUuid={}, updateType={}", webSocketSession.getId(), userId,
236235
webSocketSession.getAttributes().get(FILTER_STUDY_UUID), webSocketSession.getAttributes().get(FILTER_UPDATE_TYPE));
237236
userConnections.compute(userId, (k, v) -> (v == null) ? 1 : v + 1);
237+
updateConnectionMetricsRegistry();
238238
}
239239

240240
private void updateDisconnectionMetrics(WebSocketSession webSocketSession) {
241241
var userId = webSocketSession.getHandshakeInfo().getHeaders().getFirst(HEADER_USER_ID);
242242
LOGGER.info("Websocket disconnection id={} for user={}", webSocketSession.getId(), userId);
243243
userConnections.computeIfPresent(userId, (k, v) -> v > 1 ? v - 1 : null);
244+
updateConnectionMetricsRegistry();
245+
}
246+
247+
private void updateConnectionMetricsRegistry() {
248+
MultiGauge.builder(USERS_METER_NAME)
249+
.description("The current number of connections per user")
250+
.register(meterRegistry)
251+
.register(userConnections.entrySet().stream()
252+
.map(e -> MultiGauge.Row.of(Tags.of(USER_TAG, e.getKey()), e.getValue()))
253+
.collect(toList()), true);
244254
}
245255
}

src/test/java/org/gridsuite/study/notification/server/NotificationWebSocketIT.java

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
*/
77
package org.gridsuite.study.notification.server;
88

9-
import io.micrometer.core.instrument.Gauge;
109
import io.micrometer.core.instrument.MeterRegistry;
1110
import org.junit.Test;
1211
import org.junit.runner.RunWith;
@@ -22,10 +21,10 @@
2221
import reactor.core.publisher.Mono;
2322

2423
import java.net.URI;
24+
import java.util.Map;
2525

2626
import static org.gridsuite.study.notification.server.NotificationWebSocketHandler.*;
2727
import static org.junit.Assert.assertEquals;
28-
import static org.junit.Assert.assertNotNull;
2928

3029
/**
3130
* @author Jon Harper <jon.harper at rte-france.com>
@@ -50,27 +49,47 @@ public void echo() {
5049
client.execute(getUrl("/notify"), httpHeaders, ws -> Mono.empty()).block();
5150
}
5251

53-
@Test
54-
public void metrics() {
55-
testMeters(0);
56-
WebSocketClient client = new StandardWebSocketClient();
57-
HttpHeaders httpHeaders = new HttpHeaders();
58-
httpHeaders.add(HEADER_USER_ID, "test");
59-
client.execute(getUrl("/notify"), httpHeaders, ws -> Mono.fromRunnable(() -> testMeters(1))).block();
60-
}
61-
6252
protected URI getUrl(String path) {
6353
return URI.create("ws://localhost:" + this.port + path);
6454
}
6555

66-
private void testMeters(int val) {
67-
testMeter(USERS_METER_NAME, val);
68-
testMeter(CONNECTIONS_METER_NAME, val);
56+
@Test
57+
public void metricsMapOneUserTwoConnections() {
58+
WebSocketClient client1 = new StandardWebSocketClient();
59+
HttpHeaders httpHeaders1 = new HttpHeaders();
60+
String user = "test";
61+
httpHeaders1.add(HEADER_USER_ID, user);
62+
Map<String, Double> exp = Map.of(user, 2d);
63+
Mono<Void> connection1 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> testMeterMap(exp)));
64+
Mono<Void> connection2 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> testMeterMap(exp)));
65+
66+
Mono.zip(connection1, connection2).block();
67+
}
68+
69+
@Test
70+
public void metricsMapTwoUsers() {
71+
// First WebSocketClient for connections related to 'test' user
72+
WebSocketClient client1 = new StandardWebSocketClient();
73+
HttpHeaders httpHeaders1 = new HttpHeaders();
74+
String user1 = "test";
75+
httpHeaders1.add(HEADER_USER_ID, user1);
76+
String user2 = "test1";
77+
Map<String, Double> exp = Map.of(user1, 2d, user2, 1d);
78+
Mono<Void> connection1 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> testMeterMap(exp)));
79+
Mono<Void> connection2 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> testMeterMap(exp)));
80+
81+
// Second WebSocketClient for connections related to 'test1' user
82+
WebSocketClient client2 = new StandardWebSocketClient();
83+
HttpHeaders httpHeaders2 = new HttpHeaders();
84+
httpHeaders2.add(HEADER_USER_ID, user2);
85+
Mono<Void> connection3 = client2.execute(getUrl("/notify"), httpHeaders2, ws -> Mono.fromRunnable(() -> testMeterMap(exp)));
86+
87+
Mono.zip(connection1, connection2, connection3).block();
6988
}
7089

71-
private void testMeter(String name, int val) {
72-
Gauge meter = meterRegistry.get(name).gauge();
73-
assertNotNull(meter);
74-
assertEquals(val, Double.valueOf(meter.value()).intValue());
90+
private void testMeterMap(Map<String, Double> userMap) {
91+
for (Map.Entry<String, Double> userEntry : userMap.entrySet()) {
92+
assertEquals(userEntry.getValue(), meterRegistry.get(USERS_METER_NAME).tag(USER_TAG, userEntry.getKey()).gauge().value(), 0);
93+
}
7594
}
7695
}

0 commit comments

Comments
 (0)