Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump opensearch-protobufs dependency to 0.24.0 and update transport-grpc module compatibility ([#20059](https://github.com/opensearch-project/OpenSearch/pull/20059))

- Refactor the ShardStats, WarmerStats and IndexingPressureStats class to use the Builder pattern instead of constructors ([#19966](https://github.com/opensearch-project/OpenSearch/pull/19966))
- Cleanup HttpServerTransport.Dispatcher in Netty tests

### Fixed
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,12 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.MockBigArrays;
import org.opensearch.common.util.MockPageCacheRecycler;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -74,35 +71,29 @@ public class Netty4BadRequestTests extends OpenSearchTestCase {
private ThreadPool threadPool;

@Before
public void setup() throws Exception {
public void setup() {
networkService = new NetworkService(Collections.emptyList());
bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
threadPool = new TestThreadPool("test");
}

@After
public void shutdown() throws Exception {
public void shutdown() {
terminate(threadPool);
}

public void testBadParameterEncoding() throws Exception {
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
fail();
}

@Override
public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) {
HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults()
.withDispatchRequest((request, channel, threadContext) -> fail())
.withDispatchBadRequest((channel, threadContext, cause) -> {
try {
final Exception e = cause instanceof Exception ? (Exception) cause : new OpenSearchException(cause);
channel.sendResponse(new BytesRestResponse(channel, RestStatus.BAD_REQUEST, e));
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
}
};

})
.build();
Settings settings = Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()).build();
try (
HttpServerTransport httpServerTransport = new Netty4HttpServerTransport(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

package org.opensearch.http.netty4;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.common.network.NetworkAddress;
import org.opensearch.common.network.NetworkService;
Expand All @@ -42,7 +41,6 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.MockBigArrays;
import org.opensearch.common.util.MockPageCacheRecycler;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.common.unit.ByteSizeValue;
Expand All @@ -53,11 +51,8 @@
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.http.NullDispatcher;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.rest.FakeRestRequest;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.NettyAllocator;
Expand Down Expand Up @@ -118,15 +113,15 @@ public class Netty4HttpServerTransportTests extends OpenSearchTestCase {
private ClusterSettings clusterSettings;

@Before
public void setup() throws Exception {
public void setup() {
networkService = new NetworkService(Collections.emptyList());
threadPool = new TestThreadPool("test");
bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
}

@After
public void shutdown() throws Exception {
public void shutdown() {
if (threadPool != null) {
threadPool.shutdownNow();
}
Expand Down Expand Up @@ -175,21 +170,13 @@ private void runExpectHeaderTest(
final int contentLength,
final HttpResponseStatus expectedStatus
) throws InterruptedException {
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done")));
}

@Override
public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) {
logger.error(
new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
cause
);
throw new AssertionError();
}
};
HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults()
.withDispatchRequest(
(request, channel, threadContext) -> channel.sendResponse(
new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))
)
)
.build();
try (
Netty4HttpServerTransport transport = new Netty4HttpServerTransport(
settings,
Expand Down Expand Up @@ -280,27 +267,17 @@ public void testBindUnavailableAddress() {

public void testBadRequest() throws InterruptedException {
final AtomicReference<Throwable> causeReference = new AtomicReference<>();
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {

@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
throw new AssertionError();
}

@Override
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults()
.withDispatchBadRequest((channel, threadContext, cause) -> {
causeReference.set(cause);
try {
final OpenSearchException e = new OpenSearchException("you sent a bad request and you should feel bad");
channel.sendResponse(new BytesRestResponse(channel, BAD_REQUEST, e));
} catch (final IOException e) {
throw new AssertionError(e);
}
}

};

})
.build();
final Settings settings;
final int maxInitialLineLength;
final Setting<ByteSizeValue> httpMaxInitialLineLengthSetting = HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH;
Expand Down Expand Up @@ -352,29 +329,16 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
public void testLargeCompressedResponse() throws InterruptedException {
final String responseString = randomAlphaOfLength(4 * 1024 * 1024);
final String url = "/thing";
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {

@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults()
.withDispatchRequest((request, channel, threadContext) -> {
if (url.equals(request.uri())) {
channel.sendResponse(new BytesRestResponse(OK, responseString));
} else {
logger.error("--> Unexpected successful uri [{}]", request.uri());
throw new AssertionError();
}
}

@Override
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
logger.error(
new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
cause
);
throw new AssertionError();
}

};

})
.build();
try (
Netty4HttpServerTransport transport = new Netty4HttpServerTransport(
Settings.EMPTY,
Expand Down Expand Up @@ -425,25 +389,7 @@ private long getHugeAllocationCount() {
}

public void testCorsRequest() throws InterruptedException {
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {

@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
throw new AssertionError();
}

@Override
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
logger.error(
new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
cause
);
throw new AssertionError();
}

};

final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build();
final Settings settings = createBuilderWithPort().put(SETTING_CORS_ENABLED.getKey(), true)
.put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "test-cors.org")
.build();
Expand Down Expand Up @@ -497,25 +443,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
}

public void testReadTimeout() throws Exception {
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {

@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
throw new AssertionError("Should not have received a dispatched request");
}

@Override
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
logger.error(
new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
cause
);
throw new AssertionError("Should not have received a dispatched request");
}

};

HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build();
Settings settings = createBuilderWithPort().put(
HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(),
new TimeValue(randomIntBetween(100, 300))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.netty4;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.test.rest.FakeRestRequest;

import java.util.Map;
import java.util.Optional;

/**
* A builder for creating instances of {@link HttpServerTransport.Dispatcher}
* with sensible defaults for testing purposes.
*/
public class TestDispatcherBuilder {

private static final Logger logger = LogManager.getLogger(TestDispatcherBuilder.class);

private DispatchRequest dispatchRequest = (request, channel, threadContext) -> {
logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
throw new AssertionError();
};
private DispatchBadRequest dispatchBadRequest = (channel, threadContext, cause) -> {
logger.error(
new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
cause
);
throw new AssertionError(cause);
};
private DispatchHandler dispatchHandler = (uri, rawPath, method, params) -> Optional.empty();

public static TestDispatcherBuilder withDefaults() {
return new TestDispatcherBuilder();
}

public TestDispatcherBuilder withDispatchRequest(DispatchRequest dispatchRequest) {
this.dispatchRequest = dispatchRequest;
return this;
}

public TestDispatcherBuilder withDispatchBadRequest(DispatchBadRequest dispatchBadRequest) {
this.dispatchBadRequest = dispatchBadRequest;
return this;
}

public TestDispatcherBuilder withDispatchHandler(DispatchHandler dispatchHandler) {
this.dispatchHandler = dispatchHandler;
return this;
}

public HttpServerTransport.Dispatcher build() {
return new HttpServerTransport.Dispatcher() {
@Override
public Optional<RestHandler> dispatchHandler(
String uri,
String rawPath,
RestRequest.Method method,
Map<String, String> params
) {
return dispatchHandler.dispatchHandler(uri, rawPath, method, params);
}

@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
dispatchRequest.dispatchRequest(request, channel, threadContext);
}

@Override
public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) {
dispatchBadRequest.dispatchBadRequest(channel, threadContext, cause);
}
};
}

@FunctionalInterface
public interface DispatchRequest {
void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext);
}

@FunctionalInterface
public interface DispatchBadRequest {
void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause);
}

@FunctionalInterface
public interface DispatchHandler {
Optional<RestHandler> dispatchHandler(String uri, String rawPath, RestRequest.Method method, Map<String, String> params);
}

}
Loading
Loading