Skip to content

Commit d000586

Browse files
authored
Add subscribeOn boundedelastic on every connection to avoid locking unwanted threads (#78)
1 parent c6864e7 commit d000586

File tree

1 file changed

+6
-5
lines changed

1 file changed

+6
-5
lines changed

src/test/java/org/gridsuite/study/notification/server/NotificationWebSocketIT.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.springframework.web.reactive.socket.client.StandardWebSocketClient;
2020
import org.springframework.web.reactive.socket.client.WebSocketClient;
2121
import reactor.core.publisher.Mono;
22+
import reactor.core.scheduler.Schedulers;
2223

2324
import java.net.URI;
2425
import java.util.Map;
@@ -65,8 +66,8 @@ public void metricsMapOneUserTwoConnections() {
6566
CountDownLatch connectionLatch = new CountDownLatch(2);
6667
CountDownLatch assertLatch = new CountDownLatch(1);
6768

68-
Mono<Void> connection1 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch)));
69-
Mono<Void> connection2 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch)));
69+
Mono<Void> connection1 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch))).subscribeOn(Schedulers.boundedElastic());
70+
Mono<Void> connection2 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch))).subscribeOn(Schedulers.boundedElastic());
7071

7172
CompletableFuture<Void> evaluationFuture = evaluateAssert(connectionLatch, exp, assertLatch);
7273
Mono.zip(connection1, connection2).block();
@@ -84,14 +85,14 @@ public void metricsMapTwoUsers() {
8485
Map<String, Double> exp = Map.of(user1, 2d, user2, 1d);
8586
CountDownLatch connectionLatch = new CountDownLatch(3);
8687
CountDownLatch assertLatch = new CountDownLatch(1);
87-
Mono<Void> connection1 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch)));
88-
Mono<Void> connection2 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch)));
88+
Mono<Void> connection1 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch))).subscribeOn(Schedulers.boundedElastic());
89+
Mono<Void> connection2 = client1.execute(getUrl("/notify"), httpHeaders1, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch))).subscribeOn(Schedulers.boundedElastic());
8990

9091
// Second WebSocketClient for connections related to 'test1' user
9192
WebSocketClient client2 = new StandardWebSocketClient();
9293
HttpHeaders httpHeaders2 = new HttpHeaders();
9394
httpHeaders2.add(HEADER_USER_ID, user2);
94-
Mono<Void> connection3 = client2.execute(getUrl("/notify"), httpHeaders2, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch)));
95+
Mono<Void> connection3 = client2.execute(getUrl("/notify"), httpHeaders2, ws -> Mono.fromRunnable(() -> handleLatches(connectionLatch, assertLatch))).subscribeOn(Schedulers.boundedElastic());
9596

9697
CompletableFuture<Void> evaluationFuture = evaluateAssert(connectionLatch, exp, assertLatch);
9798
Mono.zip(connection1, connection2, connection3).block();

0 commit comments

Comments
 (0)