diff --git a/.idea/runConfigurations/Debug_OpenSearch.xml b/.idea/runConfigurations/Debug_OpenSearch.xml index fddcf47728460..65412c988926b 100644 --- a/.idea/runConfigurations/Debug_OpenSearch.xml +++ b/.idea/runConfigurations/Debug_OpenSearch.xml @@ -1,11 +1,15 @@ - - + + diff --git a/CHANGELOG.md b/CHANGELOG.md index c69c928a7ab56..95f21e7666f6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,6 +73,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump opensearch-protobufs dependency to 0.24.0 and update transport-grpc module compatibility ([#20059](https://github.com/opensearch-project/OpenSearch/pull/20059)) - Refactor the ShardStats, WarmerStats and IndexingPressureStats class to use the Builder pattern instead of constructors ([#19966](https://github.com/opensearch-project/OpenSearch/pull/19966)) +- Cleanup HttpServerTransport.Dispatcher in Netty tests ([#20160](https://github.com/opensearch-project/OpenSearch/pull/20160)) - Throw exceptions for currently unsupported GRPC request-side fields ([#20162](https://github.com/opensearch-project/OpenSearch/pull/20162)) ### Fixed diff --git a/distribution/src/config/opensearch.yml b/distribution/src/config/opensearch.yml index 29070a59cb5df..e022c5d76084e 100644 --- a/distribution/src/config/opensearch.yml +++ b/distribution/src/config/opensearch.yml @@ -121,3 +121,5 @@ ${path.logs} # Once there is no observed impact on performance, this feature flag can be removed. # #opensearch.experimental.optimization.datetime_formatter_caching.enabled: false +aux.transport.types: [transport-grpc] +aux.transport.transport-grpc.port: 9400 diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4BadRequestTests.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4BadRequestTests.java index 03990c173d547..e07cdc74b41e6 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4BadRequestTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4BadRequestTests.java @@ -38,15 +38,12 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; -import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import org.opensearch.core.rest.RestStatus; import org.opensearch.http.HttpServerTransport; import org.opensearch.http.HttpTransportSettings; import org.opensearch.rest.BytesRestResponse; -import org.opensearch.rest.RestChannel; -import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -74,35 +71,29 @@ public class Netty4BadRequestTests extends OpenSearchTestCase { private ThreadPool threadPool; @Before - public void setup() throws Exception { + public void setup() { networkService = new NetworkService(Collections.emptyList()); bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); threadPool = new TestThreadPool("test"); } @After - public void shutdown() throws Exception { + public void shutdown() { terminate(threadPool); } public void testBadParameterEncoding() throws Exception { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - @Override - public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { - fail(); - } - - @Override - public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { + HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest((request, channel, threadContext) -> fail()) + .withDispatchBadRequest((channel, threadContext, cause) -> { try { final Exception e = cause instanceof Exception ? (Exception) cause : new OpenSearchException(cause); channel.sendResponse(new BytesRestResponse(channel, RestStatus.BAD_REQUEST, e)); } catch (final IOException e) { throw new UncheckedIOException(e); } - } - }; - + }) + .build(); Settings settings = Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()).build(); try ( HttpServerTransport httpServerTransport = new Netty4HttpServerTransport( diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpServerTransportTests.java index 05cd7c9fd90d3..483713a42aa9b 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpServerTransportTests.java @@ -32,7 +32,6 @@ package org.opensearch.http.netty4; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.common.network.NetworkAddress; import org.opensearch.common.network.NetworkService; @@ -42,7 +41,6 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; -import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.common.unit.ByteSizeValue; @@ -53,11 +51,8 @@ import org.opensearch.http.HttpTransportSettings; import org.opensearch.http.NullDispatcher; import org.opensearch.rest.BytesRestResponse; -import org.opensearch.rest.RestChannel; -import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.test.rest.FakeRestRequest; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.NettyAllocator; @@ -118,7 +113,7 @@ public class Netty4HttpServerTransportTests extends OpenSearchTestCase { private ClusterSettings clusterSettings; @Before - public void setup() throws Exception { + public void setup() { networkService = new NetworkService(Collections.emptyList()); threadPool = new TestThreadPool("test"); bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); @@ -126,7 +121,7 @@ public void setup() throws Exception { } @After - public void shutdown() throws Exception { + public void shutdown() { if (threadPool != null) { threadPool.shutdownNow(); } @@ -175,21 +170,13 @@ private void runExpectHeaderTest( final int contentLength, final HttpResponseStatus expectedStatus ) throws InterruptedException { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - @Override - public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { - channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))); - } - - @Override - public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(); - } - }; + HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest( + (request, channel, threadContext) -> channel.sendResponse( + new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done")) + ) + ) + .build(); try ( Netty4HttpServerTransport transport = new Netty4HttpServerTransport( settings, @@ -280,16 +267,8 @@ public void testBindUnavailableAddress() { public void testBadRequest() throws InterruptedException { final AtomicReference causeReference = new AtomicReference<>(); - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); - throw new AssertionError(); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchBadRequest((channel, threadContext, cause) -> { causeReference.set(cause); try { final OpenSearchException e = new OpenSearchException("you sent a bad request and you should feel bad"); @@ -297,10 +276,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } catch (final IOException e) { throw new AssertionError(e); } - } - - }; - + }) + .build(); final Settings settings; final int maxInitialLineLength; final Setting httpMaxInitialLineLengthSetting = HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; @@ -352,29 +329,16 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th public void testLargeCompressedResponse() throws InterruptedException { final String responseString = randomAlphaOfLength(4 * 1024 * 1024); final String url = "/thing"; - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest((request, channel, threadContext) -> { if (url.equals(request.uri())) { channel.sendResponse(new BytesRestResponse(OK, responseString)); } else { logger.error("--> Unexpected successful uri [{}]", request.uri()); throw new AssertionError(); } - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(); - } - - }; - + }) + .build(); try ( Netty4HttpServerTransport transport = new Netty4HttpServerTransport( Settings.EMPTY, @@ -425,25 +389,7 @@ private long getHugeAllocationCount() { } public void testCorsRequest() throws InterruptedException { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); - throw new AssertionError(); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(); - } - - }; - + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build(); final Settings settings = createBuilderWithPort().put(SETTING_CORS_ENABLED.getKey(), true) .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "test-cors.org") .build(); @@ -497,25 +443,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } public void testReadTimeout() throws Exception { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); - throw new AssertionError("Should not have received a dispatched request"); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError("Should not have received a dispatched request"); - } - - }; - + HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build(); Settings settings = createBuilderWithPort().put( HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300)) diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/TestDispatcherBuilder.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/TestDispatcherBuilder.java new file mode 100644 index 0000000000000..d5362bfbc958d --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/TestDispatcherBuilder.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.netty4; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; + +import java.util.Map; +import java.util.Optional; + +/** + * A builder for creating instances of {@link HttpServerTransport.Dispatcher} + * with sensible defaults for testing purposes. + */ +public class TestDispatcherBuilder { + + private static final Logger logger = LogManager.getLogger(TestDispatcherBuilder.class); + + private DispatchRequest dispatchRequest = (request, channel, threadContext) -> { + logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); + throw new AssertionError(); + }; + private DispatchBadRequest dispatchBadRequest = (channel, threadContext, cause) -> { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(cause); + }; + private DispatchHandler dispatchHandler = (uri, rawPath, method, params) -> Optional.empty(); + + public static TestDispatcherBuilder withDefaults() { + return new TestDispatcherBuilder(); + } + + public TestDispatcherBuilder withDispatchRequest(DispatchRequest dispatchRequest) { + this.dispatchRequest = dispatchRequest; + return this; + } + + public TestDispatcherBuilder withDispatchBadRequest(DispatchBadRequest dispatchBadRequest) { + this.dispatchBadRequest = dispatchBadRequest; + return this; + } + + public TestDispatcherBuilder withDispatchHandler(DispatchHandler dispatchHandler) { + this.dispatchHandler = dispatchHandler; + return this; + } + + public HttpServerTransport.Dispatcher build() { + return new HttpServerTransport.Dispatcher() { + @Override + public Optional dispatchHandler( + String uri, + String rawPath, + RestRequest.Method method, + Map params + ) { + return dispatchHandler.dispatchHandler(uri, rawPath, method, params); + } + + @Override + public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { + dispatchRequest.dispatchRequest(request, channel, threadContext); + } + + @Override + public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { + dispatchBadRequest.dispatchBadRequest(channel, threadContext, cause); + } + }; + } + + @FunctionalInterface + public interface DispatchRequest { + void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext); + } + + @FunctionalInterface + public interface DispatchBadRequest { + void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause); + } + + @FunctionalInterface + public interface DispatchHandler { + Optional dispatchHandler(String uri, String rawPath, RestRequest.Method method, Map params); + } + +} diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java index 8ce4e0767ab55..02cd9209dd61d 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java @@ -8,7 +8,6 @@ package org.opensearch.http.netty4.ssl; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.common.network.NetworkAddress; import org.opensearch.common.network.NetworkService; @@ -18,7 +17,6 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; -import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.common.unit.ByteSizeValue; @@ -30,14 +28,12 @@ import org.opensearch.http.HttpTransportSettings; import org.opensearch.http.NullDispatcher; import org.opensearch.http.netty4.Netty4HttpClient; +import org.opensearch.http.netty4.TestDispatcherBuilder; import org.opensearch.plugins.SecureHttpTransportSettingsProvider; import org.opensearch.plugins.TransportExceptionHandler; import org.opensearch.rest.BytesRestResponse; -import org.opensearch.rest.RestChannel; -import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.test.rest.FakeRestRequest; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.NettyAllocator; @@ -147,7 +143,7 @@ public class SecureNetty4HttpServerTransportTests extends OpenSearchTestCase { private SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider; @Before - public void setup() throws Exception { + public void setup() { networkService = new NetworkService(Collections.emptyList()); threadPool = new TestThreadPool("test"); bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); @@ -167,7 +163,7 @@ public Optional buildSecureHttpServerEngine(Settings settings, HttpSe } @After - public void shutdown() throws Exception { + public void shutdown() { if (threadPool != null) { threadPool.shutdownNow(); } @@ -179,6 +175,7 @@ public void shutdown() throws Exception { /** * Test that {@link SecureNetty4HttpServerTransport} supports the "Expect: 100-continue" HTTP header + * * @throws InterruptedException if the client communication with the server is interrupted */ public void testExpectContinueHeader() throws InterruptedException { @@ -191,6 +188,7 @@ public void testExpectContinueHeader() throws InterruptedException { * Test that {@link SecureNetty4HttpServerTransport} responds to a * 100-continue expectation with too large a content-length * with a 413 status. + * * @throws InterruptedException if the client communication with the server is interrupted */ public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedException { @@ -203,6 +201,7 @@ public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedExc /** * Test that {@link SecureNetty4HttpServerTransport} responds to an unsupported expectation with a 417 status. + * * @throws InterruptedException if the client communication with the server is interrupted */ public void testExpectUnsupportedExpectation() throws InterruptedException { @@ -216,22 +215,13 @@ private void runExpectHeaderTest( final int contentLength, final HttpResponseStatus expectedStatus ) throws InterruptedException { - - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - @Override - public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { - channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))); - } - - @Override - public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(); - } - }; + HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest( + (request, channel, threadContext) -> channel.sendResponse( + new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done")) + ) + ) + .build(); try ( SecureNetty4HttpServerTransport transport = new SecureNetty4HttpServerTransport( settings, @@ -325,16 +315,8 @@ public void testBindUnavailableAddress() { public void testBadRequest() throws InterruptedException { final AtomicReference causeReference = new AtomicReference<>(); - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); - throw new AssertionError(); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchBadRequest((channel, threadContext, cause) -> { causeReference.set(cause); try { final OpenSearchException e = new OpenSearchException("you sent a bad request and you should feel bad"); @@ -342,10 +324,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } catch (final IOException e) { throw new AssertionError(e); } - } - - }; - + }) + .build(); final Settings settings; final int maxInitialLineLength; final Setting httpMaxInitialLineLengthSetting = HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; @@ -398,29 +378,16 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th public void testLargeCompressedResponse() throws InterruptedException { final String responseString = randomAlphaOfLength(4 * 1024 * 1024); final String url = "/thing"; - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest((request, channel, threadContext) -> { if (url.equals(request.uri())) { channel.sendResponse(new BytesRestResponse(OK, responseString)); } else { logger.error("--> Unexpected successful uri [{}]", request.uri()); throw new AssertionError(); } - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(); - } - - }; - + }) + .build(); try ( SecureNetty4HttpServerTransport transport = new SecureNetty4HttpServerTransport( Settings.EMPTY, @@ -455,25 +422,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } public void testCorsRequest() throws InterruptedException { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); - throw new AssertionError(); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(); - } - - }; - + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build(); final Settings settings = createBuilderWithPort().put(SETTING_CORS_ENABLED.getKey(), true) .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "test-cors.org") .build(); @@ -528,25 +477,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } public void testReadTimeout() throws Exception { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); - throw new AssertionError("Should not have received a dispatched request"); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError("Should not have received a dispatched request"); - } - - }; - + HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build(); Settings settings = createBuilderWithPort().put( HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300)) diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4BadRequestTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4BadRequestTests.java index 00ca378a4e46b..e0c15641ea497 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4BadRequestTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4BadRequestTests.java @@ -14,15 +14,12 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; -import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import org.opensearch.core.rest.RestStatus; import org.opensearch.http.HttpServerTransport; import org.opensearch.http.HttpTransportSettings; import org.opensearch.rest.BytesRestResponse; -import org.opensearch.rest.RestChannel; -import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -51,34 +48,29 @@ public class ReactorNetty4BadRequestTests extends OpenSearchTestCase { private ThreadPool threadPool; @Before - public void setup() throws Exception { + public void setup() { networkService = new NetworkService(Collections.emptyList()); bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); threadPool = new TestThreadPool("test"); } @After - public void shutdown() throws Exception { + public void shutdown() { terminate(threadPool); } public void testBadParameterEncoding() throws Exception { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - @Override - public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { - fail(); - } - - @Override - public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest((request, channel, threadContext) -> fail()) + .withDispatchBadRequest((channel, threadContext, cause) -> { try { final Exception e = cause instanceof Exception ? (Exception) cause : new OpenSearchException(cause); channel.sendResponse(new BytesRestResponse(channel, RestStatus.BAD_REQUEST, e)); } catch (final IOException e) { throw new UncheckedIOException(e); } - } - }; + }) + .build(); Settings settings = Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()).build(); try ( diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java index 715f0191fd851..880be7eb51eb5 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java @@ -8,14 +8,12 @@ package org.opensearch.http.reactor.netty4; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.common.lease.Releasable; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; -import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.xcontent.support.XContentHttpChunk; import org.opensearch.core.common.transport.TransportAddress; @@ -26,11 +24,9 @@ import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestHandler; import org.opensearch.rest.RestRequest; -import org.opensearch.rest.RestRequest.Method; import org.opensearch.rest.StreamingRestChannel; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.test.rest.FakeRestRequest; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.node.NodeClient; @@ -44,7 +40,6 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; -import java.util.Map; import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -65,7 +60,7 @@ * Tests for the {@link ReactorNetty4HttpServerTransport} class with streaming support. */ public class ReactorNetty4HttpServerTransportStreamingTests extends OpenSearchTestCase { - private static final Function XCONTENT_CONVERTER = (str) -> new ToXContent() { + private static final Function XCONTENT_CONVERTER = str -> new ToXContent() { @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { return builder.startObject().field("doc", str).endObject(); @@ -78,7 +73,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par private ClusterSettings clusterSettings; @Before - public void setup() throws Exception { + public void setup() { networkService = new NetworkService(Collections.emptyList()); threadPool = new TestThreadPool("test"); bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); @@ -86,7 +81,7 @@ public void setup() throws Exception { } @After - public void shutdown() throws Exception { + public void shutdown() { if (threadPool != null) { threadPool.shutdownNow(); } @@ -180,66 +175,45 @@ public void testConnectionsGettingClosedForStreamingRequests() throws Interrupte } private HttpServerTransport.Dispatcher createStreamingDispatcher(String url, String responseString) { - return new HttpServerTransport.Dispatcher() { + return TestDispatcherBuilder.withDefaults().withDispatchHandler((uri, rawPath, method, params) -> Optional.of(new RestHandler() { @Override - public Optional dispatchHandler(String uri, String rawPath, Method method, Map params) { - return Optional.of(new RestHandler() { - @Override - public boolean supportsStreaming() { - return true; - } - - @Override - public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception { - logger.error("--> Unexpected request [{}]", request.uri()); - throw new AssertionError(); - } - }); + public boolean supportsStreaming() { + return true; } @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - if (url.equals(request.uri())) { - assertThat(channel, instanceOf(StreamingRestChannel.class)); - final StreamingRestChannel streamingChannel = (StreamingRestChannel) channel; - - // Await at most 5 seconds till channel is ready for writing the response stream, fail otherwise - final Mono ready = Mono.fromRunnable(() -> { - while (!streamingChannel.isWritable()) { - Thread.onSpinWait(); - } - }).timeout(Duration.ofSeconds(5)); - - threadPool.executor(ThreadPool.Names.WRITE) - .execute(() -> Flux.concat(Flux.fromArray(newChunks(responseString)).map(e -> { - try (XContentBuilder builder = channel.newBuilder(XContentType.JSON, true)) { - return XContentHttpChunk.from(e.toXContent(builder, ToXContent.EMPTY_PARAMS)); - } catch (final IOException ex) { - throw new UncheckedIOException(ex); - } - }), Mono.just(XContentHttpChunk.last())) - .delaySubscription(ready) - .subscribe(streamingChannel::sendChunk, null, () -> { - if (channel.bytesOutput() instanceof Releasable) { - ((Releasable) channel.bytesOutput()).close(); - } - })); - } else { - logger.error("--> Unexpected successful uri [{}]", request.uri()); - throw new AssertionError(); - } + public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception { + logger.error("--> Unexpected request [{}]", request.uri()); + throw new AssertionError(); } + })).withDispatchRequest((request, channel, threadContext) -> { + if (url.equals(request.uri())) { + assertThat(channel, instanceOf(StreamingRestChannel.class)); + final StreamingRestChannel streamingChannel = (StreamingRestChannel) channel; + + // Await at most 5 seconds till channel is ready for writing the response stream, fail otherwise + final Mono ready = Mono.fromRunnable(() -> { + while (!streamingChannel.isWritable()) { + Thread.onSpinWait(); + } + }).timeout(Duration.ofSeconds(5)); - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); + threadPool.executor(ThreadPool.Names.WRITE).execute(() -> Flux.concat(Flux.fromArray(newChunks(responseString)).map(e -> { + try (XContentBuilder builder = channel.newBuilder(XContentType.JSON, true)) { + return XContentHttpChunk.from(e.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } + }), Mono.just(XContentHttpChunk.last())).delaySubscription(ready).subscribe(streamingChannel::sendChunk, null, () -> { + if (channel.bytesOutput() instanceof Releasable) { + ((Releasable) channel.bytesOutput()).close(); + } + })); + } else { + logger.error("--> Unexpected successful uri [{}]", request.uri()); throw new AssertionError(); } - - }; + }).build(); } private static ToXContent[] newChunks(final String responseString) { diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java index 72d645aaf8022..d9a1d8084a273 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportTests.java @@ -32,7 +32,6 @@ package org.opensearch.http.reactor.netty4; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.common.network.NetworkAddress; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.ClusterSettings; @@ -41,7 +40,6 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; -import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.common.unit.ByteSizeValue; @@ -52,11 +50,8 @@ import org.opensearch.http.HttpTransportSettings; import org.opensearch.http.NullDispatcher; import org.opensearch.rest.BytesRestResponse; -import org.opensearch.rest.RestChannel; -import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.test.rest.FakeRestRequest; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.NettyAllocator; @@ -112,7 +107,7 @@ public class ReactorNetty4HttpServerTransportTests extends OpenSearchTestCase { private ClusterSettings clusterSettings; @Before - public void setup() throws Exception { + public void setup() { networkService = new NetworkService(Collections.emptyList()); threadPool = new TestThreadPool("test"); bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); @@ -120,7 +115,7 @@ public void setup() throws Exception { } @After - public void shutdown() throws Exception { + public void shutdown() { if (threadPool != null) { threadPool.shutdownNow(); } @@ -172,21 +167,14 @@ private void runExpectHeaderTest( final int contentLength, final HttpResponseStatus expectedStatus ) throws InterruptedException { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - @Override - public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { - channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))); - } + HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest( + (request, channel, threadContext) -> channel.sendResponse( + new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done")) + ) + ) + .build(); - @Override - public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(); - } - }; try ( ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( settings, @@ -266,20 +254,7 @@ public void testBindUnavailableAddress() { } public void testBadRequest() throws InterruptedException { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); - throw new AssertionError(); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error("--> Unexpected bad request request"); - throw new AssertionError(cause); - } - }; - + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build(); final Settings settings; final int maxInitialLineLength; final Setting httpMaxInitialLineLengthSetting = HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; @@ -323,18 +298,11 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } public void testDispatchFailed() throws InterruptedException { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest((request, channel, threadContext) -> { throw new RuntimeException("Bad things happen"); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error("--> Unexpected bad request request"); - throw new AssertionError(cause); - } - }; + }) + .build(); final Settings settings = createSettings(); try ( @@ -370,28 +338,16 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th public void testLargeCompressedResponse() throws InterruptedException { final String responseString = randomAlphaOfLength(4 * 1024 * 1024); final String url = "/thing/"; - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest((request, channel, threadContext) -> { if (url.equals(request.uri())) { channel.sendResponse(new BytesRestResponse(OK, responseString)); } else { logger.error("--> Unexpected successful uri [{}]", request.uri()); throw new AssertionError(); } - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(); - } - - }; + }) + .build(); try ( ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( @@ -442,26 +398,16 @@ private long getHugeAllocationCount() { public void testConnectionsGettingClosed() throws InterruptedException { final String responseString = "ok"; final String url = "/thing/"; - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest((request, channel, threadContext) -> { if (url.equals(request.uri())) { channel.sendResponse(new BytesRestResponse(OK, responseString)); } else { logger.error("--> Unexpected successful uri [{}]", request.uri()); throw new AssertionError(); } - } - - @Override - public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(cause); - } - }; + }) + .build(); try ( ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( @@ -498,25 +444,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, } public void testCorsRequest() throws InterruptedException { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); - throw new AssertionError(); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(); - } - - }; - + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build(); final Settings settings = createBuilderWithPort().put(SETTING_CORS_ENABLED.getKey(), true) .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "test-cors.org") .build(); @@ -570,25 +498,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } public void testConnectTimeout() throws Exception { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); - throw new AssertionError("Should not have received a dispatched request"); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError("Should not have received a dispatched request"); - } - - }; - + HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build(); Settings settings = createBuilderWithPort().put( HttpTransportSettings.SETTING_HTTP_CONNECT_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300)) diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/TestDispatcherBuilder.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/TestDispatcherBuilder.java new file mode 100644 index 0000000000000..3896a6eab190c --- /dev/null +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/TestDispatcherBuilder.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; + +import java.util.Map; +import java.util.Optional; + +/** + * A builder for creating instances of {@link HttpServerTransport.Dispatcher} + * with sensible defaults for testing purposes. + */ +public class TestDispatcherBuilder { + + private static final Logger logger = LogManager.getLogger(TestDispatcherBuilder.class); + + private DispatchRequest dispatchRequest = (request, channel, threadContext) -> { + logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); + throw new AssertionError(); + }; + private DispatchBadRequest dispatchBadRequest = (channel, threadContext, cause) -> { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(cause); + }; + private DispatchHandler dispatchHandler = (uri, rawPath, method, params) -> Optional.empty(); + + public static TestDispatcherBuilder withDefaults() { + return new TestDispatcherBuilder(); + } + + public TestDispatcherBuilder withDispatchRequest(DispatchRequest dispatchRequest) { + this.dispatchRequest = dispatchRequest; + return this; + } + + public TestDispatcherBuilder withDispatchBadRequest(DispatchBadRequest dispatchBadRequest) { + this.dispatchBadRequest = dispatchBadRequest; + return this; + } + + public TestDispatcherBuilder withDispatchHandler(DispatchHandler dispatchHandler) { + this.dispatchHandler = dispatchHandler; + return this; + } + + public HttpServerTransport.Dispatcher build() { + return new HttpServerTransport.Dispatcher() { + @Override + public Optional dispatchHandler( + String uri, + String rawPath, + RestRequest.Method method, + Map params + ) { + return dispatchHandler.dispatchHandler(uri, rawPath, method, params); + } + + @Override + public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { + dispatchRequest.dispatchRequest(request, channel, threadContext); + } + + @Override + public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { + dispatchBadRequest.dispatchBadRequest(channel, threadContext, cause); + } + }; + } + + @FunctionalInterface + public interface DispatchRequest { + void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext); + } + + @FunctionalInterface + public interface DispatchBadRequest { + void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause); + } + + @FunctionalInterface + public interface DispatchHandler { + Optional dispatchHandler(String uri, String rawPath, RestRequest.Method method, Map params); + } + +} diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java index c5f1e6215f098..125f5157e34ef 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java @@ -10,7 +10,6 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.common.network.NetworkAddress; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.ClusterSettings; @@ -19,7 +18,6 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; -import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.common.unit.ByteSizeValue; @@ -31,16 +29,14 @@ import org.opensearch.http.NullDispatcher; import org.opensearch.http.reactor.netty4.ReactorHttpClient; import org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport; +import org.opensearch.http.reactor.netty4.TestDispatcherBuilder; import org.opensearch.plugins.SecureHttpTransportSettingsProvider; import org.opensearch.plugins.TransportExceptionHandler; import org.opensearch.rest.BytesRestResponse; -import org.opensearch.rest.RestChannel; -import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.BouncyCastleThreadFilter; import org.opensearch.test.KeyStoreUtils; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.test.rest.FakeRestRequest; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.NettyAllocator; @@ -176,7 +172,7 @@ public Optional buildSecureHttpServerEngine(Settings settings, HttpSe } @After - public void shutdown() throws Exception { + public void shutdown() { if (threadPool != null) { threadPool.shutdownNow(); } @@ -188,18 +184,20 @@ public void shutdown() throws Exception { /** * Test that {@link ReactorNetty4HttpServerTransport} supports the "Expect: 100-continue" HTTP header + * * @throws InterruptedException if the client communication with the server is interrupted */ public void testExpectContinueHeader() throws InterruptedException { final Settings settings = createSettings(); final int contentLength = randomIntBetween(1, HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).bytesAsInt()); - runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.CONTINUE); + runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength); } /** * Test that {@link ReactorNetty4HttpServerTransport} responds to a * 100-continue expectation with too large a content-length * with a 413 status. + * * @throws InterruptedException if the client communication with the server is interrupted */ public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedException { @@ -207,39 +205,28 @@ public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedExc final int maxContentLength = randomIntBetween(1, 104857600); final Settings settings = createBuilderWithPort().put(key, maxContentLength + "b").build(); final int contentLength = randomIntBetween(maxContentLength + 1, Integer.MAX_VALUE); - runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE); + runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength); } /** * Test that {@link ReactorNetty4HttpServerTransport} responds to an unsupported expectation with a 417 status. + * * @throws InterruptedException if the client communication with the server is interrupted */ public void testExpectUnsupportedExpectation() throws InterruptedException { Settings settings = createSettings(); - runExpectHeaderTest(settings, "chocolate=yummy", 0, HttpResponseStatus.EXPECTATION_FAILED); + runExpectHeaderTest(settings, "chocolate=yummy", 0); } - private void runExpectHeaderTest( - final Settings settings, - final String expectation, - final int contentLength, - final HttpResponseStatus expectedStatus - ) throws InterruptedException { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - @Override - public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { - channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))); - } - - @Override - public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(); - } - }; + private void runExpectHeaderTest(final Settings settings, final String expectation, final int contentLength) + throws InterruptedException { + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest( + (request, channel, threadContext) -> channel.sendResponse( + new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done")) + ) + ) + .build(); try ( ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( settings, @@ -317,19 +304,7 @@ public void testBindUnavailableAddress() { } public void testBadRequest() throws InterruptedException { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); - throw new AssertionError(); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error("--> Unexpected bad request request"); - throw new AssertionError(cause); - } - }; + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build(); final Settings settings; final int maxInitialLineLength; @@ -375,18 +350,11 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } public void testDispatchFailed() throws InterruptedException { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest((request, channel, threadContext) -> { throw new RuntimeException("Bad things happen"); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error("--> Unexpected bad request request"); - throw new AssertionError(cause); - } - }; + }) + .build(); final Settings settings = createSettings(); try ( @@ -423,28 +391,16 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th public void testLargeCompressedResponse() throws InterruptedException { final String responseString = randomAlphaOfLength(4 * 1024 * 1024); final String url = "/thing/"; - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults() + .withDispatchRequest((request, channel, threadContext) -> { if (url.equals(request.uri())) { channel.sendResponse(new BytesRestResponse(OK, responseString)); } else { logger.error("--> Unexpected successful uri [{}]", request.uri()); throw new AssertionError(); } - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(); - } - - }; + }) + .build(); try ( ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( @@ -494,25 +450,7 @@ private long getHugeAllocationCount() { } public void testCorsRequest() throws InterruptedException { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); - throw new AssertionError(); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError(); - } - - }; - + final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build(); final Settings settings = createBuilderWithPort().put(SETTING_CORS_ENABLED.getKey(), true) .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "test-cors.org") .build(); @@ -567,25 +505,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } public void testConnectTimeout() throws Exception { - final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { - - @Override - public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { - logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); - throw new AssertionError("Should not have received a dispatched request"); - } - - @Override - public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { - logger.error( - new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), - cause - ); - throw new AssertionError("Should not have received a dispatched request"); - } - - }; - + HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build(); Settings settings = createBuilderWithPort().put( HttpTransportSettings.SETTING_HTTP_CONNECT_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300)) diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/CreateIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/CreateIndexIT.java index ab1a3dd716daa..5cd2d0c8a5310 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/CreateIndexIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/CreateIndexIT.java @@ -32,6 +32,9 @@ package org.opensearch.action.admin.indices.create; +import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; + +import org.apache.lucene.tests.util.TimeUnits; import org.opensearch.action.UnavailableShardsException; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.admin.indices.alias.Alias; @@ -88,6 +91,7 @@ import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.IsNull.notNullValue; +@TimeoutSuite(millis = 2 * TimeUnits.MINUTE) @ClusterScope(scope = Scope.TEST) public class CreateIndexIT extends OpenSearchIntegTestCase {