Skip to content

Commit 3010ce8

Browse files
committed
Use atomic refs to keep request and response state in async message stream handlers
1 parent 50749b8 commit 3010ce8

File tree

4 files changed

+104
-105
lines changed

4 files changed

+104
-105
lines changed

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamHandler.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.nio.ByteBuffer;
3131
import java.util.List;
3232
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.concurrent.atomic.AtomicReference;
3334

3435
import org.apache.hc.core5.http.EntityDetails;
3536
import org.apache.hc.core5.http.Header;
@@ -65,13 +66,12 @@ class ClientH2StreamHandler implements H2StreamHandler {
6566
private final AsyncClientExchangeHandler exchangeHandler;
6667
private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
6768
private final HttpCoreContext context;
69+
private final AtomicReference<MessageState> requestState;
70+
private final AtomicReference<MessageState> responseState;
6871
private final AtomicBoolean requestCommitted;
6972
private final AtomicBoolean failed;
7073
private final AtomicBoolean done;
7174

72-
private volatile MessageState requestState;
73-
private volatile MessageState responseState;
74-
7575
ClientH2StreamHandler(
7676
final H2StreamChannel outputChannel,
7777
final HttpProcessor httpProcessor,
@@ -95,13 +95,13 @@ public int write(final ByteBuffer src) throws IOException {
9595
@Override
9696
public void endStream(final List<? extends Header> trailers) throws IOException {
9797
outputChannel.endStream(trailers);
98-
requestState = MessageState.COMPLETE;
98+
requestState.set(MessageState.COMPLETE);
9999
}
100100

101101
@Override
102102
public void endStream() throws IOException {
103103
outputChannel.endStream();
104-
requestState = MessageState.COMPLETE;
104+
requestState.set(MessageState.COMPLETE);
105105
}
106106

107107
};
@@ -113,8 +113,8 @@ public void endStream() throws IOException {
113113
this.requestCommitted = new AtomicBoolean();
114114
this.failed = new AtomicBoolean();
115115
this.done = new AtomicBoolean();
116-
this.requestState = MessageState.HEADERS;
117-
this.responseState = MessageState.HEADERS;
116+
this.requestState = new AtomicReference<>(MessageState.HEADERS);
117+
this.responseState = new AtomicReference<>(MessageState.HEADERS);
118118
}
119119

120120
@Override
@@ -124,7 +124,7 @@ public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
124124

125125
@Override
126126
public boolean isOutputReady() {
127-
switch (requestState) {
127+
switch (requestState.get()) {
128128
case HEADERS:
129129
return true;
130130
case BODY:
@@ -146,14 +146,14 @@ private void commitRequest(final HttpRequest request, final EntityDetails entity
146146
connMetrics.incrementRequestCount();
147147

148148
if (entityDetails == null) {
149-
requestState = MessageState.COMPLETE;
149+
requestState.set(MessageState.COMPLETE);
150150
} else {
151151
final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
152152
final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());
153153
if (expectContinue) {
154-
requestState = MessageState.ACK;
154+
requestState.set(MessageState.ACK);
155155
} else {
156-
requestState = MessageState.BODY;
156+
requestState.set(MessageState.BODY);
157157
exchangeHandler.produce(dataChannel);
158158
}
159159
}
@@ -164,7 +164,7 @@ private void commitRequest(final HttpRequest request, final EntityDetails entity
164164

165165
@Override
166166
public void produceOutput() throws HttpException, IOException {
167-
switch (requestState) {
167+
switch (requestState.get()) {
168168
case HEADERS:
169169
exchangeHandler.produceRequest((request, entityDetails, httpContext) -> commitRequest(request, entityDetails), context);
170170
break;
@@ -184,7 +184,7 @@ public void consumeHeader(final List<Header> headers, final boolean endStream) t
184184
if (done.get()) {
185185
throw new ProtocolException("Unexpected message headers");
186186
}
187-
switch (responseState) {
187+
switch (responseState.get()) {
188188
case HEADERS:
189189
final HttpResponse response = DefaultH2ResponseConverter.INSTANCE.convert(headers);
190190
final int status = response.getCode();
@@ -194,9 +194,9 @@ public void consumeHeader(final List<Header> headers, final boolean endStream) t
194194
if (status > HttpStatus.SC_CONTINUE && status < HttpStatus.SC_SUCCESS) {
195195
exchangeHandler.consumeInformation(response, context);
196196
}
197-
if (requestState == MessageState.ACK) {
197+
if (requestState.get() == MessageState.ACK) {
198198
if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
199-
requestState = MessageState.BODY;
199+
requestState.set(MessageState.BODY);
200200
exchangeHandler.produce(dataChannel);
201201
}
202202
}
@@ -210,10 +210,10 @@ public void consumeHeader(final List<Header> headers, final boolean endStream) t
210210
connMetrics.incrementResponseCount();
211211

212212
exchangeHandler.consumeResponse(response, entityDetails, context);
213-
responseState = endStream ? MessageState.COMPLETE : MessageState.BODY;
213+
responseState.set(endStream ? MessageState.COMPLETE : MessageState.BODY);
214214
break;
215215
case BODY:
216-
responseState = MessageState.COMPLETE;
216+
responseState.set(MessageState.COMPLETE);
217217
exchangeHandler.streamEnd(headers);
218218
break;
219219
default:
@@ -228,14 +228,14 @@ public void updateInputCapacity() throws IOException {
228228

229229
@Override
230230
public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
231-
if (done.get() || responseState != MessageState.BODY) {
231+
if (done.get() || responseState.get() != MessageState.BODY) {
232232
throw new ProtocolException("Unexpected message data");
233233
}
234234
if (src != null) {
235235
exchangeHandler.consume(src);
236236
}
237237
if (endStream) {
238-
responseState = MessageState.COMPLETE;
238+
responseState.set(MessageState.COMPLETE);
239239
exchangeHandler.streamEnd(null);
240240
}
241241
}
@@ -261,17 +261,17 @@ public void failed(final Exception cause) {
261261
@Override
262262
public void releaseResources() {
263263
if (done.compareAndSet(false, true)) {
264-
responseState = MessageState.COMPLETE;
265-
requestState = MessageState.COMPLETE;
264+
responseState.set(MessageState.COMPLETE);
265+
requestState.set(MessageState.COMPLETE);
266266
exchangeHandler.releaseResources();
267267
}
268268
}
269269

270270
@Override
271271
public String toString() {
272272
return "[" +
273-
"requestState=" + requestState +
274-
", responseState=" + responseState +
273+
"requestState=" + requestState.get() +
274+
", responseState=" + responseState.get() +
275275
']';
276276
}
277277

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamHandler.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.nio.ByteBuffer;
3131
import java.util.List;
3232
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.concurrent.atomic.AtomicReference;
3334

3435
import org.apache.hc.core5.http.EntityDetails;
3536
import org.apache.hc.core5.http.Header;
@@ -72,14 +73,14 @@ class ServerH2StreamHandler implements H2StreamHandler {
7273
private final BasicHttpConnectionMetrics connMetrics;
7374
private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
7475
private final HttpCoreContext context;
76+
private final AtomicReference<MessageState> requestState;
77+
private final AtomicReference<MessageState> responseState;
7578
private final AtomicBoolean responseCommitted;
7679
private final AtomicBoolean failed;
7780
private final AtomicBoolean done;
7881

7982
private volatile AsyncServerExchangeHandler exchangeHandler;
8083
private volatile HttpRequest receivedRequest;
81-
private volatile MessageState requestState;
82-
private volatile MessageState responseState;
8384

8485
ServerH2StreamHandler(
8586
final H2StreamChannel outputChannel,
@@ -103,13 +104,13 @@ public int write(final ByteBuffer src) throws IOException {
103104
@Override
104105
public void endStream(final List<? extends Header> trailers) throws IOException {
105106
outputChannel.endStream(trailers);
106-
responseState = MessageState.COMPLETE;
107+
responseState.set(MessageState.COMPLETE);
107108
}
108109

109110
@Override
110111
public void endStream() throws IOException {
111112
outputChannel.endStream();
112-
responseState = MessageState.COMPLETE;
113+
responseState.set(MessageState.COMPLETE);
113114
}
114115

115116
};
@@ -140,8 +141,8 @@ public void pushPromise(
140141
this.responseCommitted = new AtomicBoolean();
141142
this.failed = new AtomicBoolean();
142143
this.done = new AtomicBoolean();
143-
this.requestState = MessageState.HEADERS;
144-
this.responseState = MessageState.IDLE;
144+
this.requestState = new AtomicReference<>(MessageState.HEADERS);
145+
this.responseState = new AtomicReference<>(MessageState.IDLE);
145146
}
146147

147148
@Override
@@ -180,9 +181,9 @@ private void commitResponse(
180181
outputChannel.submit(responseHeaders, endStream);
181182
connMetrics.incrementResponseCount();
182183
if (endStream) {
183-
responseState = MessageState.COMPLETE;
184+
responseState.set(MessageState.COMPLETE);
184185
} else {
185-
responseState = MessageState.BODY;
186+
responseState.set(MessageState.BODY);
186187
exchangeHandler.produce(outputChannel);
187188
}
188189
} else {
@@ -211,9 +212,9 @@ public void consumeHeader(final List<Header> headers, final boolean endStream) t
211212
if (done.get()) {
212213
throw new ProtocolException("Unexpected message headers");
213214
}
214-
switch (requestState) {
215+
switch (requestState.get()) {
215216
case HEADERS:
216-
requestState = endStream ? MessageState.COMPLETE : MessageState.BODY;
217+
requestState.set(endStream ? MessageState.COMPLETE : MessageState.BODY);
217218

218219
final HttpRequest request = DefaultH2RequestConverter.INSTANCE.convert(headers);
219220
final EntityDetails requestEntityDetails = endStream ? null : new IncomingEntityDetails(request, -1);
@@ -251,7 +252,7 @@ public void consumeHeader(final List<Header> headers, final boolean endStream) t
251252
}
252253
break;
253254
case BODY:
254-
responseState = MessageState.COMPLETE;
255+
responseState.set(MessageState.COMPLETE);
255256
exchangeHandler.streamEnd(headers);
256257
break;
257258
default:
@@ -267,27 +268,27 @@ public void updateInputCapacity() throws IOException {
267268

268269
@Override
269270
public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
270-
if (done.get() || requestState != MessageState.BODY) {
271+
if (done.get() || requestState.get() != MessageState.BODY) {
271272
throw new ProtocolException("Unexpected message data");
272273
}
273274
Asserts.notNull(exchangeHandler, "Exchange handler");
274275
if (src != null) {
275276
exchangeHandler.consume(src);
276277
}
277278
if (endStream) {
278-
requestState = MessageState.COMPLETE;
279+
requestState.set(MessageState.COMPLETE);
279280
exchangeHandler.streamEnd(null);
280281
}
281282
}
282283

283284
@Override
284285
public boolean isOutputReady() {
285-
return responseState == MessageState.BODY && exchangeHandler != null && exchangeHandler.available() > 0;
286+
return responseState.get() == MessageState.BODY && exchangeHandler != null && exchangeHandler.available() > 0;
286287
}
287288

288289
@Override
289290
public void produceOutput() throws HttpException, IOException {
290-
if (responseState == MessageState.BODY) {
291+
if (responseState.get() == MessageState.BODY) {
291292
Asserts.notNull(exchangeHandler, "Exchange handler");
292293
exchangeHandler.produce(dataChannel);
293294
}
@@ -298,9 +299,9 @@ public void handle(final HttpException ex, final boolean endStream) throws HttpE
298299
if (done.get()) {
299300
throw ex;
300301
}
301-
switch (requestState) {
302+
switch (requestState.get()) {
302303
case HEADERS:
303-
requestState = endStream ? MessageState.COMPLETE : MessageState.BODY;
304+
requestState.set(endStream ? MessageState.COMPLETE : MessageState.BODY);
304305
if (!responseCommitted.get()) {
305306
final AsyncResponseProducer responseProducer = new BasicResponseProducer(
306307
ServerSupport.toStatusCode(ex),
@@ -312,7 +313,7 @@ public void handle(final HttpException ex, final boolean endStream) throws HttpE
312313
}
313314
break;
314315
case BODY:
315-
responseState = MessageState.COMPLETE;
316+
responseState.set(MessageState.COMPLETE);
316317
default:
317318
throw ex;
318319
}
@@ -334,8 +335,8 @@ public void failed(final Exception cause) {
334335
@Override
335336
public void releaseResources() {
336337
if (done.compareAndSet(false, true)) {
337-
requestState = MessageState.COMPLETE;
338-
responseState = MessageState.COMPLETE;
338+
requestState.set(MessageState.COMPLETE);
339+
responseState.set(MessageState.COMPLETE);
339340
if (exchangeHandler != null) {
340341
exchangeHandler.releaseResources();
341342
}
@@ -345,8 +346,8 @@ public void releaseResources() {
345346
@Override
346347
public String toString() {
347348
return "[" +
348-
"requestState=" + requestState +
349-
", responseState=" + responseState +
349+
"requestState=" + requestState.get() +
350+
", responseState=" + responseState.get() +
350351
']';
351352
}
352353

0 commit comments

Comments
 (0)