Skip to content

Commit 6d3914c

Browse files
committed
test failures
1 parent 9ced839 commit 6d3914c

File tree

10 files changed

+91
-57
lines changed

10 files changed

+91
-57
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/MissingReadDetector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
4747
if (pendingRead == false) {
4848
long now = timer.absoluteTimeInMillis();
4949
if (now >= lastRead + interval) {
50+
// if you encounter this warning during test make sure you consume content of RestRequest if it's a stream
51+
// or use AggregatingDispatcher that will consume stream fully and produce RestRequest with full content.
5052
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
5153
}
5254
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,14 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
144144
assert currentRequestStream != null : "current stream must exists before handling http content";
145145
shouldRead = false;
146146
currentRequestStream.handleNettyContent((HttpContent) msg);
147-
if (msg instanceof LastHttpContent) {
148-
currentRequestStream = null;
149-
}
150147
}
151148
} finally {
152149
if (shouldRead) {
153150
ctx.channel().eventLoop().execute(ctx::read);
154151
}
152+
if (msg instanceof LastHttpContent) {
153+
currentRequestStream = null;
154+
}
155155
activityTracker.stopActivity();
156156
}
157157
}
@@ -161,7 +161,7 @@ protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpReque
161161
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
162162
boolean success = false;
163163
assert Transports.assertDefaultThreadContext(serverTransport.getThreadPool().getThreadContext());
164-
assert Transports.assertTransportThread();
164+
assert ctx.channel().eventLoop().inEventLoop();
165165
try {
166166
serverTransport.incomingRequest(pipelinedRequest, channel);
167167
success = true;

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,17 +105,19 @@ public void close() {
105105

106106
private void doClose() {
107107
assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName();
108-
closing = true;
109-
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
110-
for (var tracer : tracingHandlers) {
111-
Releasables.closeExpectNoException(tracer);
108+
if (closing == false) {
109+
closing = true;
110+
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
111+
for (var tracer : tracingHandlers) {
112+
Releasables.closeExpectNoException(tracer);
113+
}
114+
if (handler != null) {
115+
handler.close();
116+
}
112117
}
113-
if (handler != null) {
114-
handler.close();
118+
if (readLastChunk == false) {
119+
read();
115120
}
116121
}
117-
if (readLastChunk == false) {
118-
read();
119-
}
120122
}
121123
}

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.netty.handler.codec.DecoderResult;
2020
import io.netty.handler.codec.http.DefaultFullHttpRequest;
2121
import io.netty.handler.codec.http.DefaultHttpContent;
22+
import io.netty.handler.codec.http.DefaultHttpRequest;
2223
import io.netty.handler.codec.http.DefaultHttpResponse;
2324
import io.netty.handler.codec.http.DefaultLastHttpContent;
2425
import io.netty.handler.codec.http.FullHttpResponse;
@@ -35,15 +36,24 @@
3536
import org.elasticsearch.common.bytes.BytesReference;
3637
import org.elasticsearch.common.bytes.ReleasableBytesReference;
3738
import org.elasticsearch.common.bytes.ZeroBytesReference;
39+
import org.elasticsearch.common.network.NetworkService;
3840
import org.elasticsearch.common.network.ThreadWatchdog;
3941
import org.elasticsearch.common.network.ThreadWatchdogHelper;
4042
import org.elasticsearch.common.recycler.Recycler;
43+
import org.elasticsearch.common.settings.ClusterSettings;
44+
import org.elasticsearch.common.settings.Settings;
45+
import org.elasticsearch.http.AggregatingDispatcher;
4146
import org.elasticsearch.http.HttpResponse;
4247
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
4348
import org.elasticsearch.rest.RestStatus;
49+
import org.elasticsearch.telemetry.tracing.Tracer;
4450
import org.elasticsearch.test.ESTestCase;
51+
import org.elasticsearch.threadpool.TestThreadPool;
52+
import org.elasticsearch.threadpool.ThreadPool;
4553
import org.elasticsearch.transport.netty4.Netty4Utils;
4654
import org.elasticsearch.transport.netty4.NettyAllocator;
55+
import org.elasticsearch.transport.netty4.SharedGroupFactory;
56+
import org.elasticsearch.transport.netty4.TLSConfig;
4757
import org.junit.After;
4858

4959
import java.nio.channels.ClosedChannelException;
@@ -70,7 +80,6 @@
7080
import static org.hamcrest.Matchers.not;
7181
import static org.hamcrest.Matchers.sameInstance;
7282
import static org.hamcrest.core.Is.is;
73-
import static org.mockito.Mockito.mock;
7483

7584
public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
7685

@@ -79,11 +88,14 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
7988
private final Map<String, CountDownLatch> waitingRequests = new ConcurrentHashMap<>();
8089
private final Map<String, CountDownLatch> finishingRequests = new ConcurrentHashMap<>();
8190

91+
private final ThreadPool threadPool = new TestThreadPool("pipelining test");
92+
8293
@After
8394
public void tearDown() throws Exception {
8495
waitingRequests.keySet().forEach(this::finishRequest);
8596
terminateExecutorService(handlerService);
8697
terminateExecutorService(eventLoopService);
98+
threadPool.shutdownNow();
8799
super.tearDown();
88100
}
89101

@@ -126,12 +138,31 @@ public void testThatPipeliningWorksWithFastSerializedRequests() throws Interrupt
126138
}
127139

128140
private EmbeddedChannel makeEmbeddedChannelWithSimulatedWork(int numberOfRequests) {
129-
return new EmbeddedChannel(new Netty4HttpPipeliningHandler(numberOfRequests, null, new ThreadWatchdog.ActivityTracker()) {
130-
@Override
131-
protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) {
132-
ctx.fireChannelRead(pipelinedRequest);
133-
}
134-
}, new WorkEmulatorHandler());
141+
return new EmbeddedChannel(
142+
new Netty4HttpPipeliningHandler(numberOfRequests, httpServerTransport(), new ThreadWatchdog.ActivityTracker()) {
143+
@Override
144+
protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) {
145+
ctx.fireChannelRead(pipelinedRequest);
146+
}
147+
},
148+
new WorkEmulatorHandler()
149+
);
150+
}
151+
152+
private Netty4HttpServerTransport httpServerTransport() {
153+
return new Netty4HttpServerTransport(
154+
Settings.EMPTY,
155+
new NetworkService(List.of()),
156+
threadPool,
157+
xContentRegistry(),
158+
new AggregatingDispatcher(),
159+
ClusterSettings.createBuiltInClusterSettings(),
160+
new SharedGroupFactory(Settings.EMPTY),
161+
Tracer.NOOP,
162+
TLSConfig.noTLS(),
163+
null,
164+
null
165+
);
135166
}
136167

