Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions .idea/runConfigurations/Debug_OpenSearch.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump opensearch-protobufs dependency to 0.24.0 and update transport-grpc module compatibility ([#20059](https://github.com/opensearch-project/OpenSearch/pull/20059))

- Refactor the ShardStats, WarmerStats and IndexingPressureStats class to use the Builder pattern instead of constructors ([#19966](https://github.com/opensearch-project/OpenSearch/pull/19966))
- Cleanup HttpServerTransport.Dispatcher in Netty tests ([#20160](https://github.com/opensearch-project/OpenSearch/pull/20160))

### Fixed
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
Expand Down
2 changes: 2 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,5 @@ ${path.logs}
# Once there is no observed impact on performance, this feature flag can be removed.
#
#opensearch.experimental.optimization.datetime_formatter_caching.enabled: false
aux.transport.types: [transport-grpc]
aux.transport.transport-grpc.port: 9400
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,12 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.MockBigArrays;
import org.opensearch.common.util.MockPageCacheRecycler;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -74,35 +71,29 @@ public class Netty4BadRequestTests extends OpenSearchTestCase {
private ThreadPool threadPool;

@Before
public void setup() throws Exception {
public void setup() {
networkService = new NetworkService(Collections.emptyList());
bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
threadPool = new TestThreadPool("test");
}

@After
public void shutdown() throws Exception {
public void shutdown() {
terminate(threadPool);
}

public void testBadParameterEncoding() throws Exception {
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
fail();
}

@Override
public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) {
HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults()
.withDispatchRequest((request, channel, threadContext) -> fail())
.withDispatchBadRequest((channel, threadContext, cause) -> {
try {
final Exception e = cause instanceof Exception ? (Exception) cause : new OpenSearchException(cause);
channel.sendResponse(new BytesRestResponse(channel, RestStatus.BAD_REQUEST, e));
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
}
};

})
.build();
Settings settings = Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()).build();
try (
HttpServerTransport httpServerTransport = new Netty4HttpServerTransport(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

package org.opensearch.http.netty4;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.common.network.NetworkAddress;
import org.opensearch.common.network.NetworkService;
Expand All @@ -42,7 +41,6 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.MockBigArrays;
import org.opensearch.common.util.MockPageCacheRecycler;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.common.unit.ByteSizeValue;
Expand All @@ -53,11 +51,8 @@
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.http.NullDispatcher;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.rest.FakeRestRequest;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.NettyAllocator;
Expand Down Expand Up @@ -118,15 +113,15 @@ public class Netty4HttpServerTransportTests extends OpenSearchTestCase {
private ClusterSettings clusterSettings;

@Before
public void setup() throws Exception {
public void setup() {
networkService = new NetworkService(Collections.emptyList());
threadPool = new TestThreadPool("test");
bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
}

@After
public void shutdown() throws Exception {
public void shutdown() {
if (threadPool != null) {
threadPool.shutdownNow();
}
Expand Down Expand Up @@ -175,21 +170,13 @@ private void runExpectHeaderTest(
final int contentLength,
final HttpResponseStatus expectedStatus
) throws InterruptedException {
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done")));
}

@Override
public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) {
logger.error(
new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
cause
);
throw new AssertionError();
}
};
HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults()
.withDispatchRequest(
(request, channel, threadContext) -> channel.sendResponse(
new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))
)
)
.build();
try (
Netty4HttpServerTransport transport = new Netty4HttpServerTransport(
settings,
Expand Down Expand Up @@ -280,27 +267,17 @@ public void testBindUnavailableAddress() {

public void testBadRequest() throws InterruptedException {
final AtomicReference<Throwable> causeReference = new AtomicReference<>();
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {

@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
throw new AssertionError();
}

@Override
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults()
.withDispatchBadRequest((channel, threadContext, cause) -> {
causeReference.set(cause);
try {
final OpenSearchException e = new OpenSearchException("you sent a bad request and you should feel bad");
channel.sendResponse(new BytesRestResponse(channel, BAD_REQUEST, e));
} catch (final IOException e) {
throw new AssertionError(e);
}
}

};

})
.build();
final Settings settings;
final int maxInitialLineLength;
final Setting<ByteSizeValue> httpMaxInitialLineLengthSetting = HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH;
Expand Down Expand Up @@ -352,29 +329,16 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
public void testLargeCompressedResponse() throws InterruptedException {
final String responseString = randomAlphaOfLength(4 * 1024 * 1024);
final String url = "/thing";
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {

@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults()
.withDispatchRequest((request, channel, threadContext) -> {
if (url.equals(request.uri())) {
channel.sendResponse(new BytesRestResponse(OK, responseString));
} else {
logger.error("--> Unexpected successful uri [{}]", request.uri());
throw new AssertionError();
}
}

@Override
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
logger.error(
new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
cause
);
throw new AssertionError();
}

};

})
.build();
try (
Netty4HttpServerTransport transport = new Netty4HttpServerTransport(
Settings.EMPTY,
Expand Down Expand Up @@ -425,25 +389,7 @@ private long getHugeAllocationCount() {
}

public void testCorsRequest() throws InterruptedException {
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {

@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
throw new AssertionError();
}

@Override
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
logger.error(
new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
cause
);
throw new AssertionError();
}

};

final HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build();
final Settings settings = createBuilderWithPort().put(SETTING_CORS_ENABLED.getKey(), true)
.put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "test-cors.org")
.build();
Expand Down Expand Up @@ -497,25 +443,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
}

public void testReadTimeout() throws Exception {
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {

@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
throw new AssertionError("Should not have received a dispatched request");
}

@Override
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
logger.error(
new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
cause
);
throw new AssertionError("Should not have received a dispatched request");
}

};

HttpServerTransport.Dispatcher dispatcher = TestDispatcherBuilder.withDefaults().build();
Settings settings = createBuilderWithPort().put(
HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(),
new TimeValue(randomIntBetween(100, 300))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.netty4;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.test.rest.FakeRestRequest;

import java.util.Map;
import java.util.Optional;

/**
* A builder for creating instances of {@link HttpServerTransport.Dispatcher}
* with sensible defaults for testing purposes.
*/
public class TestDispatcherBuilder {

private static final Logger logger = LogManager.getLogger(TestDispatcherBuilder.class);

private DispatchRequest dispatchRequest = (request, channel, threadContext) -> {
logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
throw new AssertionError();
};
private DispatchBadRequest dispatchBadRequest = (channel, threadContext, cause) -> {
logger.error(
new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
cause
);
throw new AssertionError(cause);
};
private DispatchHandler dispatchHandler = (uri, rawPath, method, params) -> Optional.empty();

public static TestDispatcherBuilder withDefaults() {
return new TestDispatcherBuilder();
}

public TestDispatcherBuilder withDispatchRequest(DispatchRequest dispatchRequest) {
this.dispatchRequest = dispatchRequest;
return this;
}

public TestDispatcherBuilder withDispatchBadRequest(DispatchBadRequest dispatchBadRequest) {
this.dispatchBadRequest = dispatchBadRequest;
return this;
}

public TestDispatcherBuilder withDispatchHandler(DispatchHandler dispatchHandler) {
this.dispatchHandler = dispatchHandler;
return this;
}

public HttpServerTransport.Dispatcher build() {
return new HttpServerTransport.Dispatcher() {
@Override
public Optional<RestHandler> dispatchHandler(
String uri,
String rawPath,
RestRequest.Method method,
Map<String, String> params
) {
return dispatchHandler.dispatchHandler(uri, rawPath, method, params);
}

@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
dispatchRequest.dispatchRequest(request, channel, threadContext);
}

@Override
public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) {
dispatchBadRequest.dispatchBadRequest(channel, threadContext, cause);
}
};
}

@FunctionalInterface
public interface DispatchRequest {
void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext);
}

@FunctionalInterface
public interface DispatchBadRequest {
void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause);
}

@FunctionalInterface
public interface DispatchHandler {
Optional<RestHandler> dispatchHandler(String uri, String rawPath, RestRequest.Method method, Map<String, String> params);
}

}
Loading
Loading