Skip to content

Commit bbaa587

Browse files
authored
Netty Expect 100-Continue fix (#5847)
Signed-off-by: Maxim Nesen <[email protected]>
1 parent 85e6620 commit bbaa587

File tree

10 files changed

+963
-212
lines changed

10 files changed

+963
-212
lines changed
Lines changed: 96 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2023, 2025 Oracle and/or its affiliates. All rights reserved.
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -16,111 +16,133 @@
1616

1717
package org.glassfish.jersey.netty.connector;
1818

19-
import io.netty.channel.Channel;
20-
import io.netty.channel.ChannelFuture;
2119
import io.netty.channel.ChannelHandlerContext;
2220
import io.netty.channel.ChannelInboundHandlerAdapter;
23-
import io.netty.handler.codec.http.DefaultFullHttpRequest;
24-
import io.netty.handler.codec.http.HttpHeaderNames;
25-
import io.netty.handler.codec.http.HttpRequest;
21+
import io.netty.handler.codec.http.FullHttpMessage;
2622
import io.netty.handler.codec.http.HttpResponse;
2723
import io.netty.handler.codec.http.HttpResponseStatus;
28-
import io.netty.handler.codec.http.HttpUtil;
29-
import org.glassfish.jersey.client.ClientRequest;
24+
import io.netty.handler.codec.http.LastHttpContent;
3025

3126
import javax.ws.rs.ProcessingException;
27+
import java.io.IOException;
28+
import java.util.ArrayList;
3229
import java.util.Arrays;
3330
import java.util.List;
34-
import java.util.concurrent.CompletableFuture;
35-
import java.util.concurrent.ExecutionException;
36-
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.CountDownLatch;
3732
import java.util.concurrent.TimeoutException;
3833

3934
public class JerseyExpectContinueHandler extends ChannelInboundHandlerAdapter {
4035

41-
private boolean isExpected;
36+
private ExpectationState currentState = ExpectationState.IDLE;
4237

43-
private static final List<HttpResponseStatus> statusesToBeConsidered = Arrays.asList(HttpResponseStatus.CONTINUE,
44-
HttpResponseStatus.UNAUTHORIZED, HttpResponseStatus.EXPECTATION_FAILED,
45-
HttpResponseStatus.METHOD_NOT_ALLOWED, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);
38+
private static final List<HttpResponseStatus> finalErrorStatuses = Arrays.asList(HttpResponseStatus.UNAUTHORIZED,
39+
HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);
40+
private static final List<HttpResponseStatus> reSendErrorStatuses = Arrays.asList(
41+
HttpResponseStatus.METHOD_NOT_ALLOWED,
42+
HttpResponseStatus.EXPECTATION_FAILED);
4643

47-
private CompletableFuture<HttpResponseStatus> expectedFuture = new CompletableFuture<>();
44+
private static final List<HttpResponseStatus> statusesToBeConsidered = new ArrayList<>(reSendErrorStatuses);
45+
46+
static {
47+
statusesToBeConsidered.addAll(finalErrorStatuses);
48+
statusesToBeConsidered.add(HttpResponseStatus.CONTINUE);
49+
}
50+
51+
private HttpResponseStatus status = null;
52+
53+
private CountDownLatch latch = null;
54+
55+
private boolean propagateLastMessage = false;
4856

4957
@Override
5058
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
51-
if (isExpected && msg instanceof HttpResponse) {
52-
final HttpResponse response = (HttpResponse) msg;
53-
if (statusesToBeConsidered.contains(response.status())) {
54-
expectedFuture.complete(response.status());
55-
}
56-
if (!HttpResponseStatus.CONTINUE.equals(response.status())) {
59+
60+
if (checkExpectResponse(msg) || checkInvalidExpect(msg)) {
61+
currentState = ExpectationState.AWAITING;
62+
}
63+
switch (currentState) {
64+
case AWAITING:
65+
final HttpResponse response = (HttpResponse) msg;
66+
status = response.status();
67+
boolean handshakeDone = processErrorStatuses(status) || msg instanceof FullHttpMessage;
68+
currentState = (handshakeDone) ? ExpectationState.IDLE : ExpectationState.FINISHING;
69+
processLatch();
70+
return;
71+
case FINISHING:
72+
if (msg instanceof LastHttpContent) {
73+
currentState = ExpectationState.IDLE;
74+
if (propagateLastMessage) {
75+
propagateLastMessage = false;
76+
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
77+
}
78+
}
79+
return;
80+
default:
5781
ctx.fireChannelRead(msg); //bypass the message to the next handler in line
58-
} else {
59-
ctx.pipeline().remove(JerseyExpectContinueHandler.class);
60-
}
61-
} else {
62-
if (!isExpected) {
63-
ctx.pipeline().remove(JerseyExpectContinueHandler.class);
64-
}
65-
ctx.fireChannelRead(msg); //bypass the message to the next handler in line
6682
}
6783
}
6884

69-
CompletableFuture<HttpResponseStatus> processExpect100ContinueRequest(HttpRequest nettyRequest,
70-
ClientRequest jerseyRequest,
71-
Channel ch,
72-
Integer timeout)
73-
throws InterruptedException, ExecutionException, TimeoutException {
74-
//check for 100-Continue presence/availability
75-
final Expect100ContinueConnectorExtension expect100ContinueExtension
76-
= new Expect100ContinueConnectorExtension();
77-
78-
final DefaultFullHttpRequest nettyRequestHeaders =
79-
new DefaultFullHttpRequest(nettyRequest.protocolVersion(), nettyRequest.method(), nettyRequest.uri());
80-
nettyRequestHeaders.headers().setAll(nettyRequest.headers());
81-
82-
if (!nettyRequestHeaders.headers().contains(HttpHeaderNames.HOST)) {
83-
nettyRequestHeaders.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost());
85+
private boolean checkExpectResponse(Object msg) {
86+
if (currentState == ExpectationState.IDLE && latch != null && msg instanceof HttpResponse) {
87+
return statusesToBeConsidered.contains(((HttpResponse) msg).status());
8488
}
89+
return false;
90+
}
91+
92+
private boolean checkInvalidExpect(Object msg) {
93+
return (ExpectationState.IDLE.equals(currentState)
94+
&& msg instanceof HttpResponse
95+
&& (HttpResponseStatus.CONTINUE.equals(((HttpResponse) msg).status())
96+
|| reSendErrorStatuses.contains(((HttpResponse) msg).status()))
97+
);
98+
}
8599

86-
//If Expect:100-continue feature is enabled and client supports it, the nettyRequestHeaders will be
87-
//enriched with the 'Expect:100-continue' header.
88-
expect100ContinueExtension.invoke(jerseyRequest, nettyRequestHeaders);
89-
90-
final ChannelFuture expect100ContinueFuture = (HttpUtil.is100ContinueExpected(nettyRequestHeaders))
91-
// Send only head of the HTTP request enriched with Expect:100-continue header.
92-
? ch.writeAndFlush(nettyRequestHeaders)
93-
// Expect:100-Continue either is not supported or is turned off
94-
: null;
95-
isExpected = expect100ContinueFuture != null;
96-
if (!isExpected) {
97-
ch.pipeline().remove(JerseyExpectContinueHandler.class);
98-
} else {
99-
final HttpResponseStatus status = expectedFuture
100-
.get(timeout, TimeUnit.MILLISECONDS);
101-
102-
processExpectationStatus(status);
100+
boolean processErrorStatuses(HttpResponseStatus status) {
101+
if (reSendErrorStatuses.contains(status)) {
102+
propagateLastMessage = true;
103103
}
104-
return expectedFuture;
104+
return (finalErrorStatuses.contains(status));
105105
}
106106

107-
private void processExpectationStatus(HttpResponseStatus status)
108-
throws TimeoutException {
107+
void processExpectationStatus()
108+
throws TimeoutException, IOException {
109+
if (status == null) {
110+
throw new TimeoutException(); // continue without expectations
111+
}
109112
if (!statusesToBeConsidered.contains(status)) {
110113
throw new ProcessingException(LocalizationMessages
111114
.UNEXPECTED_VALUE_FOR_EXPECT_100_CONTINUE_STATUSES(status.code()), null);
112115
}
113-
if (!expectedFuture.isDone() || HttpResponseStatus.EXPECTATION_FAILED.equals(status)) {
114-
isExpected = false;
115-
throw new TimeoutException(); // continue without expectations
116+
117+
if (finalErrorStatuses.contains(status)) {
118+
throw new IOException(LocalizationMessages
119+
.EXPECT_100_CONTINUE_FAILED_REQUEST_FAILED(), null);
116120
}
117-
if (!HttpResponseStatus.CONTINUE.equals(status)) {
118-
throw new ProcessingException(LocalizationMessages
119-
.UNEXPECTED_VALUE_FOR_EXPECT_100_CONTINUE_STATUSES(status.code()), null);
121+
122+
if (reSendErrorStatuses.contains(status)) {
123+
throw new TimeoutException(LocalizationMessages
124+
.EXPECT_100_CONTINUE_FAILED_REQUEST_SHOULD_BE_RESENT()); // Re-send request without expectations
125+
}
126+
127+
}
128+
129+
void resetHandler() {
130+
latch = null;
131+
}
132+
133+
void attachCountDownLatch(CountDownLatch latch) {
134+
this.latch = latch;
135+
}
136+
137+
private void processLatch() {
138+
if (latch != null) {
139+
latch.countDown();
120140
}
121141
}
122142

123-
boolean isExpected() {
124-
return isExpected;
143+
private enum ExpectationState {
144+
AWAITING,
145+
FINISHING,
146+
IDLE
125147
}
126-
}
148+
}

connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717
package org.glassfish.jersey.netty.connector;
1818

1919
import java.io.IOException;
20-
import java.io.InterruptedIOException;
2120
import java.io.OutputStream;
22-
import java.io.OutputStreamWriter;
2321
import java.net.InetSocketAddress;
2422
import java.net.SocketAddress;
2523
import java.net.URI;
@@ -34,7 +32,6 @@
3432
import java.util.concurrent.CompletableFuture;
3533
import java.util.concurrent.CompletionException;
3634
import java.util.concurrent.CountDownLatch;
37-
import java.util.concurrent.ExecutionException;
3835
import java.util.concurrent.ExecutorService;
3936
import java.util.concurrent.Future;
4037
import java.util.concurrent.TimeUnit;
@@ -69,6 +66,7 @@
6966
import io.netty.handler.codec.http.HttpRequest;
7067
import io.netty.handler.codec.http.HttpUtil;
7168
import io.netty.handler.codec.http.HttpVersion;
69+
import io.netty.handler.codec.http.LastHttpContent;
7270
import io.netty.handler.proxy.HttpProxyHandler;
7371
import io.netty.handler.proxy.ProxyHandler;
7472
import io.netty.handler.ssl.ApplicationProtocolConfig;
@@ -256,6 +254,8 @@ protected void execute(final ClientRequest jerseyRequest, final Set<URI> redirec
256254
}
257255
}
258256

257+
final JerseyExpectContinueHandler expect100ContinueHandler = new JerseyExpectContinueHandler();
258+
259259
if (chan == null) {
260260
Integer connectTimeout = jerseyRequest.resolveProperty(ClientProperties.CONNECT_TIMEOUT, 0);
261261
Bootstrap b = new Bootstrap();
@@ -328,8 +328,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
328328
final Integer maxInitialLineLength = ClientProperties.getValue(config.getProperties(),
329329
NettyClientProperties.MAX_INITIAL_LINE_LENGTH,
330330
NettyClientProperties.DEFAULT_INITIAL_LINE_LENGTH);
331-
332331
p.addLast(new HttpClientCodec(maxInitialLineLength, maxHeaderSize, maxChunkSize));
332+
p.addLast(EXPECT_100_CONTINUE_HANDLER, expect100ContinueHandler);
333333
p.addLast(new ChunkedWriteHandler());
334334
p.addLast(new HttpContentDecompressor());
335335
}
@@ -358,11 +358,10 @@ protected void initChannel(SocketChannel ch) throws Exception {
358358
final Channel ch = chan;
359359
JerseyClientHandler clientHandler =
360360
new JerseyClientHandler(jerseyRequest, responseAvailable, responseDone, redirectUriHistory, this);
361-
final JerseyExpectContinueHandler expect100ContinueHandler = new JerseyExpectContinueHandler();
361+
362362
// read timeout makes sense really as an inactivity timeout
363363
ch.pipeline().addLast(READ_TIMEOUT_HANDLER,
364364
new IdleStateHandler(0, 0, timeout, TimeUnit.MILLISECONDS));
365-
ch.pipeline().addLast(EXPECT_100_CONTINUE_HANDLER, expect100ContinueHandler);
366365
ch.pipeline().addLast(REQUEST_HANDLER, clientHandler);
367366

368367
responseDone.whenComplete((_r, th) -> {
@@ -445,22 +444,11 @@ public void operationComplete(io.netty.util.concurrent.Future<? super Void> futu
445444
// // Set later after the entity is "written"
446445
// break;
447446
}
448-
try {
449-
expect100ContinueHandler.processExpect100ContinueRequest(nettyRequest, jerseyRequest,
450-
ch, expect100ContinueTimeout);
451-
} catch (ExecutionException e) {
452-
responseDone.completeExceptionally(e);
453-
} catch (TimeoutException e) {
454-
//Expect:100-continue allows timeouts by the spec
455-
//just removing the pipeline from processing
456-
if (ch.pipeline().context(JerseyExpectContinueHandler.class) != null) {
457-
ch.pipeline().remove(EXPECT_100_CONTINUE_HANDLER);
458-
}
459-
}
460447

461448
final CountDownLatch headersSet = new CountDownLatch(1);
462449
final CountDownLatch contentLengthSet = new CountDownLatch(1);
463450

451+
464452
jerseyRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
465453
@Override
466454
public OutputStream getOutputStream(int contentLength) throws IOException {
@@ -485,7 +473,6 @@ public void run() {
485473

486474
try {
487475
jerseyRequest.writeEntity();
488-
489476
if (entityWriter.getType() == NettyEntityWriter.Type.DELAYED) {
490477
nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, entityWriter.getLength());
491478
contentLengthSet.countDown();
@@ -505,12 +492,36 @@ public void run() {
505492
});
506493

507494
headersSet.await();
508-
if (!expect100ContinueHandler.isExpected()) {
509-
// Send the HTTP request. Expect:100-continue processing is not applicable
510-
// in this case.
495+
new Expect100ContinueConnectorExtension().invoke(jerseyRequest, nettyRequest);
496+
497+
boolean continueExpected = HttpUtil.is100ContinueExpected(nettyRequest);
498+
boolean expectationsFailed = false;
499+
500+
if (continueExpected) {
501+
final CountDownLatch expect100ContinueLatch = new CountDownLatch(1);
502+
expect100ContinueHandler.attachCountDownLatch(expect100ContinueLatch);
503+
//send expect request, sync and wait till either response or timeout received
511504
entityWriter.writeAndFlush(nettyRequest);
505+
expect100ContinueLatch.await(expect100ContinueTimeout, TimeUnit.MILLISECONDS);
506+
try {
507+
expect100ContinueHandler.processExpectationStatus();
508+
} catch (TimeoutException e) {
509+
//Expect:100-continue allows timeouts by the spec
510+
//so, send request directly without Expect header.
511+
expectationsFailed = true;
512+
} finally {
513+
//restore request and handler to the original state.
514+
HttpUtil.set100ContinueExpected(nettyRequest, false);
515+
expect100ContinueHandler.resetHandler();
516+
}
512517
}
513518

519+
if (!continueExpected || expectationsFailed) {
520+
if (expectationsFailed) {
521+
ch.pipeline().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).sync();
522+
}
523+
entityWriter.writeAndFlush(nettyRequest);
524+
}
514525
if (HttpUtil.isTransferEncodingChunked(nettyRequest)) {
515526
entityWriter.write(new HttpChunkedInput(entityWriter.getChunkedInput()));
516527
} else {

connectors/netty-connector/src/main/resources/org/glassfish/jersey/netty/connector/localization.properties

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved.
2+
# Copyright (c) 2016, 2025 Oracle and/or its affiliates. All rights reserved.
33
#
44
# This program and the accompanying materials are made available under the
55
# terms of the Eclipse Public License v. 2.0, which is available at
@@ -23,3 +23,5 @@ redirect.error.determining.location="Error determining redirect location: ({0}).
2323
redirect.infinite.loop="Infinite loop in chained redirects detected."
2424
redirect.limit.reached="Max chained redirect limit ({0}) exceeded."
2525
unexpected.value.for.expect.100.continue.statuses=Unexpected value: ("{0}").
26+
expect.100.continue.failed.request.should.be.resent=Expect 100-continue failed. Request should be resent.
27+
expect.100.continue.failed.request.failed=Expect 100-continue failed. Request failed.

0 commit comments

Comments
 (0)