Skip to content

Commit 1bf8e65

Browse files
committed
Bug fix: Async message stream handlers to avoid firing data produce events from multiple threads
1 parent 3010ce8 commit 1bf8e65

File tree

5 files changed

+19
-49
lines changed

5 files changed

+19
-49
lines changed

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,10 @@ private void commitRequest(final HttpRequest request, final EntityDetails entity
153153
if (expectContinue) {
154154
requestState.set(MessageState.ACK);
155155
} else {
156-
requestState.set(MessageState.BODY);
157156
exchangeHandler.produce(dataChannel);
157+
if (requestState.compareAndSet(MessageState.HEADERS, MessageState.BODY)) {
158+
outputChannel.requestOutput();
159+
}
158160
}
159161
}
160162
} else {

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,10 @@ private void commitResponse(
183183
if (endStream) {
184184
responseState.set(MessageState.COMPLETE);
185185
} else {
186-
responseState.set(MessageState.BODY);
187186
exchangeHandler.produce(outputChannel);
187+
if (responseState.compareAndSet(MessageState.IDLE, MessageState.BODY)) {
188+
outputChannel.requestOutput();
189+
}
188190
}
189191
} else {
190192
throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");

httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java

Lines changed: 7 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import java.util.concurrent.Executors;
5757
import java.util.concurrent.Future;
5858
import java.util.concurrent.atomic.AtomicReference;
59-
import java.util.concurrent.locks.ReentrantLock;
6059

6160
import org.apache.hc.core5.function.Callback;
6261
import org.apache.hc.core5.http.ConnectionClosedException;
@@ -1005,7 +1004,6 @@ void testDelayedExpectContinueAck() throws Exception {
10051004
private final Random random = new Random(System.currentTimeMillis());
10061005
private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
10071006
"All is well");
1008-
private final ReentrantLock lock = new ReentrantLock();
10091007

10101008
@Override
10111009
public void handleRequest(
@@ -1023,12 +1021,7 @@ public void handleRequest(
10231021
responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
10241022
}
10251023
final HttpResponse response = new BasicHttpResponse(200);
1026-
lock.lock();
1027-
try {
1028-
responseChannel.sendResponse(response, entityProducer, context);
1029-
} finally {
1030-
lock.unlock();
1031-
}
1024+
responseChannel.sendResponse(response, entityProducer, context);
10321025
}
10331026
} catch (final Exception ignore) {
10341027
// ignore
@@ -1052,22 +1045,12 @@ public void streamEnd(final List<? extends Header> trailers) throws HttpExceptio
10521045

10531046
@Override
10541047
public int available() {
1055-
lock.lock();
1056-
try {
1057-
return entityProducer.available();
1058-
} finally {
1059-
lock.unlock();
1060-
}
1048+
return entityProducer.available();
10611049
}
10621050

10631051
@Override
10641052
public void produce(final DataStreamChannel channel) throws IOException {
1065-
lock.lock();
1066-
try {
1067-
entityProducer.produce(channel);
1068-
} finally {
1069-
lock.unlock();
1070-
}
1053+
entityProducer.produce(channel);
10711054
}
10721055

10731056
@Override
@@ -2060,19 +2043,13 @@ void testDelayedRequestSubmission() throws Exception {
20602043
new AsyncRequestProducer() {
20612044

20622045
private final Random random = new Random(System.currentTimeMillis());
2063-
private final ReentrantLock lock = new ReentrantLock();
20642046

20652047
@Override
20662048
public void sendRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
20672049
executorResource.getExecutorService().execute(() -> {
20682050
try {
20692051
Thread.sleep(random.nextInt(200));
2070-
lock.lock();
2071-
try {
2072-
channel.sendRequest(request, entityProducer, context);
2073-
} finally {
2074-
lock.unlock();
2075-
}
2052+
channel.sendRequest(request, entityProducer, context);
20762053
} catch (final Exception ignore) {
20772054
// ignore
20782055
}
@@ -2081,32 +2058,17 @@ public void sendRequest(final RequestChannel channel, final HttpContext context)
20812058

20822059
@Override
20832060
public boolean isRepeatable() {
2084-
lock.lock();
2085-
try {
2086-
return entityProducer.isRepeatable();
2087-
} finally {
2088-
lock.unlock();
2089-
}
2061+
return entityProducer.isRepeatable();
20902062
}
20912063

20922064
@Override
20932065
public int available() {
2094-
lock.lock();
2095-
try {
2096-
return entityProducer.available();
2097-
} finally {
2098-
lock.unlock();
2099-
}
2066+
return entityProducer.available();
21002067
}
21012068

21022069
@Override
21032070
public void produce(final DataStreamChannel channel) throws IOException {
2104-
lock.lock();
2105-
try {
2106-
entityProducer.produce(channel);
2107-
} finally {
2108-
lock.unlock();
2109-
}
2071+
entityProducer.produce(channel);
21102072
}
21112073

21122074
@Override

httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,10 @@ private void commitRequest(final HttpRequest request, final EntityDetails entity
180180
final Timeout timeout = http1Config.getWaitForContinueTimeout() != null ? http1Config.getWaitForContinueTimeout() : DEFAULT_WAIT_FOR_CONTINUE;
181181
outputChannel.setSocketTimeout(timeout);
182182
} else {
183-
requestState.set(MessageState.BODY);
184183
exchangeHandler.produce(internalDataChannel);
184+
if (requestState.compareAndSet(MessageState.HEADERS, MessageState.BODY)) {
185+
outputChannel.requestOutput();
186+
}
185187
}
186188
}
187189
} else {

httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,10 @@ private void commitResponse(
196196
}
197197
responseState.set(MessageState.COMPLETE);
198198
} else {
199-
responseState.set(MessageState.BODY);
200199
exchangeHandler.produce(internalDataChannel);
200+
if (responseState.compareAndSet(MessageState.IDLE, MessageState.BODY)) {
201+
outputChannel.requestOutput();
202+
}
201203
}
202204
} else {
203205
throw new HttpException("Response already committed");

0 commit comments

Comments
 (0)