Skip to content

Commit b6b95c3

Browse files
yiyuabcclaude
andcommitted
Create AggregationsProtoUtils to mirror REST pattern
Adds AggregationsProtoUtils class to complete the REST-to-gRPC mapping for aggregations, mirroring Aggregations.java which iterates over aggregation results and delegates to individual converters. Changes: - Add AggregationsProtoUtils.toProto() to handle aggregations collection - Update SearchResponseSectionsProtoUtils to delegate to new class - Add comprehensive tests for AggregationsProtoUtils - Update to protobufs-1.4.0-SNAPSHOT with new Aggregate structure - Fix pre-existing test compilation errors in Min/MaxAggregateProtoUtilsTests - Disable unsupported nested aggregations code in AggregationBuilderProtoConverterSpiRegistry The REST-to-gRPC pattern mapping is now complete: - Aggregations.java → AggregationsProtoUtils.java (iterate collection) - InternalAggregation.java → AggregateProtoUtils.java (metadata + dispatch) - InternalMin/Max.doXContentBody() → Min/MaxAggregateProtoUtils.toProto() (specific content) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b90d2c6 commit b6b95c3

File tree

16 files changed

+321
-366
lines changed

16 files changed

+321
-366
lines changed

.idea/runConfigurations/Debug_OpenSearch.xml

Lines changed: 0 additions & 11 deletions
This file was deleted.

buildSrc/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ tasks.withType(JavaCompile).configureEach {
9494
*****************************************************************************/
9595

9696
repositories {
97+
mavenLocal()
9798
mavenCentral()
9899
gradlePluginPortal()
99100
}

modules/transport-grpc/licenses/protobufs-1.2.0.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
a00747d55a39abf63d635b9dcd9986d44d89e6cf

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/search/aggregation/AggregationBuilderProtoConverterSpiRegistry.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,21 +70,22 @@ public AggregationBuilder fromProto(String name, AggregationContainer container)
7070
logger.debug("Applied metadata to aggregation '{}': {}", name, metadata);
7171
}
7272

73-
if (container.getAggregationsCount() > 0) {
74-
AggregatorFactories.Builder subFactories = new AggregatorFactories.Builder();
75-
76-
for (Map.Entry<String, AggregationContainer> entry : container.getAggregationsMap().entrySet()) {
77-
String subAggName = entry.getKey();
78-
AggregationContainer subAggContainer = entry.getValue();
79-
80-
logger.debug("Parsing subaggregation '{}' for parent '{}'", subAggName, name);
81-
AggregationBuilder subAgg = fromProto(subAggName, subAggContainer);
82-
subFactories.addAggregator(subAgg);
83-
}
84-
85-
builder.subAggregations(subFactories);
86-
logger.debug("Added {} subaggregation(s) to aggregation '{}'", container.getAggregationsCount(), name);
87-
}
73+
// TODO: Nested aggregations not yet supported in proto definition
74+
// if (container.getAggregationsCount() > 0) {
75+
// AggregatorFactories.Builder subFactories = new AggregatorFactories.Builder();
76+
//
77+
// for (Map.Entry<String, AggregationContainer> entry : container.getAggregationsMap().entrySet()) {
78+
// String subAggName = entry.getKey();
79+
// AggregationContainer subAggContainer = entry.getValue();
80+
//
81+
// logger.debug("Parsing subaggregation '{}' for parent '{}'", subAggName, name);
82+
// AggregationBuilder subAgg = fromProto(subAggName, subAggContainer);
83+
// subFactories.addAggregator(subAgg);
84+
// }
85+
//
86+
// builder.subAggregations(subFactories);
87+
// logger.debug("Added {} subaggregation(s) to aggregation '{}'", container.getAggregationsCount(), name);
88+
// }
8889

8990
return builder;
9091
}

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/response/search/SearchResponseSectionsProtoUtils.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@
99

