Skip to content

Commit e1e003f

Browse files
committed
Add streaming method to service
Signed-off-by: Dave Syer <[email protected]>
1 parent 585cffd commit e1e003f

File tree

5 files changed

+93
-9
lines changed

5 files changed

+93
-9
lines changed

spring-cloud-gateway-integration-tests/grpc/src/main/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplication.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,11 @@ private void start() throws IOException {
7373
Integer serverPort = environment.getProperty("local.server.port", Integer.class);
7474
int grpcPort = serverPort + 1;
7575
ServerCredentials creds = createServerCredentials();
76-
server = Grpc.newServerBuilderForPort(grpcPort, creds).addService(new HelloService()).build().start();
76+
server = Grpc.newServerBuilderForPort(grpcPort, creds)
77+
.addService(new HelloService())
78+
.addService(new StreamService())
79+
.build()
80+
.start();
7781

7882
log.info("Starting gRPC server in port " + grpcPort);
7983

@@ -101,6 +105,38 @@ private void stop() throws InterruptedException {
101105
log.info("gRPC server stopped");
102106
}
103107

108+
static class StreamService extends StreamServiceGrpc.StreamServiceImplBase {
109+
110+
@Override
111+
public void more(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
112+
int count = 0;
113+
while (count < 3) {
114+
HelloResponse reply = HelloResponse.newBuilder()
115+
.setGreeting("Hello(" + count + ") ==> " + request.getFirstName())
116+
.build();
117+
if ("failWithRuntimeExceptionAfterData!".equals(request.getFirstName()) && count == 2) {
118+
StatusRuntimeException exception = Status.RESOURCE_EXHAUSTED
119+
.withDescription("Too long firstNames?")
120+
.asRuntimeException();
121+
responseObserver.onError(exception);
122+
return;
123+
}
124+
responseObserver.onNext(reply);
125+
count++;
126+
try {
127+
Thread.sleep(200L);
128+
}
129+
catch (InterruptedException e) {
130+
Thread.currentThread().interrupt();
131+
responseObserver.onError(e);
132+
return;
133+
}
134+
}
135+
responseObserver.onCompleted();
136+
}
137+
138+
}
139+
104140
static class HelloService extends HelloServiceGrpc.HelloServiceImplBase {
105141

106142
@Override
@@ -122,7 +158,7 @@ public void hello(HelloRequest request, StreamObserver<HelloResponse> responseOb
122158

123159
if ("failWithRuntimeExceptionAfterData!".equals(request.getFirstName())) {
124160
StatusRuntimeException exception = Status.RESOURCE_EXHAUSTED.withDescription("Too long firstNames?")
125-
.asRuntimeException();
161+
.asRuntimeException();
126162
responseObserver.onError(exception);
127163
return;
128164
}
Binary file not shown.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
syntax = "proto3";
2+
option java_multiple_files = true;
3+
option java_package = "org.springframework.cloud.gateway.tests.grpc";
4+
package org.springframework.cloud.gateway.tests.grpc;
5+
6+
import "hello.proto";
7+
8+
service StreamService {
9+
rpc more(HelloRequest) returns (stream HelloResponse);
10+
}

spring-cloud-gateway-integration-tests/grpc/src/test/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplicationTests.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.cloud.gateway.tests.grpc;
1818

1919
import java.security.cert.X509Certificate;
20+
import java.util.Iterator;
2021

2122
import javax.net.ssl.SSLException;
2223
import javax.net.ssl.TrustManager;
@@ -72,6 +73,18 @@ public void gRPCUnaryCallShouldReturnResponse() throws SSLException {
7273
Assertions.assertThat(response.getGreeting()).isEqualTo("Hello, Sir FromClient");
7374
}
7475

76+
@Test
77+
public void gRPCStreamingCallShouldReturnResponse() throws SSLException {
78+
ManagedChannel channel = createSecuredChannel(gatewayPort);
79+
80+
final Iterator<HelloResponse> response = StreamServiceGrpc.newBlockingStub(channel)
81+
.more(HelloRequest.newBuilder().setFirstName("Sir").setLastName("FromClient").build());
82+
83+
Assertions.assertThat(response.next().getGreeting()).isEqualTo("Hello(0) ==> Sir");
84+
Assertions.assertThat(response.next().getGreeting()).isEqualTo("Hello(1) ==> Sir");
85+
Assertions.assertThat(response.next().getGreeting()).isEqualTo("Hello(2) ==> Sir");
86+
}
87+
7588
private ManagedChannel createSecuredChannel(int port) throws SSLException {
7689
TrustManager[] trustAllCerts = createTrustAllTrustManager();
7790

@@ -97,13 +110,38 @@ public void gRPCUnaryCallShouldHandleRuntimeException() throws SSLException {
97110
}
98111

99112
@Test
100-
public void gRPCUnaryCallShouldHandleRuntimeException2() throws SSLException {
113+
public void gRPCUnaryCallShouldHandleRuntimeExceptionAfterData() throws SSLException {
101114
ManagedChannel channel = createSecuredChannel(gatewayPort);
102115
boolean thrown = false;
103116
try {
104117
HelloServiceGrpc.newBlockingStub(channel)
105-
.hello(HelloRequest.newBuilder().setFirstName("failWithRuntimeExceptionAfterData!").build())
106-
.getGreeting();
118+
.hello(HelloRequest.newBuilder().setFirstName("failWithRuntimeExceptionAfterData!").build())
119+
.getGreeting();
120+
}
121+
catch (StatusRuntimeException e) {
122+
thrown = true;
123+
Assertions.assertThat(e.getStatus().getCode()).isEqualTo(RESOURCE_EXHAUSTED.getCode());
124+
Assertions.assertThat(e.getStatus().getDescription()).isEqualTo("Too long firstNames?");
125+
}
126+
Assertions.assertThat(thrown).withFailMessage("Expected exception not thrown!").isTrue();
127+
}
128+
129+
@Test
130+
public void gRPCStreamingCallShouldHandleRuntimeExceptionAfterData() throws SSLException {
131+
ManagedChannel channel = createSecuredChannel(gatewayPort);
132+
boolean thrown = false;
133+
final Iterator<HelloResponse> response = StreamServiceGrpc.newBlockingStub(channel)
134+
.more(HelloRequest.newBuilder()
135+
.setFirstName("failWithRuntimeExceptionAfterData!")
136+
.setLastName("FromClient")
137+
.build());
138+
Assertions.assertThat(response.next().getGreeting())
139+
.isEqualTo("Hello(0) ==> failWithRuntimeExceptionAfterData!");
140+
Assertions.assertThat(response.next().getGreeting())
141+
.isEqualTo("Hello(1) ==> failWithRuntimeExceptionAfterData!");
142+
try {
143+
Assertions.assertThat(response.next().getGreeting())
144+
.isEqualTo("Hello(2) ==> failWithRuntimeExceptionAfterData!");
107145
}
108146
catch (StatusRuntimeException e) {
109147
thrown = true;

spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ public HttpHeaders filter(HttpHeaders headers, ServerWebExchange exchange) {
4747
}
4848
if (response instanceof AbstractServerHttpResponse) {
4949
((HttpServerResponse) ((AbstractServerHttpResponse) response).getNativeResponse())
50-
.trailerHeaders(h -> {
51-
h.set(GRPC_STATUS_HEADER, grpcStatus);
52-
h.set(GRPC_MESSAGE_HEADER, grpcMessage);
53-
});
50+
.trailerHeaders(h -> {
51+
h.set(GRPC_STATUS_HEADER, grpcStatus);
52+
h.set(GRPC_MESSAGE_HEADER, grpcMessage);
53+
});
5454
}
5555
}
5656

0 commit comments

Comments
 (0)