137168
public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws InterruptedException {
@@ -193,7 +224,7 @@ public void testThatPipeliningClosesConnectionWithTooManyEvents() throws Interru
193224
public void testPipeliningRequestsAreReleased() {
194225
final int numberOfRequests = 10;
195226
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(
196-
new Netty4HttpPipeliningHandler(numberOfRequests + 1, null, new ThreadWatchdog.ActivityTracker())
227+
new Netty4HttpPipeliningHandler(numberOfRequests + 1, httpServerTransport(), new ThreadWatchdog.ActivityTracker())
197228
);
198229

199230
for (int i = 0; i < numberOfRequests; i++) {
@@ -485,7 +516,7 @@ public void testActivityTracking() {
485516
final var watchdog = new ThreadWatchdog();
486517
final var activityTracker = watchdog.getActivityTrackerForCurrentThread();
487518
final var requestHandled = new AtomicBoolean();
488-
final var handler = new Netty4HttpPipeliningHandler(Integer.MAX_VALUE, mock(Netty4HttpServerTransport.class), activityTracker) {
519+
final var handler = new Netty4HttpPipeliningHandler(Integer.MAX_VALUE, httpServerTransport(), activityTracker) {
489520
@Override
490521
protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) {
491522
// thread is not idle while handling the request
@@ -526,11 +557,7 @@ private static void assertDoneWithClosedChannel(ChannelPromise chunkedWritePromi
526557
}
527558

528559
private Netty4HttpPipeliningHandler getTestHttpHandler() {
529-
return new Netty4HttpPipeliningHandler(
530-
Integer.MAX_VALUE,
531-
mock(Netty4HttpServerTransport.class),
532-
new ThreadWatchdog.ActivityTracker()
533-
) {
560+
return new Netty4HttpPipeliningHandler(Integer.MAX_VALUE, httpServerTransport(), new ThreadWatchdog.ActivityTracker()) {
534561
@Override
535562
protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) {
536563
ctx.fireChannelRead(pipelinedRequest);
@@ -591,8 +618,8 @@ private void assertReadHttpMessageHasContent(EmbeddedChannel embeddedChannel, St
591618
assertThat(data, is(expectedContent));
592619
}
593620

594-
private DefaultFullHttpRequest createHttpRequest(String uri) {
595-
return new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uri);
621+
private Object[] createHttpRequest(String uri) {
622+
return new Object[] { new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, uri), LastHttpContent.EMPTY_LAST_CONTENT };
596623
}
597624

598625
private class WorkEmulatorHandler extends SimpleChannelInboundHandler<Netty4HttpRequest> {

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import org.elasticsearch.common.settings.ClusterSettings;
2121
import org.elasticsearch.common.settings.Settings;
2222
import org.elasticsearch.common.transport.TransportAddress;
23+
import org.elasticsearch.http.AggregatingDispatcher;
2324
import org.elasticsearch.http.HttpChannel;
2425
import org.elasticsearch.http.HttpRequest;
2526
import org.elasticsearch.http.HttpResponse;
2627
import org.elasticsearch.http.HttpServerTransport;
27-
import org.elasticsearch.http.NullDispatcher;
2828
import org.elasticsearch.rest.RestStatus;
2929
import org.elasticsearch.telemetry.tracing.Tracer;
3030
import org.elasticsearch.test.ESTestCase;
@@ -103,7 +103,7 @@ class CustomNettyHttpServerTransport extends Netty4HttpServerTransport {
103103
Netty4HttpServerPipeliningTests.this.networkService,
104104
Netty4HttpServerPipeliningTests.this.threadPool,
105105
xContentRegistry(),
106-
new NullDispatcher(),
106+
new AggregatingDispatcher(),
107107
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
108108
new SharedGroupFactory(settings),
109109
Tracer.NOOP,

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,12 @@
6767
import org.elasticsearch.core.TimeValue;
6868
import org.elasticsearch.core.Tuple;
6969
import org.elasticsearch.http.AbstractHttpServerTransportTestCase;
70+
import org.elasticsearch.http.AggregatingDispatcher;
7071
import org.elasticsearch.http.BindHttpException;
7172
import org.elasticsearch.http.CorsHandler;
7273
import org.elasticsearch.http.HttpHeadersValidationException;
7374
import org.elasticsearch.http.HttpServerTransport;
7475
import org.elasticsearch.http.HttpTransportSettings;
75-
import org.elasticsearch.http.NullDispatcher;
7676
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
7777
import org.elasticsearch.http.netty4.internal.HttpValidator;
7878
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
@@ -192,9 +192,9 @@ private void runExpectHeaderTest(
192192
final int contentLength,
193193
final HttpResponseStatus expectedStatus
194194
) throws InterruptedException {
195-
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
195+
final HttpServerTransport.Dispatcher dispatcher = new AggregatingDispatcher() {
196196
@Override
197-
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
197+
public void dispatchAggregatedRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
198198
channel.sendResponse(new RestResponse(OK, RestResponse.TEXT_CONTENT_TYPE, new BytesArray("done")));
199199
}
200200

@@ -262,7 +262,7 @@ public void testBindUnavailableAddress() {
262262
networkService,
263263
threadPool,
264264
xContentRegistry(),
265-
new NullDispatcher(),
265+
new AggregatingDispatcher(),
266266
clusterSettings,
267267
new SharedGroupFactory(Settings.EMPTY),
268268
Tracer.NOOP,
@@ -283,7 +283,7 @@ public void testBindUnavailableAddress() {
283283
networkService,
284284
threadPool,
285285
xContentRegistry(),
286-
new NullDispatcher(),
286+
new AggregatingDispatcher(),
287287
clusterSettings,
288288
new SharedGroupFactory(settings),
289289
Tracer.NOOP,
@@ -850,9 +850,9 @@ public void testLargeRequestIsNeverDispatched() throws Exception {
850850
final Settings settings = createBuilderWithPort().put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), "1mb")
851851
.build();
852852
final String requestString = randomAlphaOfLength(2 * 1024 * 1024); // request size is twice the limit
853-
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
853+
final HttpServerTransport.Dispatcher dispatcher = new AggregatingDispatcher() {
854854
@Override
855-
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
855+
public void dispatchAggregatedRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
856856
throw new AssertionError("Request dispatched but shouldn't");
857857
}
858858

@@ -1058,9 +1058,9 @@ private void runRespondAfterServiceCloseTest(boolean clientCancel) throws Except
10581058
final SubscribableListener<Void> transportClosedFuture = new SubscribableListener<>();
10591059
final CountDownLatch handlingRequestLatch = new CountDownLatch(1);
10601060

1061-
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
1061+
final HttpServerTransport.Dispatcher dispatcher = new AggregatingDispatcher() {
10621062
@Override
1063-
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
1063+
public void dispatchAggregatedRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
10641064
assertEquals(request.uri(), url);
10651065
final var response = RestResponse.chunked(
10661066
OK,

server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
import org.elasticsearch.common.util.BigArrays;
1818
import org.elasticsearch.common.util.PageCacheRecycler;
1919
import org.elasticsearch.common.util.concurrent.ThreadContext;
20+
import org.elasticsearch.http.AggregatingDispatcher;
2021
import org.elasticsearch.http.HttpInfo;
2122
import org.elasticsearch.http.HttpPreRequest;
2223
import org.elasticsearch.http.HttpServerTransport;
2324
import org.elasticsearch.http.HttpStats;
24-
import org.elasticsearch.http.NullDispatcher;
2525
import org.elasticsearch.indices.breaker.CircuitBreakerService;
2626
import org.elasticsearch.plugins.NetworkPlugin;
2727
import org.elasticsearch.telemetry.tracing.Tracer;
@@ -299,7 +299,7 @@ private NetworkModule newNetworkModule(Settings settings, NetworkPlugin... plugi
299299
null,
300300
xContentRegistry(),
301301
null,
302-
new NullDispatcher(),
302+
new AggregatingDispatcher(),
303303
(preRequest, threadContext) -> {},
304304
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
305305
Tracer.NOOP

test/framework/src/main/java/org/elasticsearch/http/NullDispatcher.java renamed to test/framework/src/main/java/org/elasticsearch/http/AggregatingDispatcher.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@
1111

1212
import org.elasticsearch.common.util.concurrent.ThreadContext;
1313
import org.elasticsearch.rest.RestChannel;
14+
import org.elasticsearch.rest.RestContentAggregator;
1415
import org.elasticsearch.rest.RestRequest;
1516

16-
public class NullDispatcher implements HttpServerTransport.Dispatcher {
17+
public class AggregatingDispatcher implements HttpServerTransport.Dispatcher {
1718

18-
@Override
19-
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
19+
public void dispatchAggregatedRequest(RestRequest restRequest, RestChannel restChannel, ThreadContext threadContext) {}
2020

21+
@Override
22+
public final void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
23+
RestContentAggregator.aggregate(request, (r) -> dispatchAggregatedRequest(r, channel, threadContext));
2124
}
2225

2326
@Override

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportCloseNotifyTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.elasticsearch.common.util.concurrent.ThreadContext;
3333
import org.elasticsearch.env.TestEnvironment;
3434
import org.elasticsearch.http.AbstractHttpServerTransportTestCase;
35-
import org.elasticsearch.http.HttpServerTransport;
35+
import org.elasticsearch.http.AggregatingDispatcher;
3636
import org.elasticsearch.http.netty4.Netty4HttpServerTransport;
3737
import org.elasticsearch.rest.RestChannel;
3838
import org.elasticsearch.rest.RestRequest;
@@ -255,13 +255,13 @@ public void close() {
255255
}
256256
}
257257

258-
private static class QueuedDispatcher implements HttpServerTransport.Dispatcher {
258+
private static class QueuedDispatcher extends AggregatingDispatcher {
259259
BlockingQueue<ReqCtx> reqQueue = new LinkedBlockingDeque<>();
260260
BlockingDeque<ErrCtx> errQueue = new LinkedBlockingDeque<>();
261261

262262
@Override
263-
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
264-
reqQueue.add(new ReqCtx(request, channel, threadContext));
263+
public void dispatchAggregatedRequest(RestRequest restRequest, RestChannel restChannel, ThreadContext threadContext) {
264+
reqQueue.add(new ReqCtx(restRequest, restChannel, threadContext));
265265
}
266266

267267
@Override

0 commit comments

Comments
 (0)