Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568))
- Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005))
- Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
- Add separate grpc.detailed_errors.enabled error handling setting for gRPC ([#19644](https://github.com/opensearch-project/OpenSearch/pull/19644))
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))

### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.GRPC_TRANSPORT_SETTING_KEY;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_BIND_HOST;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_DETAILED_ERRORS_ENABLED;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_EXECUTOR_COUNT;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_HOST;
import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_KEEPALIVE_TIMEOUT;
Expand Down Expand Up @@ -205,9 +206,10 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
throw new IllegalStateException("createComponents must be called before getAuxTransports to initialize the registry");
}

boolean detailedErrorsEnabled = SETTING_GRPC_DETAILED_ERRORS_ENABLED.get(settings);
List<BindableService> grpcServices = registerGRPCServices(
new DocumentServiceImpl(client),
new SearchServiceImpl(client, queryUtils)
new DocumentServiceImpl(client, detailedErrorsEnabled),
new SearchServiceImpl(client, queryUtils, detailedErrorsEnabled)
);
return Collections.singletonMap(
GRPC_TRANSPORT_SETTING_KEY,
Expand Down Expand Up @@ -248,9 +250,10 @@ public Map<String, Supplier<AuxTransport>> getSecureAuxTransports(
throw new IllegalStateException("createComponents must be called before getSecureAuxTransports to initialize the registry");
}

boolean detailedErrorsEnabled = SETTING_GRPC_DETAILED_ERRORS_ENABLED.get(settings);
List<BindableService> grpcServices = registerGRPCServices(
new DocumentServiceImpl(client),
new SearchServiceImpl(client, queryUtils)
new DocumentServiceImpl(client, detailedErrorsEnabled),
new SearchServiceImpl(client, queryUtils, detailedErrorsEnabled)
);
return Collections.singletonMap(
GRPC_SECURE_TRANSPORT_SETTING_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,19 @@ public class Netty4GrpcServerTransport extends AuxTransport {
Setting.Property.NodeScope
);

/**
* Requests that require detailed error tracing via {@code error_trace} will
* include relevant error details for better debugging in case this setting is enabled.
* Otherwise,when this setting is disabled, only an error summary is included
* when {@code error_trace} is omitted or disabled and an error response is generated in case
* it is enabled.
*/
public static final Setting<Boolean> SETTING_GRPC_DETAILED_ERRORS_ENABLED = Setting.boolSetting(
"grpc.detailed_errors.enabled",
true,
Setting.Property.NodeScope
);

/**
* Port range on which servers bind.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@
public class DocumentServiceImpl extends DocumentServiceGrpc.DocumentServiceImplBase {
private static final Logger logger = LogManager.getLogger(DocumentServiceImpl.class);
private final Client client;
private final boolean detailedErrorsEnabled;

/**
* Creates a new DocumentServiceImpl.
*
* @param client Client for executing actions on the local node
* @param detailedErrorsEnabled Whether detailed error tracing is enabled
*/
public DocumentServiceImpl(Client client) {
public DocumentServiceImpl(Client client, boolean detailedErrorsEnabled) {
this.client = client;
this.detailedErrorsEnabled = detailedErrorsEnabled;
}

/**
Expand All @@ -44,6 +47,7 @@ public DocumentServiceImpl(Client client) {
@Override
public void bulk(org.opensearch.protobufs.BulkRequest request, StreamObserver<org.opensearch.protobufs.BulkResponse> responseObserver) {
try {
GrpcErrorHandler.validateErrorTracingConfiguration(detailedErrorsEnabled, request.getGlobalParams());
org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request);
BulkRequestActionListener listener = new BulkRequestActionListener(responseObserver);
client.bulk(bulkRequest, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ public class SearchServiceImpl extends SearchServiceGrpc.SearchServiceImplBase {
private static final Logger logger = LogManager.getLogger(SearchServiceImpl.class);
private final Client client;
private final AbstractQueryBuilderProtoUtils queryUtils;
private final boolean detailedErrorsEnabled;

/**
* Creates a new SearchServiceImpl.
*
* @param client Client for executing actions on the local node
* @param queryUtils Query utils instance for parsing protobuf queries
* @param detailedErrorsEnabled Whether detailed error tracing is enabled
*/
public SearchServiceImpl(Client client, AbstractQueryBuilderProtoUtils queryUtils) {
public SearchServiceImpl(Client client, AbstractQueryBuilderProtoUtils queryUtils, boolean detailedErrorsEnabled) {
if (client == null) {
throw new IllegalArgumentException("Client cannot be null");
}
Expand All @@ -48,6 +50,7 @@ public SearchServiceImpl(Client client, AbstractQueryBuilderProtoUtils queryUtil

this.client = client;
this.queryUtils = queryUtils;
this.detailedErrorsEnabled = detailedErrorsEnabled;
}

/**
Expand All @@ -61,8 +64,8 @@ public void search(
org.opensearch.protobufs.SearchRequest request,
StreamObserver<org.opensearch.protobufs.SearchResponse> responseObserver
) {

try {
GrpcErrorHandler.validateErrorTracingConfiguration(detailedErrorsEnabled, request.getGlobalParams());
org.opensearch.action.search.SearchRequest searchRequest = SearchRequestProtoUtils.prepareRequest(request, client, queryUtils);
SearchRequestActionListener listener = new SearchRequestActionListener(responseObserver);
client.search(searchRequest, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.protobufs.GlobalParams;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
Expand All @@ -38,6 +39,19 @@ private GrpcErrorHandler() {
// Utility class, no instances
}

/**
* Validates if error tracing is allowed based on server configuration and request parameters.
*
* @param detailedErrorsEnabled Whether detailed errors are enabled on the server
* @param globalRequestParams The global parameters from the gRPC request
* @throws IllegalArgumentException if error tracing is requested but disabled by the server side
*/
public static void validateErrorTracingConfiguration(boolean detailedErrorsEnabled, GlobalParams globalRequestParams) {
if (detailedErrorsEnabled == false && globalRequestParams.getErrorTrace()) {
throw new IllegalArgumentException("error traces in responses are disabled.");
}
}

/**
* Converts an exception to an appropriate gRPC StatusRuntimeException.
* Uses comprehensive exception type mapping for granular gRPC status codes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
Expand All @@ -41,32 +42,35 @@ public class SearchServiceImplTests extends OpenSearchTestCase {
public void setup() throws IOException {
MockitoAnnotations.openMocks(this);
queryUtils = QueryBuilderProtoTestUtils.createQueryUtils();
service = new SearchServiceImpl(client, queryUtils);
service = new SearchServiceImpl(client, queryUtils, true);
}

public void testConstructorWithNullClient() {
// Test that constructor throws IllegalArgumentException when client is null
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new SearchServiceImpl(null, queryUtils));
IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> new SearchServiceImpl(null, queryUtils, true)
);

assertEquals("Client cannot be null", exception.getMessage());
}

public void testConstructorWithNullQueryUtils() {
// Test that constructor throws IllegalArgumentException when queryUtils is null
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new SearchServiceImpl(client, null));
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new SearchServiceImpl(client, null, true));

assertEquals("Query utils cannot be null", exception.getMessage());
}

public void testConstructorWithBothNull() {
// Test that constructor throws IllegalArgumentException when both parameters are null
// Should fail on the first null check (client)
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new SearchServiceImpl(null, null));
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new SearchServiceImpl(null, null, true));

assertEquals("Client cannot be null", exception.getMessage());
}

public void testSearchSuccess() throws IOException {
public void testSearchSuccess() {
// Create a test request
SearchRequest request = createTestSearchRequest();

Expand All @@ -77,7 +81,7 @@ public void testSearchSuccess() throws IOException {
verify(client).search(any(org.opensearch.action.search.SearchRequest.class), any());
}

public void testSearchWithException() throws IOException {
public void testSearchWithException() {
// Create a test request
SearchRequest request = createTestSearchRequest();

Expand All @@ -91,7 +95,37 @@ public void testSearchWithException() throws IOException {
verify(responseObserver).onError(any());
}

public void testErrorTracingConfigValidationFailsWhenServerSettingIsDisabledAndRequestRequiresTracing() {
// Setup request and the service, server setting is off and request requires tracing
SearchRequest request = createTestSearchRequest();
SearchServiceImpl serviceWithDisabledErrorsTracing = new SearchServiceImpl(client, queryUtils, false);

// Call search method
serviceWithDisabledErrorsTracing.search(request, responseObserver);

// Verify that responseObserver.onError reports request parameter must be disabled
verify(responseObserver).onError(any(StatusRuntimeException.class));
}

public void testErrorTracingConfigValidationPassesWhenServerSettingIsDisabledAndRequestSkipsTracing() {
// Setup request and the service, server setting is off and request skips tracing
SearchRequest request = createTestSearchRequest().toBuilder()
.setGlobalParams(org.opensearch.protobufs.GlobalParams.newBuilder().setErrorTrace(false))
.build();
SearchServiceImpl serviceWithDisabledErrorsTracing = new SearchServiceImpl(client, queryUtils, false);

// Call search method
serviceWithDisabledErrorsTracing.search(request, responseObserver);

// Verify that client.search was called
verify(client).search(any(org.opensearch.action.search.SearchRequest.class), any());
}

private SearchRequest createTestSearchRequest() {
return SearchRequest.newBuilder().addIndex("test-index").setRequestBody(SearchRequestBody.newBuilder().setSize(10).build()).build();
return SearchRequest.newBuilder()
.addIndex("test-index")
.setRequestBody(SearchRequestBody.newBuilder().setSize(10).build())
.setGlobalParams(org.opensearch.protobufs.GlobalParams.newBuilder().setErrorTrace(true).build())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
Expand All @@ -40,10 +41,10 @@ public class DocumentServiceImplTests extends OpenSearchTestCase {
@Before
public void setup() throws IOException {
MockitoAnnotations.openMocks(this);
service = new DocumentServiceImpl(client);
service = new DocumentServiceImpl(client, true);
}

public void testBulkSuccess() throws IOException {
public void testBulkSuccess() {
// Create a test request
BulkRequest request = createTestBulkRequest();

Expand All @@ -54,7 +55,7 @@ public void testBulkSuccess() throws IOException {
verify(client).bulk(any(org.opensearch.action.bulk.BulkRequest.class), any());
}

public void testBulkError() throws IOException {
public void testBulkError() {
// Create a test request
BulkRequest request = createTestBulkRequest();

Expand All @@ -68,6 +69,32 @@ public void testBulkError() throws IOException {
verify(responseObserver).onError(any(RuntimeException.class));
}

public void testErrorTracingConfigValidationFailsWhenServerSettingIsDisabledAndRequestRequiresTracing() {
// Setup request and the service, server setting is off and request requires tracing
BulkRequest request = createTestBulkRequest();
DocumentServiceImpl serviceWithDisabledErrorsTracing = new DocumentServiceImpl(client, false);

// Call bulk method
serviceWithDisabledErrorsTracing.bulk(request, responseObserver);

// Verify that an error was sent
verify(responseObserver).onError(any(StatusRuntimeException.class));
}

public void testErrorTracingConfigValidationPassesWhenServerSettingIsDisabledAndRequestSkipsTracing() {
// Setup request and the service, server setting is off and request does not require tracing
BulkRequest request = createTestBulkRequest().toBuilder()
.setGlobalParams(org.opensearch.protobufs.GlobalParams.newBuilder().setErrorTrace(false))
.build();
DocumentServiceImpl serviceWithDisabledErrorsTracing = new DocumentServiceImpl(client, false);

// Call bulk method
serviceWithDisabledErrorsTracing.bulk(request, responseObserver);

// Verify that client.bulk was called
verify(client).bulk(any(org.opensearch.action.bulk.BulkRequest.class), any());
}

private BulkRequest createTestBulkRequest() {
IndexOperation indexOp = IndexOperation.newBuilder().setXIndex("test-index").setXId("test-id").build();

Expand All @@ -76,6 +103,9 @@ private BulkRequest createTestBulkRequest() {
.setObject(ByteString.copyFromUtf8("{\"field\":\"value\"}"))
.build();

return BulkRequest.newBuilder().addRequestBody(requestBody).build();
return BulkRequest.newBuilder()
.addRequestBody(requestBody)
.setGlobalParams(org.opensearch.protobufs.GlobalParams.newBuilder().setErrorTrace(true))
.build();
}
}
Loading