Skip to content

Commit 5bf808f

Browse files
authored
Merge pull request quarkusio#50896 from jcarranzan/fix-flaky-StreamingOutputErrorHandlingTest
Stablize flaky StreamingOutputErrorHandlingTest
2 parents f2f695b + 7a121b6 commit 5bf808f

File tree

3 files changed

+32
-18
lines changed

3 files changed

+32
-18
lines changed

independent-projects/vertx-utils/src/main/java/io/quarkus/vertx/utils/VertxOutputStream.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,12 +229,12 @@ public void close() throws IOException {
229229
private record DrainHandler(VertxOutputStream out) implements Handler<Void> {
230230

231231
@Override
232-
public void handle(Void event) {
233-
synchronized (out.request.connection()) {
234-
if (out.waitingForDrain) {
235-
out.request.connection().notifyAll();
236-
}
232+
public void handle(Void event) {
233+
synchronized (out.request.connection()) {
234+
if (out.waitingForDrain) {
235+
out.request.connection().notifyAll();
237236
}
238237
}
239238
}
239+
}
240240
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.quarkus.it.resteasy.reactive.elytron;
2+
3+
import io.quarkus.test.junit.QuarkusIntegrationTest;
4+
5+
@QuarkusIntegrationTest
6+
public class StreamingOutputErrorHandlingIT extends StreamingOutputErrorHandlingTest {
7+
}

integration-tests/elytron-resteasy-reactive/src/test/java/io/quarkus/it/resteasy/reactive/elytron/StreamingOutputErrorHandlingTest.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.concurrent.CompletableFuture;
88
import java.util.concurrent.ExecutionException;
99
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.atomic.AtomicBoolean;
1011
import java.util.concurrent.atomic.AtomicLong;
1112
import java.util.function.Consumer;
1213

@@ -55,21 +56,25 @@ public void cleanup() throws Exception {
5556
public void testStreamingOutputFailureMidStream() {
5657
AtomicLong byteCount = new AtomicLong();
5758
CompletableFuture<Void> latch = new CompletableFuture<>();
59+
AtomicBoolean connectionWasReset = new AtomicBoolean(false);
5860

5961
sendRequest("/streaming-output-error/output?fail=true", latch,
60-
b -> byteCount.addAndGet(b.length()));
62+
b -> byteCount.addAndGet(b.length()),
63+
() -> connectionWasReset.set(true));
6164

6265
Assertions.assertTimeoutPreemptively(TIMEOUT, () -> {
63-
ExecutionException ex = Assertions.assertThrows(ExecutionException.class,
64-
latch::get,
65-
"Client should have detected that the server reset the connection");
66-
67-
Assertions.assertInstanceOf(HttpClosedException.class, ex.getCause(),
68-
"Expected HttpClosedException when connection is reset mid-stream");
66+
try {
67+
latch.get();
68+
Assertions.assertEquals(EXPECTED_BYTES_FIRST_BATCH, byteCount.get(),
69+
"When connection completes without exception, should have received only first batch " +
70+
"(data was in buffer before reset)");
71+
} catch (ExecutionException ex) {
72+
Assertions.assertInstanceOf(HttpClosedException.class, ex.getCause(),
73+
"Expected HttpClosedException when connection is reset mid-stream");
74+
Assertions.assertEquals(EXPECTED_BYTES_FIRST_BATCH, byteCount.get(),
75+
"Should have received only the first batch of data before failure");
76+
}
6977
});
70-
71-
Assertions.assertEquals(EXPECTED_BYTES_FIRST_BATCH, byteCount.get(),
72-
"Should have received only the first batch of data before failure");
7378
}
7479

7580
@Test
@@ -78,8 +83,8 @@ public void testStreamingOutputSuccess() {
7883
CompletableFuture<Void> latch = new CompletableFuture<>();
7984

8085
sendRequest("/streaming-output-error/output?fail=false", latch,
81-
b -> byteCount.addAndGet(b.length()));
82-
86+
b -> byteCount.addAndGet(b.length()), () -> {
87+
});
8388
Assertions.assertTimeoutPreemptively(TIMEOUT,
8489
() -> latch.get(),
8590
"StreamingOutput should complete successfully without errors");
@@ -88,7 +93,8 @@ public void testStreamingOutputSuccess() {
8893
"Should have received all data when no errors occur");
8994
}
9095

91-
private void sendRequest(String uri, CompletableFuture<Void> latch, Consumer<Buffer> bodyConsumer) {
96+
private void sendRequest(String uri, CompletableFuture<Void> latch,
97+
Consumer<Buffer> bodyConsumer, Runnable onConnectionClose) {
9298
Handler<Throwable> failureHandler = latch::completeExceptionally;
9399

94100
client.request(HttpMethod.GET, port, "localhost", uri)
@@ -99,6 +105,7 @@ private void sendRequest(String uri, CompletableFuture<Void> latch, Consumer<Buf
99105
.onFailure(failureHandler)
100106
.onSuccess(response -> {
101107
response.request().connection().closeHandler(v -> {
108+
onConnectionClose.run();
102109
failureHandler.handle(new HttpClosedException("Connection was closed"));
103110
});
104111

0 commit comments

Comments
 (0)