Skip to content

Commit ee7c238

Browse files
committed
Add streaming method to service
Signed-off-by: Dave Syer <[email protected]>
1 parent 585cffd commit ee7c238

File tree

4 files changed

+94
-24
lines changed

4 files changed

+94
-24
lines changed

spring-cloud-gateway-integration-tests/grpc/src/main/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplication.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.boot.SpringApplication;
3636
import org.springframework.boot.SpringBootConfiguration;
3737
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
38+
import org.springframework.cloud.gateway.tests.grpc.StreamServiceGrpc.StreamServiceImplBase;
3839
import org.springframework.core.env.Environment;
3940
import org.springframework.core.io.ClassPathResource;
4041
import org.springframework.stereotype.Component;
@@ -73,15 +74,16 @@ private void start() throws IOException {
7374
Integer serverPort = environment.getProperty("local.server.port", Integer.class);
7475
int grpcPort = serverPort + 1;
7576
ServerCredentials creds = createServerCredentials();
76-
server = Grpc.newServerBuilderForPort(grpcPort, creds).addService(new HelloService()).build().start();
77+
server = Grpc.newServerBuilderForPort(grpcPort, creds)
78+
.addService(new HelloService())
79+
.addService(new StreamService()).build().start();
7780

7881
log.info("Starting gRPC server in port " + grpcPort);
7982

8083
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
8184
try {
8285
GRPCServer.this.stop();
83-
}
84-
catch (InterruptedException e) {
86+
} catch (InterruptedException e) {
8587
e.printStackTrace(System.err);
8688
}
8789
}));
@@ -101,13 +103,43 @@ private void stop() throws InterruptedException {
101103
log.info("gRPC server stopped");
102104
}
103105

