Skip to content

Commit 7252dcb

Browse files
yiyuabcclaude
andcommitted
Add plugin support for response-side aggregate converters
Enable plugins to register custom AggregateProtoConverter implementations for serializing custom aggregation results to protobuf responses. Changes: - Load AggregateProtoConverter extensions from plugins in loadExtensions() - Create and initialize AggregateProtoConverterRegistryImpl in createComponents() - Set global registry via AggregateProtoUtils.setRegistry() - Register external converters in the SPI registry - Add logging for discovery and registration of aggregate converters This completes the extensibility story - plugins can now: 1. Register custom request converters (AggregationBuilderProtoConverter) 2. Register custom response converters (AggregateProtoConverter) Both directions are now fully pluggable and extensible. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 3aec589 commit 7252dcb

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,11 @@
3636
import org.opensearch.transport.grpc.proto.request.search.aggregation.AggregationBuilderProtoConverterRegistryImpl;
3737
import org.opensearch.transport.grpc.proto.request.search.query.AbstractQueryBuilderProtoUtils;
3838
import org.opensearch.transport.grpc.proto.request.search.query.QueryBuilderProtoConverterRegistryImpl;
39+
import org.opensearch.transport.grpc.proto.response.search.aggregation.AggregateProtoConverterRegistryImpl;
40+
import org.opensearch.transport.grpc.proto.response.search.aggregation.AggregateProtoUtils;
3941
import org.opensearch.transport.grpc.services.DocumentServiceImpl;
4042
import org.opensearch.transport.grpc.services.SearchServiceImpl;
43+
import org.opensearch.transport.grpc.spi.AggregateProtoConverter;
4144
import org.opensearch.transport.grpc.spi.AggregationBuilderProtoConverter;
4245
import org.opensearch.transport.grpc.spi.GrpcInterceptorProvider;
4346
import org.opensearch.transport.grpc.spi.GrpcInterceptorProvider.OrderedGrpcInterceptor;
@@ -85,9 +88,11 @@ public final class GrpcPlugin extends Plugin implements NetworkPlugin, Extensibl
8588

8689
private final List<QueryBuilderProtoConverter> queryConverters = new ArrayList<>();
8790
private final List<AggregationBuilderProtoConverter> aggregationConverters = new ArrayList<>();
91+
private final List<AggregateProtoConverter> aggregateConverters = new ArrayList<>();
8892
private final List<GrpcServiceFactory> servicesFactory = new ArrayList<>();
8993
private QueryBuilderProtoConverterRegistryImpl queryRegistry;
9094
private AggregationBuilderProtoConverterRegistryImpl aggregationRegistry;
95+
private AggregateProtoConverterRegistryImpl aggregateRegistry;
9196
private AbstractQueryBuilderProtoUtils queryUtils;
9297
private GrpcInterceptorChain serverInterceptor; // Initialized in createComponents
9398
private List<GrpcInterceptorProvider> interceptorProviders = new ArrayList<>();
@@ -140,6 +145,23 @@ public void loadExtensions(ExtensiblePlugin.ExtensionLoader loader) {
140145
logger.info("No AggregationBuilderProtoConverter extensions found from other plugins");
141146
}
142147

148+
// Load aggregate converters (response-side) from other plugins
149+
List<AggregateProtoConverter> aggResponseExtensions = loader.loadExtensions(AggregateProtoConverter.class);
150+
if (aggResponseExtensions != null && !aggResponseExtensions.isEmpty()) {
151+
logger.info("Loading {} AggregateProtoConverter extensions from other plugins", aggResponseExtensions.size());
152+
for (AggregateProtoConverter converter : aggResponseExtensions) {
153+
logger.info(
154+
"Discovered AggregateProtoConverter extension: {} (handles: {})",
155+
converter.getClass().getName(),
156+
converter.getHandledAggregationType().getName()
157+
);
158+
aggregateConverters.add(converter);
159+
}
160+
logger.info("Successfully loaded {} AggregateProtoConverter extensions", aggResponseExtensions.size());
161+
} else {
162+
logger.info("No AggregateProtoConverter extensions found from other plugins");
163+
}
164+
143165
List<GrpcInterceptorProvider> providers = loader.loadExtensions(GrpcInterceptorProvider.class);
144166
if (providers != null) {
145167
// Note: ThreadContext will be provided during component creation
@@ -417,6 +439,12 @@ public Collection<Object> createComponents(
417439
// Create the aggregation registry
418440
this.aggregationRegistry = new AggregationBuilderProtoConverterRegistryImpl();
419441

442+
// Create the aggregate registry (response-side)
443+
this.aggregateRegistry = new AggregateProtoConverterRegistryImpl();
444+
445+
// Set the global aggregate registry for response serialization
446+
AggregateProtoUtils.setRegistry(aggregateRegistry);
447+
420448
// Create the query utils instance
421449
this.queryUtils = new AbstractQueryBuilderProtoUtils(queryRegistry);
422450

@@ -475,6 +503,28 @@ public Collection<Object> createComponents(
475503
logger.info("No external AggregationBuilderProtoConverter(s) to register");
476504
}
477505

506+
// Register external aggregate converters (response-side)
507+
if (!aggregateConverters.isEmpty()) {
508+
logger.info(
509+
"Registering {} external AggregateProtoConverter(s)",
510+
aggregateConverters.size()
511+
);
512+
for (AggregateProtoConverter converter : aggregateConverters) {
513+
logger.info(
514+
"Processing external aggregate converter: {} (handles: {})",
515+
converter.getClass().getName(),
516+
converter.getHandledAggregationType().getName()
517+
);
518+
519+
// Register the converter in the SPI registry
520+
aggregateRegistry.getSpiRegistry().registerConverter(converter);
521+
logger.info("Registered aggregate converter: {}", converter.getClass().getName());
522+
}
523+
logger.info("Successfully registered all {} external aggregate converters", aggregateConverters.size());
524+
} else {
525+
logger.info("No external AggregateProtoConverter(s) to register");
526+
}
527+
478528
return super.createComponents(
479529
client,
480530
clusterService,

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/response/search/SearchResponseSectionsProtoUtils.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,6 @@ protected static void toProto(org.opensearch.protobufs.SearchResponse.Builder bu
8585
* @throws UnsupportedOperationException if unsupported features are present
8686
*/
8787
private static void checkUnsupportedFeatures(SearchResponse response) {
88-
// Aggregations are now supported, removed from unsupported list
89-
9088
// TODO: Implement suggest conversion
9189
if (response.getSuggest() != null) {
9290
throw new UnsupportedOperationException("suggest responses are not supported yet");

0 commit comments

Comments
 (0)