Skip to content

Commit c9295da

Browse files
yiyuabcclaude
andcommitted
Refactor gRPC converters to use registry pattern
This commit implements an extensible registry pattern for both response-side and request-side gRPC converters, improving consistency and maintainability. Response-side changes: - Add AggregateProtoConverter and AggregateProtoConverterRegistry SPI interfaces - Implement AggregateProtoConverterSpiRegistry with class hierarchy lookup - Create AggregateProtoConverterRegistryImpl as public facade - Add MinAggregateProtoConverter and MaxAggregateProtoConverter implementations - Update AggregateProtoUtils to delegate to registry instead of instanceof checks - Add comprehensive test suite (AggregateProtoConverterRegistryTests) Request-side changes: - Inline ObjectParserProtoUtils logic into ValuesSourceAggregationProtoUtils - Remove unnecessary ObjectParserProtoUtils abstraction layer - Simplify field parsing by directly calling builder methods Benefits: - Extensible: New aggregation types can be added without modifying central dispatcher - Consistent: Response-side now mirrors request-side registry pattern - Type-safe: Class-based dispatch with support for subclasses and mocks - Simplified: Reduced complexity by removing one abstraction layer All 1,280+ transport-grpc tests passing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 29bed9d commit c9295da

File tree

13 files changed

+651
-158
lines changed

13 files changed

