Skip to content

Commit 9ced839

Browse files
committed
wip
1 parent f1e3058 commit 9ced839

File tree

18 files changed

+240
-187
lines changed

18 files changed

+240
-187
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,11 @@ public String getName() {
920920
return ROUTE;
921921
}
922922

923+
@Override
924+
public boolean supportsContentStream() {
925+
return true;
926+
}
927+
923928
@Override
924929
public List<Route> routes() {
925930
return List.of(new Route(RestRequest.Method.POST, ROUTE));

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.netty.handler.codec.http.DefaultHttpContent;
2020
import io.netty.handler.codec.http.DefaultHttpResponse;
2121
import io.netty.handler.codec.http.DefaultLastHttpContent;
22-
import io.netty.handler.codec.http.FullHttpRequest;
2322
import io.netty.handler.codec.http.HttpContent;
2423
import io.netty.handler.codec.http.HttpObject;
2524
import io.netty.handler.codec.http.HttpRequest;
@@ -131,18 +130,13 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
131130
} else {
132131
nonError = (Exception) cause;
133132
}
134-
netty4HttpRequest = new Netty4HttpRequest(readSequence++, (FullHttpRequest) request, nonError);
133+
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, nonError);
135134
} else {
136135
assert currentRequestStream == null : "current stream must be null for new request";
137-
if (request instanceof FullHttpRequest fullHttpRequest) {
138-
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
139-
currentRequestStream = null;
140-
} else {
141-
var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
142-
currentRequestStream = contentStream;
143-
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
144-
shouldRead = false;
145-
}
136+
var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
137+
currentRequestStream = contentStream;
138+
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
139+
shouldRead = false;
146140
}
147141
handlePipelinedRequest(ctx, netty4HttpRequest);
148142
} else {

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java

Lines changed: 43 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99

1010
package org.elasticsearch.http.netty4;
1111

12-
import io.netty.buffer.Unpooled;
13-
import io.netty.handler.codec.http.DefaultFullHttpRequest;
14-
import io.netty.handler.codec.http.EmptyHttpHeaders;
15-
import io.netty.handler.codec.http.FullHttpRequest;
12+
import io.netty.handler.codec.http.DefaultHttpRequest;
1613
import io.netty.handler.codec.http.HttpHeaderNames;
1714
import io.netty.handler.codec.http.HttpHeaders;
1815
import io.netty.handler.codec.http.HttpMethod;
16+
import io.netty.handler.codec.http.HttpUtil;
1917
import io.netty.handler.codec.http.QueryStringDecoder;
2018
import io.netty.handler.codec.http.cookie.Cookie;
2119
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
@@ -28,7 +26,6 @@
2826
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
2927
import org.elasticsearch.rest.RestRequest;
3028
import org.elasticsearch.rest.RestStatus;
31-
import org.elasticsearch.transport.netty4.Netty4Utils;
3229

3330
import java.util.AbstractMap;
3431
import java.util.Collection;
@@ -41,71 +38,46 @@
4138

4239
public class Netty4HttpRequest implements HttpRequest {
4340

44-
private final FullHttpRequest request;
45-
private final HttpBody content;
41+
private final int sequence;
42+
private final io.netty.handler.codec.http.HttpRequest nettyRequest;
43+
private HttpBody content;
4644
private final Map<String, List<String>> headers;
4745
private final AtomicBoolean released;
4846
private final Exception inboundException;
49-
private final boolean pooled;
50-
private final int sequence;
5147
private final QueryStringDecoder queryStringDecoder;
5248

53-
Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest request, Netty4HttpRequestBodyStream contentStream) {
54-
this(
55-
sequence,
56-
new DefaultFullHttpRequest(
57-
request.protocolVersion(),
58-
request.method(),
59-
request.uri(),
60-
Unpooled.EMPTY_BUFFER,
61-
request.headers(),
62-
EmptyHttpHeaders.INSTANCE
63-
),
64-
new AtomicBoolean(false),
65-
true,
66-
contentStream,
67-
null
68-
);
69-
}
70-
71-
Netty4HttpRequest(int sequence, FullHttpRequest request) {
72-
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()));
73-
}
74-
75-
Netty4HttpRequest(int sequence, FullHttpRequest request, Exception inboundException) {
76-
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()), inboundException);
49+
public Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest nettyRequest, Exception exception) {
50+
this(sequence, nettyRequest, HttpBody.empty(), new AtomicBoolean(false), exception);
7751
}
7852

79-
private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, HttpBody content) {
80-
this(sequence, request, released, pooled, content, null);
53+
public Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest nettyRequest, HttpBody content) {
54+
this(sequence, nettyRequest, content, new AtomicBoolean(false), null);
8155
}
8256

