Skip to content

Commit 7ffbffc

Browse files
committed
GH-10623: Add GrpcInboundGateway
Fixes: #10623 * Add `spring-integration-grpc` module * Manage respective new `io.grpc` dependencies * Add Checkstyle suppression for new Proto-generated classes * Add `GrpcHeaders` for convenient API * Implement `GrpcInboundGateway` with a proxy for gRPC service methods * Add `TestInProcessConfiguration` for a common in-process infrastructure * Document new feature
1 parent 46110f5 commit 7ffbffc

File tree

12 files changed

+883
-0
lines changed

12 files changed

+883
-0
lines changed

build.gradle

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ ext {
6262
graalvmVersion = '25.0.1'
6363
greenmailVersion = '2.1.8'
6464
groovyVersion = '5.0.3'
65+
grpcVersion = '1.77.0'
6566
hamcrestVersion = '3.0'
6667
hazelcastVersion = '5.6.0'
6768
hibernateVersion = '7.1.12.Final'
@@ -173,6 +174,7 @@ allprojects {
173174
dependencyManagement(platform("org.springframework.security:spring-security-bom:$springSecurityVersion"))
174175
dependencyManagement(platform("org.springframework.ws:spring-ws-bom:$springWsVersion"))
175176
dependencyManagement(platform("org.mongodb:mongodb-driver-bom:$mongoDriverVersion"))
177+
dependencyManagement(platform("io.grpc:grpc-bom:$grpcVersion"))
176178
}
177179
}
178180

@@ -627,6 +629,45 @@ project('spring-integration-groovy') {
627629
}
628630
}
629631