1010
import org.opensearch.action.search.SearchResponse;
1111
import org.opensearch.action.search.SearchResponseSections;
12-
import org.opensearch.search.aggregations.InternalAggregations;
1312
import org.opensearch.transport.grpc.proto.response.common.ObjectMapProtoUtils;
14-
import org.opensearch.transport.grpc.proto.response.search.aggregation.AggregateProtoUtils;
13+
import org.opensearch.transport.grpc.proto.response.search.aggregation.AggregationsProtoUtils;
1514

1615
import java.io.IOException;
1716
import java.util.List;
@@ -42,6 +41,9 @@ protected static void toProto(org.opensearch.protobufs.SearchResponse.Builder bu
4241
SearchHitsProtoUtils.toProto(response.getHits(), hitsBuilder);
4342
builder.setHits(hitsBuilder.build());
4443

44+
// Convert aggregations if present
45+
AggregationsProtoUtils.toProto(response.getAggregations(), builder);
46+
4547
// Convert processor results
4648
List<org.opensearch.search.pipeline.ProcessorExecutionDetail> processorResults = response.getInternalResponse()
4749
.getProcessorResult();
@@ -72,11 +74,6 @@ protected static void toProto(org.opensearch.protobufs.SearchResponse.Builder bu
7274
}
7375
}
7476

75-
// Convert aggregations if present
76-
if (response.getAggregations() != null) {
77-
AggregateProtoUtils.toProtoInternal((InternalAggregations) response.getAggregations(), (name, aggProto) -> builder.putAggregations(name, aggProto));
78-
}
79-
8077
// Check for other unsupported features
8178
checkUnsupportedFeatures(response);
8279
}

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/response/search/aggregation/AggregateProtoUtils.java

Lines changed: 20 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,17 @@
99

1010
import org.opensearch.protobufs.Aggregate;
1111
import org.opensearch.protobufs.ObjectMap;
12-
import org.opensearch.search.aggregations.InternalAggregation;
13-
import org.opensearch.search.aggregations.InternalAggregations;
12+
import org.opensearch.search.aggregations.Aggregation;
1413
import org.opensearch.search.aggregations.metrics.InternalMax;
1514
import org.opensearch.search.aggregations.metrics.InternalMin;
1615
import org.opensearch.transport.grpc.proto.response.common.ObjectMapProtoUtils;
1716
import org.opensearch.transport.grpc.proto.response.search.aggregation.metrics.MaxAggregateProtoUtils;
1817
import org.opensearch.transport.grpc.proto.response.search.aggregation.metrics.MinAggregateProtoUtils;
1918

2019
import java.io.IOException;
21-
import java.util.function.Consumer;
2220