83-
private Netty4HttpRequest(
57+
public Netty4HttpRequest(
8458
int sequence,
85-
FullHttpRequest request,
86-
AtomicBoolean released,
87-
boolean pooled,
59+
io.netty.handler.codec.http.HttpRequest nettyRequest,
8860
HttpBody content,
61+
AtomicBoolean released,
8962
Exception inboundException
9063
) {
9164
this.sequence = sequence;
92-
this.request = request;
93-
this.headers = getHttpHeadersAsMap(request.headers());
65+
this.nettyRequest = nettyRequest;
9466
this.content = content;
95-
this.pooled = pooled;
67+
this.headers = getHttpHeadersAsMap(nettyRequest.headers());
9668
this.released = released;
9769
this.inboundException = inboundException;
98-
this.queryStringDecoder = new QueryStringDecoder(request.uri());
70+
this.queryStringDecoder = new QueryStringDecoder(nettyRequest.uri());
9971
}
10072

10173
@Override
10274
public RestRequest.Method method() {
103-
return translateRequestMethod(request.method());
75+
return translateRequestMethod(nettyRequest.method());
10476
}
10577

10678
@Override
10779
public String uri() {
108-
return request.uri();
80+
return nettyRequest.uri();
10981
}
11082

11183
@Override
@@ -119,10 +91,18 @@ public HttpBody body() {
11991
return content;
12092
}
12193

94+
@Override
95+
public void setBody(HttpBody body) {
96+
if (body.isFull()) {
97+
var contentLength = body.asFull().bytes().length();
98+
HttpUtil.setContentLength(nettyRequest, contentLength);
99+
}
100+
this.content = body;
101+
}
102+
122103
@Override
123104
public void release() {
124-
if (pooled && released.compareAndSet(false, true)) {
125-
request.release();
105+
if (released.compareAndSet(false, true)) {
126106
content.close();
127107
}
128108
}
@@ -134,7 +114,7 @@ public final Map<String, List<String>> getHeaders() {
134114

135115
@Override
136116
public List<String> strictCookies() {
137-
String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
117+
String cookieString = nettyRequest.headers().get(HttpHeaderNames.COOKIE);
138118
if (cookieString != null) {
139119
Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
140120
if (cookies.isEmpty() == false) {
@@ -146,40 +126,36 @@ public List<String> strictCookies() {
146126

147127
@Override
148128
public HttpVersion protocolVersion() {
149-
if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) {
150-
return HttpRequest.HttpVersion.HTTP_1_0;
151-
} else if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) {
152-
return HttpRequest.HttpVersion.HTTP_1_1;
129+
if (nettyRequest.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) {
130+
return HttpVersion.HTTP_1_0;
131+
} else if (nettyRequest.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) {
132+
return HttpVersion.HTTP_1_1;
153133
} else {
154-
throw new IllegalArgumentException("Unexpected http protocol version: " + request.protocolVersion());
134+
throw new IllegalArgumentException("Unexpected http protocol version: " + nettyRequest.protocolVersion());
155135
}
156136
}
157137

158138
@Override
159139
public HttpRequest removeHeader(String header) {
160-
HttpHeaders copiedHeadersWithout = request.headers().copy();
140+
HttpHeaders copiedHeadersWithout = nettyRequest.headers().copy();
161141
copiedHeadersWithout.remove(header);
162-
HttpHeaders copiedTrailingHeadersWithout = request.trailingHeaders().copy();
163-
copiedTrailingHeadersWithout.remove(header);
164-
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(
165-
request.protocolVersion(),
166-
request.method(),
167-
request.uri(),
168-
request.content(),
169-
copiedHeadersWithout,
170-
copiedTrailingHeadersWithout
142+
var requestWithoutHeader = new DefaultHttpRequest(
143+
nettyRequest.protocolVersion(),
144+
nettyRequest.method(),
145+
nettyRequest.uri(),
146+
copiedHeadersWithout
171147
);
172-
return new Netty4HttpRequest(sequence, requestWithoutHeader, released, pooled, content);
148+
return new Netty4HttpRequest(sequence, requestWithoutHeader, content, released, null);
173149
}
174150

175151
@Override
176152
public Netty4FullHttpResponse createResponse(RestStatus status, BytesReference contentRef) {
177-
return new Netty4FullHttpResponse(sequence, request.protocolVersion(), status, contentRef);
153+
return new Netty4FullHttpResponse(sequence, nettyRequest.protocolVersion(), status, contentRef);
178154
}
179155

180156
@Override
181157
public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) {
182-
return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, firstBodyPart);
158+
return new Netty4ChunkedHttpResponse(sequence, nettyRequest.protocolVersion(), status, firstBodyPart);
183159
}
184160

185161
@Override
@@ -188,7 +164,7 @@ public Exception getInboundException() {
188164
}
189165

190166
public io.netty.handler.codec.http.HttpRequest getNettyRequest() {
191-
return request;
167+
return nettyRequest;
192168
}
193169

194170
public static RestRequest.Method translateRequestMethod(HttpMethod httpMethod) {

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,11 @@ public void next() {
7373

7474
public void handleNettyContent(HttpContent httpContent) {
7575
assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName();
76+
assert readLastChunk == false;
7677
if (closing) {
7778
httpContent.release();
7879
read();
7980
} else {
80-
assert readLastChunk == false;
8181
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
8282
var isLast = httpContent instanceof LastHttpContent;
8383
var buf = Netty4Utils.toReleasableBytesReference(httpContent.content());

0 commit comments

Comments
 (0)