632+
project('spring-integration-grpc') {
633+
description = 'Spring Integration gRPC Support'
634+
635+
apply plugin: 'com.google.protobuf'
636+
637+
configurations {
638+
[compileProtoPath, testCompileProtoPath].each {
639+
it.extendsFrom(dependencyManagement)
640+
}
641+
}
642+
643+
dependencies {
644+
api 'io.grpc:grpc-stub'
645+
646+
testImplementation 'io.grpc:grpc-protobuf'
647+
testImplementation 'io.grpc:grpc-inprocess'
648+
testImplementation "com.google.protobuf:protobuf-java:$protobufVersion"
649+
}
650+
651+
protobuf {
652+
protoc {
653+
artifact = "com.google.protobuf:protoc:$protobufVersion"
654+
}
655+
plugins {
656+
grpc {
657+
artifact = "io.grpc:protoc-gen-grpc-java:$grpcVersion"
658+
}
659+
}
660+
generateProtoTasks {
661+
// Only generate for test source set, not main
662+
ofSourceSet('test')*.plugins {
663+
grpc {
664+
option '@generated=omit'
665+
}
666+
}
667+
}
668+
}
669+
}
670+
630671
project('spring-integration-hazelcast') {
631672
description = 'Spring Integration Hazelcast Support'
632673
dependencies {
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.grpc;
18+
19+
/**
20+
* Constants for gRPC-specific message headers.
21+
*
22+
* @author Artem Bilan
23+
*
24+
* @since 7.1
25+
*/
26+
public final class GrpcHeaders {
27+
28+
/**
29+
* The prefix for all gRPC-specific headers.
30+
*/
31+
public static final String PREFIX = "grpc_";
32+
33+
/**
34+
* The header containing the called gRPC service name.
35+
*/
36+
public static final String SERVICE = PREFIX + "service";
37+
38+
/**
39+
* The header containing the gRPC service method name.
40+
*/
41+
public static final String SERVICE_METHOD = PREFIX + "serviceMethod";
42+
43+
/**
44+
* The header containing the gRPC service method type.
45+
* One of the {@link io.grpc.MethodDescriptor.MethodType}
46+
*/
47+
public static final String METHOD_TYPE = PREFIX + "methodType";
48+
49+
/**
50+
* The header containing the gRPC service method schema descriptor.
51+
* A value from the {@link io.grpc.MethodDescriptor#getSchemaDescriptor()}
52+
*/
53+
public static final String SCHEMA_DESCRIPTOR = PREFIX + "schemaDescriptor";
54+
55+
private GrpcHeaders() {
56+
}
57+
58+
}
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.grpc.inbound;
18+
19+
import java.lang.reflect.Method;
20+
import java.util.Arrays;
21+
22+
import io.grpc.BindableService;
23+
import io.grpc.MethodDescriptor;
24+
import io.grpc.ServerServiceDefinition;
25+
import io.grpc.stub.StreamObserver;
26+
import org.aopalliance.intercept.MethodInterceptor;
27+
import org.aopalliance.intercept.MethodInvocation;
28+
import org.jspecify.annotations.Nullable;
29+
import reactor.core.publisher.Flux;
30+
import reactor.core.publisher.Mono;
31+
import reactor.core.publisher.Sinks;
32+
33+
import org.springframework.aop.framework.ProxyFactory;
34+
import org.springframework.core.log.LogMessage;
35+
import org.springframework.integration.gateway.DefaultMethodInvokingMethodInterceptor;
36+
import org.springframework.integration.gateway.MessagingGatewaySupport;
37+
import org.springframework.integration.grpc.GrpcHeaders;
38+
import org.springframework.messaging.Message;
39+
import org.springframework.util.Assert;
40+
import org.springframework.util.ClassUtils;
41+
import org.springframework.util.ReflectionUtils;
42+
import org.springframework.util.StringUtils;
43+
44+
/**
45+
* The {@link MessagingGatewaySupport} implementation for gRPC {@link BindableService}.
46+
* An instance of this class requires a {@link BindableService} class from the gRPC service definition.
47+
* Only standard 'grpc' services are supported which implements a generated {@code AsyncService} interface.
48+
* This gateway is a {@link BindableService} by itself to be registered with the gRPC server.
49+
* An internal proxy is created to intercept gRPC method calls and convert them to Spring Integration messages.
50+
* A reply from the downstream flow is produced back to the gRPC response payload.
51+
* The request payload is a Proto message from gRPC request.
52+
* The reply payload must be a Proto message for gRPC response.
53+
* <p>
54+
* This gateway supports all the gRPC {@link MethodDescriptor.MethodType} types.
55+
* All the requests are produced to downstream flow in a reactive manner via {@link #sendAndReceiveMessageReactive(Object)}.
56+
* The {@link MethodDescriptor.MethodType#UNARY} and {@link MethodDescriptor.MethodType#BIDI_STREAMING}
57+
* are same from the downstream handling logic perspective.
58+
* The {@link MethodDescriptor.MethodType#CLIENT_STREAMING} produces a {@link Flux} of gRPC request payloads.
59+
* The {@link MethodDescriptor.MethodType#SERVER_STREAMING} reply can be a single entity or a {@link Flux} of them.
60+
* <p>
61+
* For convenience, the {@link GrpcHeaders} are populated into a request message.
62+
* Such information can be used, for example, in downstream flow for routing.
63+
*
64+
* @author Artem Bilan
65+
*
66+
* @since 7.1
67+
*/
68+
public class GrpcInboundGateway extends MessagingGatewaySupport implements BindableService {
69+
70+
private final Class<? extends BindableService> grpcServiceClass;
71+
72+
@SuppressWarnings("NullAway.Init")
73+
private Object asyncService;
74+
75+
@SuppressWarnings("NullAway.Init")
76+
private ServerServiceDefinition serverServiceDefinition;
77+
78+
public GrpcInboundGateway(Class<? extends BindableService> grpcServiceClass) {
79+
this.grpcServiceClass = grpcServiceClass;
80+
}
81+
82+
@Override
83+
protected void onInit() {
84+
super.onInit();
85+
Class<?>[] serviceInterfaces =
86+
ClassUtils.getAllInterfacesForClass(this.grpcServiceClass, getApplicationContext().getClassLoader());
87+
88+
for (Class<?> serviceInterface : serviceInterfaces) {
89+
if ("AsyncService".equals(serviceInterface.getSimpleName())) {
90+
createServiceProxyAndServerDefinition(serviceInterface);
91+
break;
92+
}
93+
}
94+
95+
Assert.state(this.asyncService != null,
96+
"Only standard 'grpc' service are supported providing an 'AsyncService' contract.");
97+
}
98+
99+
@SuppressWarnings("NullAway")
100+
private void createServiceProxyAndServerDefinition(Class<?> serviceInterface) {
101+
ProxyFactory proxyFactory = new ProxyFactory(serviceInterface, (MethodInterceptor) this::interceptGrpc);
102+
proxyFactory.addAdvice(new DefaultMethodInvokingMethodInterceptor());
103+
this.asyncService = proxyFactory.getProxy(getApplicationContext().getClassLoader());
104+
Method bindServiceMethod =
105+
ClassUtils.getStaticMethod(this.grpcServiceClass.getEnclosingClass(), "bindService", serviceInterface);
106+
107+
this.serverServiceDefinition =
108+
(ServerServiceDefinition) ReflectionUtils.invokeMethod(bindServiceMethod, null, this.asyncService);
109+
}
110+
111+
@Override
112+
public ServerServiceDefinition bindService() {
113+
return this.serverServiceDefinition;
114+
}
115+
116+
@SuppressWarnings({"unchecked", "NullAway"})
117+
private @Nullable Object interceptGrpc(MethodInvocation invocation) {
118+
Object[] arguments = invocation.getArguments();
119+
120+
String fullMethodName =
121+
this.serverServiceDefinition.getServiceDescriptor().getName() +
122+
'/' +
123+
StringUtils.capitalize(invocation.getMethod().getName());
124+
125+
MethodDescriptor<?, ?> serviceMethod =
126+
this.serverServiceDefinition.getMethod(fullMethodName)
127+
.getMethodDescriptor();
128+
129+
logger.debug(LogMessage.format("gRPC request for [%s] with arguments %s",
130+
fullMethodName, Arrays.toString(arguments)));
131+
132+
switch (serviceMethod.getType()) {
133+
case UNARY -> {
134+
unary(serviceMethod, arguments[0], (StreamObserver<Object>) arguments[1]);
135+
return null;
136+
}
137+
case SERVER_STREAMING -> {
138+
serverStreaming(serviceMethod, arguments[0], (StreamObserver<Object>) arguments[1]);
139+
return null;
140+
}
141+
case CLIENT_STREAMING -> {
142+
return clientStreaming(serviceMethod, (StreamObserver<Object>) arguments[0]);
143+
}
144+
case BIDI_STREAMING -> {
145+
return bidiStreaming(serviceMethod, (StreamObserver<Object>) arguments[0]);
146+
}
147+
default -> throw new IllegalStateException("Unknown gRPC method type: " + serviceMethod.getType());
148+
}
149+
}
150+
151+
private void unary(MethodDescriptor<?, ?> methodDescriptor, Object requestPayload,
152+
StreamObserver<Object> responseObserver) {
153+
154+
sendRequestAndProduceReply(methodDescriptor, requestPayload)
155+
.subscribe(responseObserver::onNext, responseObserver::onError, responseObserver::onCompleted);
156+
}
157+
158+
private void serverStreaming(MethodDescriptor<?, ?> methodDescriptor, Object requestPayload,
159+
StreamObserver<Object> responseObserver) {
160+
161+
sendRequestAndProduceReply(methodDescriptor, requestPayload)
162+
.flatMapMany(payload -> payload instanceof Flux<?> flux ? flux : Flux.just(payload))
163+
.subscribe(responseObserver::onNext, responseObserver::onError, responseObserver::onCompleted);
164+
}
165+
166+
private StreamObserver<?> clientStreaming(MethodDescriptor<?, ?> methodDescriptor,
167+
StreamObserver<Object> responseObserver) {
168+
169+
Sinks.Many<Object> requestPayload = Sinks.many().unicast().onBackpressureBuffer();
170+
171+
return new StreamObserver<>() {
172+
173+
@Override
174+
public void onNext(Object value) {
175+
requestPayload.tryEmitNext(value);
176+
}
177+
178+
@Override
179+
public void onError(Throwable t) {
180+
throw new IllegalStateException(
181+
"gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed", t);
182+
}
183+
184+
@Override
185+
public void onCompleted() {
186+
requestPayload.tryEmitComplete();
187+
sendRequestAndProduceReply(methodDescriptor, requestPayload.asFlux())
188+
.subscribe(responseObserver::onNext, responseObserver::onError,
189+
responseObserver::onCompleted);
190+
}
191+
192+
};
193+
}
194+
195+
private StreamObserver<?> bidiStreaming(MethodDescriptor<?, ?> methodDescriptor,
196+
StreamObserver<Object> responseObserver) {
197+
198+
return new StreamObserver<>() {
199+
200+
@Override
201+
public void onNext(Object value) {
202+
sendRequestAndProduceReply(methodDescriptor, value)
203+
.subscribe(responseObserver::onNext, responseObserver::onError);
204+
}
205+
206+
@Override
207+
public void onError(Throwable t) {
208+
throw new IllegalStateException(
209+
"gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed", t);
210+
}
211+
212+
@Override
213+
public void onCompleted() {
214+
responseObserver.onCompleted();
215+
}
216+
217+
};
218+
}
219+
220+
private Mono<?> sendRequestAndProduceReply(MethodDescriptor<?, ?> serviceMethod, Object requestPayload) {
221+
Message<?> requestMessage =
222+
getMessageBuilderFactory()
223+
.withPayload(requestPayload)
224+
.setHeader(GrpcHeaders.SERVICE, serviceMethod.getServiceName())
225+
.setHeader(GrpcHeaders.SERVICE_METHOD, serviceMethod.getBareMethodName())
226+
.setHeader(GrpcHeaders.METHOD_TYPE, serviceMethod.getType())
227+
.setHeader(GrpcHeaders.SCHEMA_DESCRIPTOR, serviceMethod.getSchemaDescriptor())
228+
.build();
229+
230+
return sendAndReceiveMessageReactive(requestMessage)
231+
.map(Message::getPayload);
232+
}
233+
234+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Components for server-side gRPC support.
3+
*/
4+
@org.jspecify.annotations.NullMarked
5+
package org.springframework.integration.grpc.inbound;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Base package for gRPC support.
3+
*/
4+
@org.jspecify.annotations.NullMarked
5+
package org.springframework.integration.grpc;

0 commit comments

Comments
 (0)