Skip to content

Commit 00973e8

Browse files
committed
Bug fix: Fixed a race condition in async message stream handlers that can occur when a message with no content body is being submitted asynchronously
1 parent 1bf8e65 commit 00973e8

File tree

5 files changed

+142
-9
lines changed

5 files changed

+142
-9
lines changed

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,13 @@ private void commitRequest(final HttpRequest request, final EntityDetails entity
142142
httpProcessor.process(request, entityDetails, context);
143143

144144
final List<Header> headers = DefaultH2RequestConverter.INSTANCE.convert(request);
145-
outputChannel.submit(headers, entityDetails == null);
146-
connMetrics.incrementRequestCount();
147-
148145
if (entityDetails == null) {
149146
requestState.set(MessageState.COMPLETE);
147+
outputChannel.submit(headers, true);
148+
connMetrics.incrementRequestCount();
150149
} else {
150+
outputChannel.submit(headers, false);
151+
connMetrics.incrementRequestCount();
151152
final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
152153
final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());
153154
if (expectContinue) {

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamHandler.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,12 +177,14 @@ private void commitResponse(
177177
final List<Header> responseHeaders = DefaultH2ResponseConverter.INSTANCE.convert(response);
178178

179179
final boolean endStream = responseEntityDetails == null ||
180-
(receivedRequest != null && Method.HEAD.isSame(receivedRequest.getMethod()));
181-
outputChannel.submit(responseHeaders, endStream);
182-
connMetrics.incrementResponseCount();
180+
receivedRequest != null && Method.HEAD.isSame(receivedRequest.getMethod());
183181
if (endStream) {
184182
responseState.set(MessageState.COMPLETE);
183+
outputChannel.submit(responseHeaders, endStream);
184+
connMetrics.incrementResponseCount();
185185
} else {
186+
outputChannel.submit(responseHeaders, endStream);
187+
connMetrics.incrementResponseCount();
186188
exchangeHandler.produce(outputChannel);
187189
if (responseState.compareAndSet(MessageState.IDLE, MessageState.BODY)) {
188190
outputChannel.requestOutput();

httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
117117
import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
118118
import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
119+
import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
119120
import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
120121
import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
121122
import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
@@ -142,6 +143,8 @@
142143
import org.apache.hc.core5.util.TextUtils;
143144
import org.apache.hc.core5.util.Timeout;
144145
import org.hamcrest.CoreMatchers;
146+
import org.hamcrest.MatcherAssert;
147+
import org.hamcrest.Matchers;
145148
import org.junit.jupiter.api.Assertions;
146149
import org.junit.jupiter.api.Test;
147150
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -2095,4 +2098,130 @@ public void releaseResources() {
20952098
}
20962099
}
20972100

2101+
@Test
2102+
void testDelayedResponseSubmission() throws Exception {
2103+
final Http1TestServer server = resources.server();
2104+
final Http1TestClient client = resources.client();
2105+
2106+
server.register("/hello", () -> new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
2107+
2108+
private final Random random = new Random(System.currentTimeMillis());
2109+
2110+
@Override
2111+
protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
2112+
final HttpRequest request,
2113+
final EntityDetails entityDetails,
2114+
final HttpContext context) throws HttpException {
2115+
return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
2116+
}
2117+
2118+
@Override
2119+
protected void handle(
2120+
final Message<HttpRequest, String> requestMessage,
2121+
final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
2122+
final HttpContext context) throws HttpException, IOException {
2123+
executorResource.getExecutorService().execute(() -> {
2124+
try {
2125+
Thread.sleep(random.nextInt(200));
2126+
responseTrigger.submitResponse(AsyncResponseBuilder.create(HttpStatus.SC_OK)
2127+
.setEntity(new MultiLineEntityProducer("All is well", 100))
2128+
.build(),
2129+
context);
2130+
Thread.sleep(random.nextInt(200));
2131+
} catch (final Exception ignore) {
2132+
// ignore
2133+
}
2134+
});
2135+
2136+
}
2137+
2138+
});
2139+
final InetSocketAddress serverEndpoint = server.start();
2140+
2141+
client.start();
2142+
2143+
final Future<ClientSessionEndpoint> connectFuture = client.connect(
2144+
"localhost", serverEndpoint.getPort(), TIMEOUT);
2145+
final ClientSessionEndpoint streamEndpoint = connectFuture.get();
2146+
2147+
final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
2148+
for (int i = 0; i < 5; i++) {
2149+
final HttpRequest request = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
2150+
queue.add(streamEndpoint.execute(
2151+
new BasicRequestProducer(request, null),
2152+
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
2153+
}
2154+
while (!queue.isEmpty()) {
2155+
final Future<Message<HttpResponse, String>> future = queue.remove();
2156+
final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
2157+
Assertions.assertNotNull(result);
2158+
final HttpResponse response = result.getHead();
2159+
Assertions.assertNotNull(response);
2160+
Assertions.assertEquals(200, response.getCode());
2161+
MatcherAssert.assertThat(result.getBody(), Matchers.startsWith("All is well"));
2162+
}
2163+
}
2164+
2165+
@Test
2166+
void testDelayedResponseSubmissionNoResponseBody() throws Exception {
2167+
final Http1TestServer server = resources.server();
2168+
final Http1TestClient client = resources.client();
2169+
2170+
server.register("/hello", () -> new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
2171+
2172+
private final Random random = new Random(System.currentTimeMillis());
2173+
2174+
@Override
2175+
protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
2176+
final HttpRequest request,
2177+
final EntityDetails entityDetails,
2178+
final HttpContext context) throws HttpException {
2179+
return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
2180+
}
2181+
2182+
@Override
2183+
protected void handle(
2184+
final Message<HttpRequest, String> requestMessage,
2185+
final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
2186+
final HttpContext context) throws HttpException, IOException {
2187+
executorResource.getExecutorService().execute(() -> {
2188+
try {
2189+
Thread.sleep(random.nextInt(200));
2190+
responseTrigger.submitResponse(AsyncResponseBuilder.create(200)
2191+
.build(),
2192+
context);
2193+
Thread.sleep(random.nextInt(200));
2194+
} catch (final Exception ignore) {
2195+
// ignore
2196+
}
2197+
});
2198+
2199+
}
2200+
2201+
});
2202+
final InetSocketAddress serverEndpoint = server.start();
2203+
2204+
client.start();
2205+
2206+
final Future<ClientSessionEndpoint> connectFuture = client.connect(
2207+
"localhost", serverEndpoint.getPort(), TIMEOUT);
2208+
final ClientSessionEndpoint streamEndpoint = connectFuture.get();
2209+
2210+
final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
2211+
for (int i = 0; i < 5; i++) {
2212+
final HttpRequest request = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
2213+
queue.add(streamEndpoint.execute(
2214+
new BasicRequestProducer(request, null),
2215+
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
2216+
}
2217+
while (!queue.isEmpty()) {
2218+
final Future<Message<HttpResponse, String>> future = queue.remove();
2219+
final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
2220+
Assertions.assertNotNull(result);
2221+
final HttpResponse response = result.getHead();
2222+
Assertions.assertNotNull(response);
2223+
Assertions.assertEquals(200, response.getCode());
2224+
}
2225+
}
2226+
20982227
}

httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,9 @@ private void commitRequest(final HttpRequest request, final EntityDetails entity
166166

167167
final boolean endStream = entityDetails == null;
168168
if (endStream) {
169-
outputChannel.submit(request, true, FlushMode.IMMEDIATE);
170169
committedRequest = request;
171170
requestState.set(MessageState.COMPLETE);
171+
outputChannel.submit(request, true, FlushMode.IMMEDIATE);
172172
} else {
173173
final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
174174
final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());

httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,14 @@ private void commitResponse(
189189
keepAlive = false;
190190
}
191191

192-
outputChannel.submit(response, endStream, endStream ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
193192
if (endStream) {
193+
responseState.set(MessageState.COMPLETE);
194+
outputChannel.submit(response, true, FlushMode.IMMEDIATE);
194195
if (!keepAlive) {
195196
outputChannel.close();
196197
}
197-
responseState.set(MessageState.COMPLETE);
198198
} else {
199+
outputChannel.submit(response, false, FlushMode.BUFFER);
199200
exchangeHandler.produce(internalDataChannel);
200201
if (responseState.compareAndSet(MessageState.IDLE, MessageState.BODY)) {
201202
outputChannel.requestOutput();

0 commit comments

Comments
 (0)