|
33 | 33 | import java.util.Collections; |
34 | 34 | import java.util.List; |
35 | 35 | import java.util.concurrent.TimeUnit; |
36 | | -import java.util.concurrent.atomic.AtomicBoolean; |
37 | 36 | import java.util.concurrent.atomic.AtomicInteger; |
38 | 37 | import java.util.concurrent.atomic.AtomicReference; |
39 | 38 | import java.util.function.Function; |
@@ -185,13 +184,10 @@ private void awaitQueueSize(TestContext ctx, AtomicInteger queueSize, int num) t |
185 | 184 | public void testConnectionLost(TestContext ctx) throws Exception { |
186 | 185 | SqlConnectOptions connectOptions = connectOptions(); |
187 | 186 | ProxyServer proxy = ProxyServer.create(vertx, connectOptions.getPort(), connectOptions.getHost()); |
188 | | - AtomicBoolean firstConnection = new AtomicBoolean(true); |
| 187 | + AtomicReference<ProxyServer.Connection> firstConnection = new AtomicReference<>(); |
189 | 188 | proxy.proxyHandler(proxiedConn -> { |
190 | | - if (firstConnection.compareAndSet(true, false)) { |
| 189 | + if (firstConnection.compareAndSet(null, proxiedConn)) { |
191 | 190 | proxiedConn.connect(); |
192 | | - vertx.eventBus().consumer("disconnect", msg -> { |
193 | | - proxiedConn.clientSocket().close(); |
194 | | - }); |
195 | 191 | } |
196 | 192 | }); |
197 | 193 | // Start proxy |
@@ -232,7 +228,7 @@ public void dequeue(Object taskMetric) { |
232 | 228 | } |
233 | 229 | conn.closeHandler(v -> async.countDown()); |
234 | 230 | awaitQueueSize(ctx, queueSize, 16); |
235 | | - vertx.eventBus().send("disconnect", "boom"); |
| 231 | + firstConnection.get().clientSocket().close(); |
236 | 232 | async.await(20_000); |
237 | 233 | ctx.assertEquals(0, queueSize.get()); |
238 | 234 | } |
|
0 commit comments