Skip to content

Commit c6864e7

Browse files
authored
Fix user metrics (#77)
* Register only once the meterRegistry to correctly account for changes in users map * Fix concurrency issue in test
1 parent 232cd15 commit c6864e7

File tree

2 files changed

+78
-47
lines changed

2 files changed

+78
-47
lines changed

src/main/java/org/gridsuite/study/notification/server/NotificationWebSocketHandler.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,12 @@ public class NotificationWebSocketHandler implements WebSocketHandler {
8585

8686
private final Map<String, Integer> userConnections = new ConcurrentHashMap<>();
8787

88-
private final MeterRegistry meterRegistry;
88+
private final MultiGauge multiGauge;
8989

9090
public NotificationWebSocketHandler(ObjectMapper jacksonObjectMapper, MeterRegistry meterRegistry, @Value("${notification.websocket.heartbeat.interval:30}") int heartbeatInterval) {
9191
this.jacksonObjectMapper = jacksonObjectMapper;
9292
this.heartbeatInterval = heartbeatInterval;
93-
this.meterRegistry = meterRegistry;
93+
this.multiGauge = MultiGauge.builder(USERS_METER_NAME).description("The current number of connections per user").register(meterRegistry);
9494
}
9595

9696
Flux<Message<String>> flux;
@@ -245,11 +245,7 @@ private void updateDisconnectionMetrics(WebSocketSession webSocketSession) {
245245
}
246246

247247
private void updateConnectionMetricsRegistry() {
248-
MultiGauge.builder(USERS_METER_NAME)
249-
.description("The current number of connections per user")
250-
.register(meterRegistry)
251-
.register(userConnections.entrySet().stream()
252-
.map(e -> MultiGauge.Row.of(Tags.of(USER_TAG, e.getKey()), e.getValue()))
248+
multiGauge.register(userConnections.entrySet().stream().map(e -> MultiGauge.Row.of(Tags.of(USER_TAG, e.getKey()), e.getValue()))
253249
.collect(toList()), true);
254250
}
255251
}

src/test/java/org/gridsuite/study/notification/server/NotificationWebSocketIT.java

Lines changed: 75 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121
import reactor.core.publisher.Mono;
2222

2323
import java.net.URI;
24+
import java.util.Map;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.CountDownLatch;
2427

2528
import static org.gridsuite.study.notification.server.NotificationWebSocketHandler.*;
29+
import static org.junit.Assert.assertEquals;
2630

2731
/**
2832
* @author Jon Harper <jon.harper at rte-france.com>
@@ -50,44 +54,75 @@ public void echo() {
5054
protected URI getUrl(String path) {
5155
return URI.create("ws://localhost:" + this.port + path);
5256
}
53-
// FIXME: disabled tests because they are not reproducible on github action
54-
// @Test
55-
// public void metricsMapOneUserTwoConnections() {
56-
// WebSocketClient client1 = new StandardWebSocketClient();
57-
// HttpHeaders httpHeaders1 = new HttpHeaders();
58-
// String user = "test";
59-
// httpHeaders1.add(HEADER_USER_ID, user);
60-
// Map<String, Double> exp = Map.of(user, 2d);
61-
// Mono<Void> connection1 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> testMeterMap(exp)));
62-
// Mono<Void> connection2 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> testMeterMap(exp)));
63-
//
64-
// Mono.zip(connection1, connection2).block();
65-
// }
66-
//
67-
// @Test
68-
// public void metricsMapTwoUsers() {
69-
// // First WebSocketClient for connections related to 'test' user
70-
// WebSocketClient client1 = new StandardWebSocketClient();
71-
// HttpHeaders httpHeaders1 = new HttpHeaders();
72-
// String user1 = "test";
73-
// httpHeaders1.add(HEADER_USER_ID, user1);
74-
// String user2 = "test1";
75-
// Map<String, Double> exp = Map.of(user1, 2d, user2, 1d);
76-
// Mono<Void> connection1 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> testMeterMap(exp)));
77-
// Mono<Void> connection2 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> testMeterMap(exp)));
78-
//
79-
// // Second WebSocketClient for connections related to 'test1' user
80-
// WebSocketClient client2 = new StandardWebSocketClient();
81-
// HttpHeaders httpHeaders2 = new HttpHeaders();
82-
// httpHeaders2.add(HEADER_USER_ID, user2);
83-
// Mono<Void> connection3 = client2.execute(getUrl("/notify"), httpHeaders2, ws -> Mono.fromRunnable(() -> testMeterMap(exp)));
84-
//
85-
// Mono.zip(connection1, connection2, connection3).block();
86-
// }
87-
//
88-
// private void testMeterMap(Map<String, Double> userMap) {
89-
// for (Map.Entry<String, Double> userEntry : userMap.entrySet()) {
90-
// assertEquals(userEntry.getValue(), meterRegistry.get(USERS_METER_NAME).tag(USER_TAG, userEntry.getKey()).gauge().value(), 0);
91-
// }
92-
// }
57+
58+
@Test
59+
public void metricsMapOneUserTwoConnections() {
60+
WebSocketClient client1 = new StandardWebSocketClient();
61+
HttpHeaders httpHeaders1 = new HttpHeaders();
62+
String user = "test";
63+
httpHeaders1.add(HEADER_USER_ID, user);
64+
Map<String, Double> exp = Map.of(user, 2d);
65+
CountDownLatch connectionLatch = new CountDownLatch(2);
66+
CountDownLatch assertLatch = new CountDownLatch(1);
67+
68+
Mono<Void> connection1 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch)));
69+
Mono<Void> connection2 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch)));
70+
71+
CompletableFuture<Void> evaluationFuture = evaluateAssert(connectionLatch, exp, assertLatch);
72+
Mono.zip(connection1, connection2).block();
73+
evaluationFuture.join(); // Throw assertion errors
74+
}
75+
76+
@Test
77+
public void metricsMapTwoUsers() {
78+
// First WebSocketClient for connections related to 'test' user
79+
WebSocketClient client1 = new StandardWebSocketClient();
80+
HttpHeaders httpHeaders1 = new HttpHeaders();
81+
String user1 = "test";
82+
httpHeaders1.add(HEADER_USER_ID, user1);
83+
String user2 = "test1";
84+
Map<String, Double> exp = Map.of(user1, 2d, user2, 1d);
85+
CountDownLatch connectionLatch = new CountDownLatch(3);
86+
CountDownLatch assertLatch = new CountDownLatch(1);
87+
Mono<Void> connection1 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch)));
88+
Mono<Void> connection2 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch)));
89+
90+
// Second WebSocketClient for connections related to 'test1' user
91+
WebSocketClient client2 = new StandardWebSocketClient();
92+
HttpHeaders httpHeaders2 = new HttpHeaders();
93+
httpHeaders2.add(HEADER_USER_ID, user2);
94+
Mono<Void> connection3 = client2.execute(getUrl("/notify"), httpHeaders2, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch)));
95+
96+
CompletableFuture<Void> evaluationFuture = evaluateAssert(connectionLatch, exp, assertLatch);
97+
Mono.zip(connection1, connection2, connection3).block();
98+
evaluationFuture.join(); // Throw assertion errors
99+
}
100+
101+
private void handleLatches(CountDownLatch connectionLatch, CountDownLatch assertLatch) {
102+
try {
103+
connectionLatch.countDown();
104+
assertLatch.await(); // Wait for assertion to be evaluated before closing the connection
105+
} catch (InterruptedException e) {
106+
throw new RuntimeException(e);
107+
}
108+
}
109+
110+
private CompletableFuture<Void> evaluateAssert(CountDownLatch connectionLatch, Map<String, Double> exp, CountDownLatch assertLatch) {
111+
return CompletableFuture.runAsync(() -> {
112+
try {
113+
connectionLatch.await(); // Wait for connections to be established
114+
testMeterMap(exp);
115+
} catch (InterruptedException e) {
116+
throw new RuntimeException(e);
117+
} finally {
118+
assertLatch.countDown(); // Close connections if there is an assertion error
119+
}
120+
});
121+
}
122+
123+
private void testMeterMap(Map<String, Double> userMap) {
124+
for (Map.Entry<String, Double> userEntry : userMap.entrySet()) {
125+
assertEquals(userEntry.getValue(), meterRegistry.get(USERS_METER_NAME).tag(USER_TAG, userEntry.getKey()).gauge().value(), 0);
126+
}
127+
}
93128
}

0 commit comments

Comments
 (0)