2321
/**
24-
* Utility class for converting OpenSearch InternalAggregation objects to Protocol Buffer Aggregate messages.
25-
*
26-
* <p>This class serves as a central dispatcher that routes different aggregation types to their specific
27-
* converters, and provides common helper methods for aggregation serialization that are shared across
28-
* all aggregation types.
29-
*
30-
* @see InternalAggregation
31-
* @see org.opensearch.search.aggregations.Aggregations
22+
* Converts InternalAggregation to Aggregate protobuf.
3223
*/
3324
public class AggregateProtoUtils {
3425

@@ -37,104 +28,39 @@ private AggregateProtoUtils() {
3728
}
3829

3930
/**
40-
* Converts an InternalAggregation to its Protocol Buffer Aggregate representation.
31+
* Converts an Aggregation to Aggregate protobuf.
4132
*
42-
* <p>This method acts as a central dispatcher that routes different aggregation types
43-
* to their specific converter utilities using instanceof checks.
33+
* <p>Dispatches to specific converters and handles metadata centrally.
34+
* Mirrors REST-side {@link org.opensearch.search.aggregations.InternalAggregation#toXContent}.
4435
*
45-
* @param aggregation The OpenSearch internal aggregation (must not be null)
36+
* @param aggregation The OpenSearch aggregation (must not be null)
4637
* @return The corresponding Protocol Buffer Aggregate message
4738
* @throws IllegalArgumentException if aggregation is null or type is not supported
4839
* @throws IOException if an error occurs during protobuf conversion
40+
* @see org.opensearch.search.aggregations.InternalAggregation#toXContent
4941
*/
50-
public static Aggregate toProto(InternalAggregation aggregation) throws IOException {
42+
public static Aggregate toProto(Aggregation aggregation) throws IOException {
5143
if (aggregation == null) {
52-
throw new IllegalArgumentException("InternalAggregation must not be null");
53-
}
54-
55-
Aggregate.Builder aggregateBuilder = Aggregate.newBuilder();
56-
57-
// Dispatch based on runtime type
58-
if (aggregation instanceof InternalMin) {
59-
aggregateBuilder.setMin(MinAggregateProtoUtils.toProto((InternalMin) aggregation));
60-
} else if (aggregation instanceof InternalMax) {
61-
aggregateBuilder.setMax(MaxAggregateProtoUtils.toProto((InternalMax) aggregation));
62-
} else {
63-
// Future aggregation types will be added here
64-
throw new IllegalArgumentException("Unsupported aggregation type: " + aggregation.getClass().getName());
44+
throw new IllegalArgumentException("Aggregation must not be null");
6545
}
6646

67-
return aggregateBuilder.build();
68-
}
47+
Aggregate.Builder builder = Aggregate.newBuilder();
6948

70-
/**
71-
* Sets the aggregation metadata if present.
72-
*
73-
* <p>Mirrors {@link InternalAggregation#toXContent} which serializes metadata when present.
74-
* This is a common helper method used by all aggregation types since all aggregations
75-
* inherit from {@link InternalAggregation} and can optionally have metadata.
76-
*
77-
* <p>Metadata is only included in the protobuf message when non-null and non-empty.
78-
*
79-
* @param metadata The metadata map from InternalAggregation.getMetadata()
80-
* @param setter Consumer that sets the ObjectMap in the protobuf builder (e.g., builder::setMeta)
81-
*/
82-
public static void setMetadataIfPresent(
83-
java.util.Map<String, Object> metadata,
84-
Consumer<ObjectMap> setter
85-
) {
86-
if (metadata != null && !metadata.isEmpty()) {
87-
ObjectMap.Value metaValue = ObjectMapProtoUtils.toProto(metadata);
49+
if (aggregation.getMetadata() != null && !aggregation.getMetadata().isEmpty()) {
50+
ObjectMap.Value metaValue = ObjectMapProtoUtils.toProto(aggregation.getMetadata());
8851
if (metaValue.hasObjectMap()) {
89-
setter.accept(metaValue.getObjectMap());
52+
builder.setMeta(metaValue.getObjectMap());
9053
}
9154
}
92-
}
9355

94-
/**
95-
* Converts sub-aggregations to protobuf format without outer wrapper.
96-
*
97-
* <p>Mirrors {@link org.opensearch.search.aggregations.Aggregations#toXContentInternal} which iterates
98-
* through aggregations and serializes each one. This is called by all bucket aggregations when serializing
99-
* sub-aggregations, as seen in {@link org.opensearch.search.aggregations.bucket.terms.InternalTerms.Bucket#toXContent}.
100-
*
101-
* <p>Iterates through sub-aggregations and converts each to protobuf format, then adds to parent bucket's
102-
* aggregate map. No-op if aggregations is null or empty.
103-
*
104-
* @param aggregations The InternalAggregations from a bucket (can be null or empty)
105-
* @param adder BiConsumer that adds each converted aggregate to the parent bucket builder.
106-
* First parameter is aggregation name, second is the converted Aggregate protobuf.
107-
* @throws IOException if an error occurs during aggregation conversion
108-
*/
109-
public static void toProtoInternal(
110-
InternalAggregations aggregations,
111-
BiConsumerWithException<String, Aggregate> adder
112-
) throws IOException {
113-
if (aggregations != null && !aggregations.asList().isEmpty()) {
114-
for (org.opensearch.search.aggregations.Aggregation agg : aggregations.asList()) {
115-
Aggregate protoAgg = AggregateProtoUtils.toProto((InternalAggregation) agg);
116-
adder.accept(agg.getName(), protoAgg);
117-
}
56+
if (aggregation instanceof InternalMin) {
57+
builder.mergeFrom(MinAggregateProtoUtils.toProto((InternalMin) aggregation));
58+
} else if (aggregation instanceof InternalMax) {
59+
builder.mergeFrom(MaxAggregateProtoUtils.toProto((InternalMax) aggregation));
60+
} else {
61+
throw new IllegalArgumentException("Unsupported aggregation type: " + aggregation.getClass().getName());
11862
}
119-
}
12063

121-
/**
122-
* Functional interface for consumers that can throw IOException.
123-
*
124-
* <p>Used by {@link #toProtoInternal} to add converted aggregates to protobuf bucket builders.
125-
*
126-
* @param <T> First argument type (aggregation name)
127-
* @param <U> Second argument type (Aggregate protobuf)
128-
*/
129-
@FunctionalInterface
130-
public interface BiConsumerWithException<T, U> {
131-
/**
132-
* Performs this operation on the given arguments.
133-
*
134-
* @param t First input argument (aggregation name)
135-
* @param u Second input argument (Aggregate protobuf)
136-
* @throws IOException if an I/O error occurs
137-
*/
138-
void accept(T t, U u) throws IOException;
64+
return builder.build();
13965
}
14066
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.transport.grpc.proto.response.search.aggregation;
10+
11+
import org.opensearch.search.aggregations.Aggregation;
12+
import org.opensearch.search.aggregations.Aggregations;
13+
14+
import java.io.IOException;
15+
16+
/**
17+
* Converts Aggregations collection to protobuf map.
18+
*
19+
* <p>Note: Unlike REST which has an "aggregations" JSON object wrapper, the protobuf
20+
* uses map&lt;string, Aggregate&gt; directly in SearchResponse. This class exists to mirror
21+
* the REST pattern (Aggregations.toXContentInternal) for consistency and discoverability,
22+
* even though there's no corresponding "Aggregations" proto message.
23+
*
24+
* <p>Mirrors REST-side {@link Aggregations#toXContentInternal}.
25+
*/
26+
public class AggregationsProtoUtils {
27+
28+
private AggregationsProtoUtils() {
29+
// Utility class
30+
}
31+
32+
/**
33+
* Converts Aggregations to protobuf map entries.
34+
*
35+
* <p>Mirrors {@link Aggregations#toXContentInternal} which iterates through
36+
* aggregations and serializes each one.
37+
*
38+
* @param aggregations The aggregations collection (can be null)
39+
* @param builder The SearchResponse builder to populate
40+
* @throws IOException if an error occurs during conversion
41+
* @see Aggregations#toXContentInternal
42+
*/
43+
public static void toProto(
44+
Aggregations aggregations,
45+
org.opensearch.protobufs.SearchResponse.Builder builder
46+
) throws IOException {
47+
if (aggregations == null) {
48+
return;
49+
}
50+
51+
for (Aggregation agg : aggregations.asList()) {
52+
builder.putAggregations(
53+
agg.getName(),
54+
AggregateProtoUtils.toProto(agg)
55+
);
56+
}
57+
}
58+
}

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/response/search/aggregation/metrics/MaxAggregateProtoUtils.java

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,36 +7,14 @@
77
*/
88
package org.opensearch.transport.grpc.proto.response.search.aggregation.metrics;
99

10-
import org.opensearch.protobufs.MaxAggregate;
10+
import org.opensearch.protobufs.Aggregate;
1111
import org.opensearch.protobufs.NullValue;
12-
import org.opensearch.protobufs.SingleMetricAggregateBaseAllOfValue;
12+
import org.opensearch.protobufs.SingleMetricAggregateBaseValue;
1313
import org.opensearch.search.DocValueFormat;
1414
import org.opensearch.search.aggregations.metrics.InternalMax;
15-
import org.opensearch.transport.grpc.proto.response.search.aggregation.AggregateProtoUtils;
1615

1716
/**
18-
* Utility class for converting InternalMax aggregation results to MaxAggregate protocol buffer format.
19-
*
20-
* <p>This utility mirrors the REST API serialization logic found in {@link InternalMax#doXContentBody}
21-
* but produces Protocol Buffer messages instead of XContent (JSON/YAML). The conversion handles:
22-
* <ul>
23-
* <li>Maximum numeric value with special handling for empty result sets (NEGATIVE_INFINITY → NULL_VALUE)</li>
24-
* <li>Formatted string representation (value_as_string) - only included when format != RAW</li>
25-
* <li>Aggregation metadata</li>
26-
* </ul>
27-
*
28-
* <p>The field mapping between REST API and gRPC protobuf:
29-
* <pre>
30-
* REST (XContent) gRPC (Protobuf)
31-
* --------------- ---------------
32-
* "value" → value (SingleMetricAggregateBaseAllOfValue)
33-
* "value_as_string" → value_as_string (optional, format-dependent)
34-
* metadata → meta (ObjectMap)
35-
* </pre>
36-
*
37-
* @see InternalMax
38-
* @see InternalMax#doXContentBody
39-
* @see MaxAggregate
17+
* Utility class for converting {@link InternalMax} aggregation results to Aggregate protocol buffer format.
4018
*/
4119
public class MaxAggregateProtoUtils {
4220

@@ -45,24 +23,22 @@ private MaxAggregateProtoUtils() {
4523
}
4624

4725
/**
48-
* Converts an InternalMax aggregation result to a MaxAggregate protobuf message.
26+
* Converts an InternalMax aggregation result to Aggregate proto.
4927
*
5028
* <p>Mirrors {@link InternalMax#doXContentBody} structure exactly.
29+
* Metadata is handled centrally by {@link org.opensearch.transport.grpc.proto.response.search.aggregation.AggregateProtoUtils#toProto}.
5130
*
5231
* @param internalMax The InternalMax aggregation result from OpenSearch
53-
* @return A MaxAggregate protobuf message with all applicable fields populated
32+
* @return Aggregate proto (metadata not included)
5433
* @see InternalMax#doXContentBody
55-
* @see MaxAggregate
5634
*/
57-
public static MaxAggregate toProto(InternalMax internalMax) {
58-
MaxAggregate.Builder builder = MaxAggregate.newBuilder();
35+
public static Aggregate toProto(InternalMax internalMax) {
36+
Aggregate.Builder builder = Aggregate.newBuilder();
5937

60-
// Line 100: boolean hasValue = !Double.isInfinite(max);
6138
double max = internalMax.getValue();
6239
boolean hasValue = !Double.isInfinite(max);
6340

64-
// Line 101: builder.field(CommonFields.VALUE.getPreferredName(), hasValue ? max : null);
65-
SingleMetricAggregateBaseAllOfValue.Builder valueBuilder = SingleMetricAggregateBaseAllOfValue.newBuilder();
41+
SingleMetricAggregateBaseValue.Builder valueBuilder = SingleMetricAggregateBaseValue.newBuilder();
6642
if (hasValue) {
6743
valueBuilder.setDouble(max);
6844
} else {
@@ -74,8 +50,6 @@ public static MaxAggregate toProto(InternalMax internalMax) {
7450
builder.setValueAsString(internalMax.getValueAsString());
7551
}
7652

77-
AggregateProtoUtils.setMetadataIfPresent(internalMax.getMetadata(), builder::setMeta);
78-
7953
return builder.build();
8054
}
8155
}

0 commit comments

Comments
 (0)