Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add range validations in query builder and field mapper ([#20497](https://github.com/opensearch-project/OpenSearch/issues/20497))
- [Workload Management] Enhance Scroll API support for autotagging ([#20151](https://github.com/opensearch-project/OpenSearch/pull/20151))
- Add indices to search request slowlog ([#20588](https://github.com/opensearch-project/OpenSearch/pull/20588))
- Add gRPC support for Min and Max metric aggregations ([#20676](https://github.com/opensearch-project/OpenSearch/pull/20676))

### Changed
- Move Randomness from server to libs/common ([#20570](https://github.com/opensearch-project/OpenSearch/pull/20570))
Expand Down
1 change: 1 addition & 0 deletions buildSrc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ tasks.withType(JavaCompile).configureEach {
*****************************************************************************/

repositories {
mavenLocal()
mavenCentral()
gradlePluginPortal()
}
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ kotlin = "1.7.10"
antlr4 = "4.13.1"
guava = "33.2.1-jre"
gson = "2.13.2"
opensearchprotobufs = "1.2.0"
opensearchprotobufs = "1.4.0-SNAPSHOT"
protobuf = "3.25.8"
jakarta_annotation = "1.3.5"
google_http_client = "1.44.1"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.spi;

import org.opensearch.protobufs.Aggregate;
import org.opensearch.search.aggregations.InternalAggregation;

import java.io.IOException;

/**
* SPI interface for converting OpenSearch InternalAggregation objects to Protocol Buffer Aggregate messages.
* Follows the same pattern as {@link AggregationBuilderProtoConverter} for request-side conversions.
*
* <p>The registry handles metadata centrally. Converters should delegate to existing
* {@code *AggregateProtoUtils} classes for the actual conversion logic.
*/
public interface AggregateProtoConverter {

/**
* Returns the InternalAggregation subclass this converter handles.
*
* @return The class type of the aggregation this converter supports
*/
Class<? extends InternalAggregation> getHandledAggregationType();

/**
* Converts an InternalAggregation to its Protocol Buffer Aggregate.Builder representation.
* Returns a builder to allow the registry to construct the final Aggregate.
* Mirrors REST-side {@link org.opensearch.search.aggregations.InternalAggregation#toXContent}
*
* @param aggregation The InternalAggregation to convert (guaranteed to be of the handled type)
* @return An Aggregate.Builder with the aggregation-specific fields populated
* @throws IOException if an error occurs during protobuf conversion
*/
Aggregate.Builder toProto(InternalAggregation aggregation) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.spi;

import org.opensearch.protobufs.Aggregate;
import org.opensearch.search.aggregations.InternalAggregation;

import java.io.IOException;

/**
* SPI interface for the aggregate converter registry.
* Provides the main entry point for converting InternalAggregation objects to Protocol Buffer Aggregate messages.
*/
public interface AggregateProtoConverterRegistry {

/**
* Converts an InternalAggregation to its Protocol Buffer Aggregate representation.
* Handles metadata and delegates to the appropriate converter.
*
* @param aggregation The InternalAggregation to convert (must not be null)
* @return The corresponding Protocol Buffer Aggregate message
* @throws IllegalArgumentException if aggregation is null or type is not supported
* @throws IOException if an error occurs during protobuf conversion
*/
Aggregate toProto(InternalAggregation aggregation) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.spi;

import org.opensearch.protobufs.AggregationContainer;
import org.opensearch.search.aggregations.AggregationBuilder;

/**
* SPI interface for converting protobuf aggregation containers to OpenSearch AggregationBuilders.
* Follows the same pattern as {@link QueryBuilderProtoConverter}.
*
* <p>The registry handles metadata and subaggregations. Converters should delegate to existing
* {@code *ProtoUtils} classes.
*
* @see org.opensearch.search.aggregations.AggregatorFactories#parseAggregators
*/
public interface AggregationBuilderProtoConverter {

/**
* Returns the aggregation case this converter handles.
*/
AggregationContainer.AggregationContainerCase getHandledAggregationCase();

/**
* Converts a protobuf aggregation container to an AggregationBuilder.
* Similar to {@link org.opensearch.search.aggregations.AggregatorFactories.Builder#addAggregator}.
*
* @param name The aggregation name
* @param container The protobuf container
* @return The OpenSearch AggregationBuilder
*/
AggregationBuilder fromProto(String name, AggregationContainer container);

/**
* Sets the registry for nested aggregations. Default no-op for metric aggregations.
*/
default void setRegistry(AggregationBuilderProtoConverterRegistry registry) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.spi;

import org.opensearch.protobufs.AggregationContainer;
import org.opensearch.search.aggregations.AggregationBuilder;

/**
* SPI interface for the aggregation converter registry.
* Handles converter lookup, metadata, and recursive subaggregation parsing.
*
* @see org.opensearch.search.aggregations.AggregatorFactories#parseAggregators
*/
public interface AggregationBuilderProtoConverterRegistry {

/**
* Converts a protobuf aggregation container to an OpenSearch AggregationBuilder.
* Similar to {@link org.opensearch.search.aggregations.AggregatorFactories#parseAggregators}.
*
* @param name The aggregation name
* @param container The protobuf container
* @return The AggregationBuilder with metadata and subaggregations
*/
AggregationBuilder fromProto(String name, AggregationContainer container);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

package org.opensearch.transport.grpc;

import org.opensearch.protobufs.Aggregate;
import org.opensearch.protobufs.AggregationContainer;
import org.opensearch.protobufs.MaxAggregation;
import org.opensearch.protobufs.MinAggregation;
import org.opensearch.protobufs.SearchRequest;
import org.opensearch.protobufs.SearchRequestBody;
import org.opensearch.protobufs.SearchResponse;
Expand Down Expand Up @@ -57,4 +61,86 @@ public void testSearchServiceSearch() throws Exception {
assertEquals("Hit should have correct ID", "1", searchResponse.getHits().getHits(0).getXId());
}
}

/**
* Tests min aggregation via gRPC.
*/
public void testMinAggregation() throws Exception {
// Create a test index
String indexName = "test-min-agg";
createTestIndex(indexName);

// Index documents with different price values
indexTestDocument(indexName, "1", "{\"price\": 10.5}");
indexTestDocument(indexName, "2", "{\"price\": 25.0}");
indexTestDocument(indexName, "3", "{\"price\": 5.2}");

// Create a gRPC client
try (NettyGrpcClient client = createGrpcClient()) {
ManagedChannel channel = client.getChannel();
SearchServiceGrpc.SearchServiceBlockingStub searchStub = SearchServiceGrpc.newBlockingStub(channel);

// Build min aggregation request
MinAggregation minAgg = MinAggregation.newBuilder().setField("price").build();

SearchRequestBody requestBody = SearchRequestBody.newBuilder()
.setSize(0)
.putAggregations("min_price", AggregationContainer.newBuilder().setMin(minAgg).build())
.build();

SearchRequest searchRequest = SearchRequest.newBuilder().addIndex(indexName).setSearchRequestBody(requestBody).build();

// Execute search
SearchResponse response = searchStub.search(searchRequest);

// Verify min aggregation result
assertNotNull("Search response should not be null", response);
assertTrue("Should have aggregations", response.getAggregationsCount() > 0);
Aggregate minResult = response.getAggregationsMap().get("min_price");
assertNotNull("Min aggregation should exist", minResult);
assertTrue("Should have value", minResult.hasValue());
assertEquals("Min value should be 5.2", 5.2, minResult.getValue().getDouble(), 0.001);
}
}

/**
* Tests max aggregation via gRPC.
*/
public void testMaxAggregation() throws Exception {
// Create a test index
String indexName = "test-max-agg";
createTestIndex(indexName);

// Index documents with different price values
indexTestDocument(indexName, "1", "{\"price\": 10.5}");
indexTestDocument(indexName, "2", "{\"price\": 25.0}");
indexTestDocument(indexName, "3", "{\"price\": 5.2}");

// Create a gRPC client
try (NettyGrpcClient client = createGrpcClient()) {
ManagedChannel channel = client.getChannel();
SearchServiceGrpc.SearchServiceBlockingStub searchStub = SearchServiceGrpc.newBlockingStub(channel);

// Build max aggregation request
MaxAggregation maxAgg = MaxAggregation.newBuilder().setField("price").build();

SearchRequestBody requestBody = SearchRequestBody.newBuilder()
.setSize(0)
.putAggregations("max_price", AggregationContainer.newBuilder().setMax(maxAgg).build())
.build();

SearchRequest searchRequest = SearchRequest.newBuilder().addIndex(indexName).setSearchRequestBody(requestBody).build();

// Execute search
SearchResponse response = searchStub.search(searchRequest);

// Verify max aggregation result
assertNotNull("Search response should not be null", response);
assertTrue("Should have aggregations", response.getAggregationsCount() > 0);
Aggregate maxResult = response.getAggregationsMap().get("max_price");
assertNotNull("Max aggregation should exist", maxResult);
assertTrue("Should have value", maxResult.hasValue());
assertEquals("Max value should be 25.0", 25.0, maxResult.getValue().getDouble(), 0.001);
}
}
}
Loading
Loading