106+
static class StreamService extends StreamServiceGrpc.StreamServiceImplBase {
107+
108+
@Override
109+
public void more(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
110+
int count = 0;
111+
while (count < 3) {
112+
HelloResponse reply = HelloResponse.newBuilder()
113+
.setGreeting("Hello(" + count + ") ==> " + request.getFirstName()).build();
114+
if ("failWithRuntimeExceptionAfterData!".equals(request.getFirstName()) && count == 2) {
115+
StatusRuntimeException exception = Status.RESOURCE_EXHAUSTED
116+
.withDescription("Too long firstNames?")
117+
.asRuntimeException();
118+
responseObserver.onError(exception);
119+
return;
120+
}
121+
responseObserver.onNext(reply);
122+
count++;
123+
try {
124+
Thread.sleep(200L);
125+
} catch (InterruptedException e) {
126+
Thread.currentThread().interrupt();
127+
responseObserver.onError(e);
128+
return;
129+
}
130+
}
131+
responseObserver.onCompleted();
132+
}
133+
134+
}
135+
104136
static class HelloService extends HelloServiceGrpc.HelloServiceImplBase {
105137

106138
@Override
107139
public void hello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
108140
if ("failWithRuntimeException!".equals(request.getFirstName())) {
109141
StatusRuntimeException exception = Status.FAILED_PRECONDITION.withDescription("Invalid firstName")
110-
.asRuntimeException();
142+
.asRuntimeException();
111143
responseObserver.onError(exception);
112144
responseObserver.onCompleted();
113145
return;
Binary file not shown.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
syntax = "proto3";
2+
option java_multiple_files = true;
3+
option java_package = "org.springframework.cloud.gateway.tests.grpc";
4+
package org.springframework.cloud.gateway.tests.grpc;
5+
6+
import "hello.proto";
7+
8+
service StreamService {
9+
rpc more(HelloRequest) returns (stream HelloResponse);
10+
}

spring-cloud-gateway-integration-tests/grpc/src/test/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplicationTests.java

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,35 @@
1616

1717
package org.springframework.cloud.gateway.tests.grpc;
1818

19+
import static io.grpc.Status.FAILED_PRECONDITION;
20+
import static io.grpc.Status.RESOURCE_EXHAUSTED;
21+
import static io.grpc.netty.NegotiationType.TLS;
22+
1923
import java.security.cert.X509Certificate;
24+
import java.util.Iterator;
2025

2126
import javax.net.ssl.SSLException;
2227
import javax.net.ssl.TrustManager;
2328
import javax.net.ssl.X509TrustManager;
2429

25-
import io.grpc.ManagedChannel;
26-
import io.grpc.StatusRuntimeException;
27-
import io.grpc.netty.GrpcSslContexts;
28-
import io.grpc.netty.NettyChannelBuilder;
2930
import org.assertj.core.api.Assertions;
3031
import org.junit.jupiter.api.BeforeEach;
3132
import org.junit.jupiter.api.Test;
32-
3333
import org.springframework.boot.SpringApplication;
3434
import org.springframework.boot.test.context.SpringBootTest;
3535
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
3636
import org.springframework.boot.web.server.test.LocalServerPort;
3737
import org.springframework.test.annotation.DirtiesContext;
3838

39-
import static io.grpc.Status.FAILED_PRECONDITION;
40-
import static io.grpc.Status.RESOURCE_EXHAUSTED;
41-
import static io.grpc.netty.NegotiationType.TLS;
39+
import io.grpc.ManagedChannel;
40+
import io.grpc.StatusRuntimeException;
41+
import io.grpc.netty.GrpcSslContexts;
42+
import io.grpc.netty.NettyChannelBuilder;
4243

4344
/**
4445
* @author Alberto C. Ríos
4546
*/
46-
@SpringBootTest(classes = org.springframework.cloud.gateway.tests.grpc.GRPCApplication.class,
47-
webEnvironment = WebEnvironment.RANDOM_PORT)
47+
@SpringBootTest(classes = org.springframework.cloud.gateway.tests.grpc.GRPCApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
4848
@DirtiesContext
4949
public class GRPCApplicationTests {
5050

@@ -67,19 +67,31 @@ public void gRPCUnaryCallShouldReturnResponse() throws SSLException {
6767
ManagedChannel channel = createSecuredChannel(gatewayPort);
6868

6969
final HelloResponse response = HelloServiceGrpc.newBlockingStub(channel)
70-
.hello(HelloRequest.newBuilder().setFirstName("Sir").setLastName("FromClient").build());
70+
.hello(HelloRequest.newBuilder().setFirstName("Sir").setLastName("FromClient").build());
7171

7272
Assertions.assertThat(response.getGreeting()).isEqualTo("Hello, Sir FromClient");
7373
}
7474

75+
@Test
76+
public void gRPCStreamingCallShouldReturnResponse() throws SSLException {
77+
ManagedChannel channel = createSecuredChannel(gatewayPort);
78+
79+
final Iterator<HelloResponse> response = StreamServiceGrpc.newBlockingStub(channel)
80+
.more(HelloRequest.newBuilder().setFirstName("Sir").setLastName("FromClient").build());
81+
82+
Assertions.assertThat(response.next().getGreeting()).isEqualTo("Hello(0) ==> Sir");
83+
Assertions.assertThat(response.next().getGreeting()).isEqualTo("Hello(1) ==> Sir");
84+
Assertions.assertThat(response.next().getGreeting()).isEqualTo("Hello(2) ==> Sir");
85+
}
86+
7587
private ManagedChannel createSecuredChannel(int port) throws SSLException {
7688
TrustManager[] trustAllCerts = createTrustAllTrustManager();
7789

7890
return NettyChannelBuilder.forAddress("localhost", port)
79-
.useTransportSecurity()
80-
.sslContext(GrpcSslContexts.forClient().trustManager(trustAllCerts[0]).build())
81-
.negotiationType(TLS)
82-
.build();
91+
.useTransportSecurity()
92+
.sslContext(GrpcSslContexts.forClient().trustManager(trustAllCerts[0]).build())
93+
.negotiationType(TLS)
94+
.build();
8395
}
8496

8597
@Test
@@ -88,24 +100,40 @@ public void gRPCUnaryCallShouldHandleRuntimeException() throws SSLException {
88100

89101
try {
90102
HelloServiceGrpc.newBlockingStub(channel)
91-
.hello(HelloRequest.newBuilder().setFirstName("failWithRuntimeException!").build());
92-
}
93-
catch (StatusRuntimeException e) {
103+
.hello(HelloRequest.newBuilder().setFirstName("failWithRuntimeException!").build());
104+
} catch (StatusRuntimeException e) {
94105
Assertions.assertThat(FAILED_PRECONDITION.getCode()).isEqualTo(e.getStatus().getCode());
95106
Assertions.assertThat("Invalid firstName").isEqualTo(e.getStatus().getDescription());
96107
}
97108
}
98109

99110
@Test
100-
public void gRPCUnaryCallShouldHandleRuntimeException2() throws SSLException {
111+
public void gRPCUnaryCallShouldHandleRuntimeExceptionAfterData() throws SSLException {
101112
ManagedChannel channel = createSecuredChannel(gatewayPort);
102113
boolean thrown = false;
103114
try {
104115
HelloServiceGrpc.newBlockingStub(channel)
105116
.hello(HelloRequest.newBuilder().setFirstName("failWithRuntimeExceptionAfterData!").build())
106117
.getGreeting();
118+
} catch (StatusRuntimeException e) {
119+
thrown = true;
120+
Assertions.assertThat(e.getStatus().getCode()).isEqualTo(RESOURCE_EXHAUSTED.getCode());
121+
Assertions.assertThat(e.getStatus().getDescription()).isEqualTo("Too long firstNames?");
107122
}
108-
catch (StatusRuntimeException e) {
123+
Assertions.assertThat(thrown).withFailMessage("Expected exception not thrown!").isTrue();
124+
}
125+
126+
@Test
127+
public void gRPCStreamingCallShouldHandleRuntimeExceptionAfterData() throws SSLException {
128+
ManagedChannel channel = createSecuredChannel(gatewayPort);
129+
boolean thrown = false;
130+
final Iterator<HelloResponse> response = StreamServiceGrpc.newBlockingStub(channel)
131+
.more(HelloRequest.newBuilder().setFirstName("failWithRuntimeExceptionAfterData!").setLastName("FromClient").build());
132+
Assertions.assertThat(response.next().getGreeting()).isEqualTo("Hello(0) ==> failWithRuntimeExceptionAfterData!");
133+
Assertions.assertThat(response.next().getGreeting()).isEqualTo("Hello(1) ==> failWithRuntimeExceptionAfterData!");
134+
try {
135+
Assertions.assertThat(response.next().getGreeting()).isEqualTo("Hello(2) ==> failWithRuntimeExceptionAfterData!");
136+
} catch (StatusRuntimeException e) {
109137
thrown = true;
110138
Assertions.assertThat(e.getStatus().getCode()).isEqualTo(RESOURCE_EXHAUSTED.getCode());
111139
Assertions.assertThat(e.getStatus().getDescription()).isEqualTo("Too long firstNames?");

0 commit comments

Comments
 (0)