+651
-158
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.transport.grpc.spi;
10+
11+
import org.opensearch.protobufs.Aggregate;
12+
import org.opensearch.search.aggregations.InternalAggregation;
13+
14+
import java.io.IOException;
15+
16+
/**
17+
* SPI interface for converting OpenSearch InternalAggregation objects to Protocol Buffer Aggregate messages.
18+
* Follows the same pattern as {@link AggregationBuilderProtoConverter} for request-side conversions.
19+
*
20+
* <p>The registry handles metadata centrally. Converters should delegate to existing
21+
* {@code *AggregateProtoUtils} classes for the actual conversion logic.
22+
*/
23+
public interface AggregateProtoConverter {
24+
25+
/**
26+
* Returns the InternalAggregation subclass this converter handles.
27+
*
28+
* @return The class type of the aggregation this converter supports
29+
*/
30+
Class<? extends InternalAggregation> getHandledAggregationType();
31+
32+
/**
33+
* Converts an InternalAggregation to its Protocol Buffer Aggregate.Builder representation.
34+
* Returns a builder to allow the registry to construct the final Aggregate.
35+
* Mirrors REST-side {@link org.opensearch.search.aggregations.InternalAggregation#toXContent}
36+
*
37+
* @param aggregation The InternalAggregation to convert (guaranteed to be of the handled type)
38+
* @return An Aggregate.Builder with the aggregation-specific fields populated
39+
* @throws IOException if an error occurs during protobuf conversion
40+
*/
41+
Aggregate.Builder toProto(InternalAggregation aggregation) throws IOException;
42+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.transport.grpc.spi;
10+
11+
import org.opensearch.protobufs.Aggregate;
12+
import org.opensearch.search.aggregations.InternalAggregation;
13+
14+
import java.io.IOException;
15+
16+
/**
17+
* SPI interface for the aggregate converter registry.
18+
* Provides the main entry point for converting InternalAggregation objects to Protocol Buffer Aggregate messages.
19+
*/
20+
public interface AggregateProtoConverterRegistry {
21+
22+
/**
23+
* Converts an InternalAggregation to its Protocol Buffer Aggregate representation.
24+
* Handles metadata and delegates to the appropriate converter.
25+
*
26+
* @param aggregation The InternalAggregation to convert (must not be null)
27+
* @return The corresponding Protocol Buffer Aggregate message
28+
* @throws IllegalArgumentException if aggregation is null or type is not supported
29+
* @throws IOException if an error occurs during protobuf conversion
30+
*/
31+
Aggregate toProto(InternalAggregation aggregation) throws IOException;
32+
}

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,11 @@
3636
import org.opensearch.transport.grpc.proto.request.search.aggregation.AggregationBuilderProtoConverterRegistryImpl;
3737
import org.opensearch.transport.grpc.proto.request.search.query.AbstractQueryBuilderProtoUtils;
3838
import org.opensearch.transport.grpc.proto.request.search.query.QueryBuilderProtoConverterRegistryImpl;
39+
import org.opensearch.transport.grpc.proto.response.search.aggregation.AggregateProtoConverterRegistryImpl;
40+
import org.opensearch.transport.grpc.proto.response.search.aggregation.AggregateProtoUtils;
3941
import org.opensearch.transport.grpc.services.DocumentServiceImpl;
4042
import org.opensearch.transport.grpc.services.SearchServiceImpl;
43+
import org.opensearch.transport.grpc.spi.AggregateProtoConverter;
4144
import org.opensearch.transport.grpc.spi.AggregationBuilderProtoConverter;
4245
import org.opensearch.transport.grpc.spi.GrpcInterceptorProvider;
4346
import org.opensearch.transport.grpc.spi.GrpcInterceptorProvider.OrderedGrpcInterceptor;
@@ -85,9 +88,11 @@ public final class GrpcPlugin extends Plugin implements NetworkPlugin, Extensibl
8588

8689
private final List<QueryBuilderProtoConverter> queryConverters = new ArrayList<>();
8790
private final List<AggregationBuilderProtoConverter> aggregationConverters = new ArrayList<>();
91+
private final List<AggregateProtoConverter> aggregateConverters = new ArrayList<>();
8892
private final List<GrpcServiceFactory> servicesFactory = new ArrayList<>();
8993
private QueryBuilderProtoConverterRegistryImpl queryRegistry;
9094
private AggregationBuilderProtoConverterRegistryImpl aggregationRegistry;
95+
private AggregateProtoConverterRegistryImpl aggregateRegistry;
9196
private AbstractQueryBuilderProtoUtils queryUtils;
9297
private GrpcInterceptorChain serverInterceptor; // Initialized in createComponents
9398
private List<GrpcInterceptorProvider> interceptorProviders = new ArrayList<>();
@@ -140,6 +145,23 @@ public void loadExtensions(ExtensiblePlugin.ExtensionLoader loader) {
140145
logger.info("No AggregationBuilderProtoConverter extensions found from other plugins");
141146
}
142147

148+
// Load aggregate converters (response-side) from other plugins
149+
List<AggregateProtoConverter> aggResponseExtensions = loader.loadExtensions(AggregateProtoConverter.class);
150+
if (aggResponseExtensions != null && !aggResponseExtensions.isEmpty()) {
151+
logger.info("Loading {} AggregateProtoConverter extensions from other plugins", aggResponseExtensions.size());
152+
for (AggregateProtoConverter converter : aggResponseExtensions) {
153+
logger.info(
154+
"Discovered AggregateProtoConverter extension: {} (handles: {})",
155+
converter.getClass().getName(),
156+
converter.getHandledAggregationType().getName()
157+
);
158+
aggregateConverters.add(converter);
159+
}
160+
logger.info("Successfully loaded {} AggregateProtoConverter extensions", aggResponseExtensions.size());
161+
} else {
162+
logger.info("No AggregateProtoConverter extensions found from other plugins");
163+
}
164+
143165
List<GrpcInterceptorProvider> providers = loader.loadExtensions(GrpcInterceptorProvider.class);
144166
if (providers != null) {
145167
// Note: ThreadContext will be provided during component creation
@@ -417,6 +439,12 @@ public Collection<Object> createComponents(
417439
// Create the aggregation registry
418440
this.aggregationRegistry = new AggregationBuilderProtoConverterRegistryImpl();
419441

442+
// Create the aggregate registry (response-side)
443+
this.aggregateRegistry = new AggregateProtoConverterRegistryImpl();
444+
445+
// Set the global aggregate registry for response serialization
446+
AggregateProtoUtils.setRegistry(aggregateRegistry);
447+
420448
// Create the query utils instance
421449
this.queryUtils = new AbstractQueryBuilderProtoUtils(queryRegistry);
422450

@@ -475,6 +503,28 @@ public Collection<Object> createComponents(
475503
logger.info("No external AggregationBuilderProtoConverter(s) to register");
476504
}
477505

506+
// Register external aggregate converters (response-side)
507+
if (!aggregateConverters.isEmpty()) {
508+
logger.info(
509+
"Registering {} external AggregateProtoConverter(s)",
510+
aggregateConverters.size()
511+
);
512+
for (AggregateProtoConverter converter : aggregateConverters) {
513+
logger.info(
514+
"Processing external aggregate converter: {} (handles: {})",
515+
converter.getClass().getName(),
516+
converter.getHandledAggregationType().getName()
517+
);
518+
519+
// Register the converter in the SPI registry
520+
aggregateRegistry.getSpiRegistry().registerConverter(converter);
521+
logger.info("Registered aggregate converter: {}", converter.getClass().getName());
522+
}
523+
logger.info("Successfully registered all {} external aggregate converters", aggregateConverters.size());
524+
} else {
525+
logger.info("No external AggregateProtoConverter(s) to register");
526+
}
527+
478528
return super.createComponents(
479529
client,
480530
clusterService,

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/common/ObjectParserProtoUtils.java

Lines changed: 0 additions & 76 deletions
This file was deleted.

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/search/aggregation/support/ValuesSourceAggregationProtoUtils.java

Lines changed: 51 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,8 @@
1111
import org.opensearch.script.Script;
1212
import org.opensearch.search.aggregations.support.ValuesSourceAggregationBuilder;
1313
import org.opensearch.search.aggregations.support.ValueType;
14-
import org.opensearch.transport.grpc.proto.request.common.ObjectParserProtoUtils;
1514
import org.opensearch.transport.grpc.util.ProtobufEnumUtils;
1615

17-
import java.util.function.Function;
18-
1916
/**
2017
* Utility class for parsing common fields from ValuesSource-based aggregation Protocol Buffer messages.
2118
*/
@@ -28,7 +25,7 @@ private ValuesSourceAggregationProtoUtils() {
2825
/**
2926
* Declares common fields for ValuesSource aggregations, mirroring {@link ValuesSourceAggregationBuilder#declareFields}.
3027
*
31-
* <p>Uses {@link ObjectParserProtoUtils} for explicit, declarative field parsing that mirrors REST's ObjectParser pattern.
28+
* <p>Directly sets fields on the builder, mirroring REST's ObjectParser pattern.
3229
*
3330
* @param builder The aggregation builder to set fields on
3431
* @param fields Wrapper containing all ValuesSource field values from proto
@@ -50,7 +47,7 @@ public static void declareFields(
5047
/**
5148
* Declares common fields for ValuesSource aggregations, mirroring {@link ValuesSourceAggregationBuilder#declareFields}.
5249
*
53-
* <p>Uses {@link ObjectParserProtoUtils} for explicit, declarative field parsing that mirrors REST's ObjectParser pattern.
50+
* <p>Directly sets field values when present, mirroring REST's ObjectParser behavior without the abstraction layer.
5451
*
5552
* @param builder The aggregation builder to set fields on
5653
* @param fields Wrapper containing all ValuesSource field values from proto
@@ -68,60 +65,65 @@ public static void declareFields(
6865
boolean timezoneAware,
6966
boolean fieldRequired
7067
) {
71-
ObjectParserProtoUtils.declareField(
72-
builder,
73-
ValuesSourceAggregationBuilder::field,
74-
fields.getField(),
75-
Function.identity(),
76-
ParseField.CommonFields.FIELD.getPreferredName()
77-
);
68+
// Field declaration - mirrors REST ObjectParser.declareField behavior
69+
if (fields.getField() != null) {
70+
try {
71+
builder.field(fields.getField());
72+
} catch (Exception e) {
73+
throw new IllegalArgumentException("Failed to parse [field]", e);
74+
}
75+
}
7876

79-
ObjectParserProtoUtils.declareField(
80-
builder,
81-
ValuesSourceAggregationBuilder::missing,
82-
fields.getMissing(),
83-
Function.identity(),
84-
ParseField.CommonFields.MISSING.getPreferredName()
85-
);
77+
// Missing value declaration
78+
if (fields.getMissing() != null) {
79+
try {
80+
builder.missing(fields.getMissing());
81+
} catch (Exception e) {
82+
throw new IllegalArgumentException("Failed to parse [missing]", e);
83+
}
84+
}
8685

87-
ObjectParserProtoUtils.declareField(
88-
builder,
89-
ValuesSourceAggregationBuilder::userValueTypeHint,
90-
fields.getValueTypeProto(),
91-
proto -> {
92-
String valueTypeStr = ProtobufEnumUtils.convertToString(proto);
93-
if (valueTypeStr == null) {
94-
return null;
95-
}
96-
ValueType valueType = ValueType.lenientParse(valueTypeStr);
97-
if (valueType == null) {
98-
throw new IllegalArgumentException("Unknown value type [" + valueTypeStr + "]");
86+
// Value type declaration with transformation
87+
if (fields.getValueTypeProto() != null) {
88+
try {
89+
String valueTypeStr = ProtobufEnumUtils.convertToString(fields.getValueTypeProto());
90+
if (valueTypeStr != null) {
91+
ValueType valueType = ValueType.lenientParse(valueTypeStr);
92+
if (valueType == null) {
93+
throw new IllegalArgumentException("Unknown value type [" + valueTypeStr + "]");
94+
}
95+
builder.userValueTypeHint(valueType);
9996
}
100-
return valueType;
101-
},
102-
ValueType.VALUE_TYPE.getPreferredName()
103-
);
97+
} catch (IllegalArgumentException e) {
98+
throw e;
99+
} catch (Exception e) {
100+
throw new IllegalArgumentException("Failed to parse [value_type]", e);
101+
}
102+
}
104103

104+
// Format declaration (conditional)
105105
if (formattable) {
106-
ObjectParserProtoUtils.declareField(
107-
builder,
108-
ValuesSourceAggregationBuilder::format,
109-
fields.getFormat(),
110-
Function.identity(),
111-
ParseField.CommonFields.FORMAT.getPreferredName()
112-
);
106+
if (fields.getFormat() != null) {
107+
try {
108+
builder.format(fields.getFormat());
109+
} catch (Exception e) {
110+
throw new IllegalArgumentException("Failed to parse [format]", e);
111+
}
112+
}
113113
}
114114

115+
// Field/script requirement validation
115116
if (scriptable) {
116-
ObjectParserProtoUtils.declareField(
117-
builder,
118-
ValuesSourceAggregationBuilder::script,
119-
fields.getScript(),
120-
Function.identity(),
121-
Script.SCRIPT_PARSE_FIELD.getPreferredName()
122-
);
117+
if (fields.getScript() != null) {
118+
try {
119+
builder.script(fields.getScript());
120+
} catch (Exception e) {
121+
throw new IllegalArgumentException("Failed to parse [script]", e);
122+
}
123+
}
123124

124125
if (fieldRequired) {
126+
// throw exception when neither field nor script is specified
125127
if (fields.getField() == null && fields.getScript() == null) {
126128
throw new IllegalArgumentException(
127129
"Required one of fields ["

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/response/search/SearchResponseSectionsProtoUtils.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,6 @@ protected static void toProto(org.opensearch.protobufs.SearchResponse.Builder bu
8585
* @throws UnsupportedOperationException if unsupported features are present
8686
*/
8787
private static void checkUnsupportedFeatures(SearchResponse response) {
88-
// Aggregations are now supported, removed from unsupported list
89-
9088
// TODO: Implement suggest conversion
9189
if (response.getSuggest() != null) {
9290
throw new UnsupportedOperationException("suggest responses are not supported yet");

0 commit comments

Comments
 (0)