Skip to content

Commit 585cffd

Browse files
committed
Set status as a trailer or normal header
Uses snapshot of reactor-netty for integration test. Will need to be upgraded for main project when the release comes (soon). Signed-off-by: Dave Syer <[email protected]>
1 parent 7842347 commit 585cffd

File tree

11 files changed

+189
-35
lines changed

11 files changed

+189
-35
lines changed

spring-cloud-gateway-integration-tests/grpc/pom.xml

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3-
xmlns="http://maven.apache.org/POM/4.0.0"
4-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
xmlns="http://maven.apache.org/POM/4.0.0"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
55
<modelVersion>4.0.0</modelVersion>
66

77
<artifactId>grpc</artifactId>
@@ -22,6 +22,18 @@
2222
<relativePath>..</relativePath> <!-- lookup parent from repository -->
2323
</parent>
2424

25+
<dependencyManagement>
26+
<dependencies>
27+
<dependency>
28+
<groupId>io.projectreactor</groupId>
29+
<artifactId>reactor-bom</artifactId>
30+
<version>2025.0.0-SNAPSHOT</version>
31+
<type>pom</type>
32+
<scope>import</scope>
33+
</dependency>
34+
</dependencies>
35+
</dependencyManagement>
36+
2537
<dependencies>
2638
<dependency>
2739
<groupId>org.springframework.boot</groupId>
@@ -113,12 +125,15 @@
113125
<artifactId>protobuf-maven-plugin</artifactId>
114126
<version>0.6.1</version>
115127
<configuration>
116-
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
128+
<protocArtifact>
129+
com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
117130
<pluginId>grpc-java</pluginId>
118-
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
131+
<pluginArtifact>
132+
io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
119133
</configuration>
120134
<executions>
121135
<execution>
136+
<?m2e execute onConfiguration,onIncremental?>
122137
<goals>
123138
<goal>compile</goal>
124139
<goal>compile-custom</goal>
@@ -128,5 +143,4 @@
128143
</plugin>
129144
</plugins>
130145
</build>
131-
</project>
132-
146+
</project>

spring-cloud-gateway-integration-tests/grpc/src/main/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplication.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,14 @@ public void hello(HelloRequest request, StreamObserver<HelloResponse> responseOb
119119
HelloResponse response = HelloResponse.newBuilder().setGreeting(greeting).build();
120120

121121
responseObserver.onNext(response);
122+
123+
if ("failWithRuntimeExceptionAfterData!".equals(request.getFirstName())) {
124+
StatusRuntimeException exception = Status.RESOURCE_EXHAUSTED.withDescription("Too long firstNames?")
125+
.asRuntimeException();
126+
responseObserver.onError(exception);
127+
return;
128+
}
129+
122130
responseObserver.onCompleted();
123131
}
124132

spring-cloud-gateway-integration-tests/grpc/src/test/java/org/springframework/cloud/gateway/tests/grpc/GRPCApplicationTests.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,31 @@
3030
import org.junit.jupiter.api.BeforeEach;
3131
import org.junit.jupiter.api.Test;
3232

33+
import org.springframework.boot.SpringApplication;
3334
import org.springframework.boot.test.context.SpringBootTest;
35+
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
3436
import org.springframework.boot.web.server.test.LocalServerPort;
37+
import org.springframework.test.annotation.DirtiesContext;
3538

3639
import static io.grpc.Status.FAILED_PRECONDITION;
40+
import static io.grpc.Status.RESOURCE_EXHAUSTED;
3741
import static io.grpc.netty.NegotiationType.TLS;
38-
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
3942

4043
/**
4144
* @author Alberto C. Ríos
4245
*/
4346
@SpringBootTest(classes = org.springframework.cloud.gateway.tests.grpc.GRPCApplication.class,
4447
webEnvironment = WebEnvironment.RANDOM_PORT)
48+
@DirtiesContext
4549
public class GRPCApplicationTests {
4650

4751
@LocalServerPort
4852
private int gatewayPort;
4953

54+
public static void main(String[] args) {
55+
SpringApplication.run(GRPCApplication.class, args);
56+
}
57+
5058
@BeforeEach
5159
void setUp() {
5260
int grpcServerPort = gatewayPort + 1;
@@ -88,6 +96,23 @@ public void gRPCUnaryCallShouldHandleRuntimeException() throws SSLException {
8896
}
8997
}
9098

99+
@Test
100+
public void gRPCUnaryCallShouldHandleRuntimeException2() throws SSLException {
101+
ManagedChannel channel = createSecuredChannel(gatewayPort);
102+
boolean thrown = false;
103+
try {
104+
HelloServiceGrpc.newBlockingStub(channel)
105+
.hello(HelloRequest.newBuilder().setFirstName("failWithRuntimeExceptionAfterData!").build())
106+
.getGreeting();
107+
}
108+
catch (StatusRuntimeException e) {
109+
thrown = true;
110+
Assertions.assertThat(e.getStatus().getCode()).isEqualTo(RESOURCE_EXHAUSTED.getCode());
111+
Assertions.assertThat(e.getStatus().getDescription()).isEqualTo("Too long firstNames?");
112+
}
113+
Assertions.assertThat(thrown).withFailMessage("Expected exception not thrown!").isTrue();
114+
}
115+
91116
private TrustManager[] createTrustAllTrustManager() {
92117
return new TrustManager[] { new X509TrustManager() {
93118
public X509Certificate[] getAcceptedIssuers() {

spring-cloud-gateway-integration-tests/grpc/src/test/java/org/springframework/cloud/gateway/tests/grpc/JsonToGrpcApplicationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@
3636
import org.apache.hc.core5.ssl.TrustStrategy;
3737
import org.assertj.core.api.Assertions;
3838
import org.junit.jupiter.api.BeforeEach;
39-
import org.junit.jupiter.api.Disabled;
4039
import org.junit.jupiter.api.Test;
4140

4241
import org.springframework.boot.test.context.SpringBootTest;
4342
import org.springframework.boot.web.server.test.LocalServerPort;
4443
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
44+
import org.springframework.test.annotation.DirtiesContext;
4545
import org.springframework.web.client.RestTemplate;
4646

4747
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -50,8 +50,8 @@
5050
* @author Alberto C. Ríos
5151
* @author Abel Salgado Romero
5252
*/
53-
@Disabled
5453
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
54+
@DirtiesContext
5555
public class JsonToGrpcApplicationTests {
5656

5757
@LocalServerPort

spring-cloud-gateway-integration-tests/grpc/src/test/resources/application.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ server:
1010
management:
1111
endpoint:
1212
health:
13-
show-details: when_authorized
13+
show-details: when-authorized
1414
gateway:
1515
enabled: true
1616
endpoints:

spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -828,8 +828,9 @@ public NettyRoutingFilter routingFilter(HttpClient httpClient,
828828

829829
@Bean
830830
@ConditionalOnEnabledGlobalFilter(NettyRoutingFilter.class)
831-
public NettyWriteResponseFilter nettyWriteResponseFilter(GatewayProperties properties) {
832-
return new NettyWriteResponseFilter(properties.getStreamingMediaTypes());
831+
public NettyWriteResponseFilter nettyWriteResponseFilter(GatewayProperties properties,
832+
ObjectProvider<List<HttpHeadersFilter>> headersFilters) {
833+
return new NettyWriteResponseFilter(properties.getStreamingMediaTypes(), headersFilters);
833834
}
834835

835836
@Bean

spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/NettyWriteResponseFilter.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@
2525
import reactor.core.publisher.Mono;
2626
import reactor.core.publisher.SignalType;
2727
import reactor.netty.Connection;
28+
import reactor.netty.http.client.HttpClientResponse;
2829

30+
import org.springframework.beans.factory.ObjectProvider;
31+
import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter;
32+
import org.springframework.cloud.gateway.filter.headers.TrailerHeadersFilter;
2933
import org.springframework.core.Ordered;
3034
import org.springframework.core.io.buffer.DataBuffer;
3135
import org.springframework.core.io.buffer.DataBufferFactory;
@@ -36,6 +40,7 @@
3640
import org.springframework.lang.Nullable;
3741
import org.springframework.web.server.ServerWebExchange;
3842

43+
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR;
3944
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CLIENT_RESPONSE_CONN_ATTR;
4045

4146
/**
@@ -52,8 +57,22 @@ public class NettyWriteResponseFilter implements GlobalFilter, Ordered {
5257

5358
private final List<MediaType> streamingMediaTypes;
5459

55-
public NettyWriteResponseFilter(List<MediaType> streamingMediaTypes) {
60+
private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;
61+
62+
// do not use this headersFilters directly, use getHeadersFilters() instead.
63+
private volatile List<HttpHeadersFilter> headersFilters;
64+
65+
public NettyWriteResponseFilter(List<MediaType> streamingMediaTypes,
66+
ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) {
5667
this.streamingMediaTypes = streamingMediaTypes;
68+
this.headersFiltersProvider = headersFiltersProvider;
69+
}
70+
71+
public List<HttpHeadersFilter> getHeadersFilters() {
72+
if (headersFilters == null) {
73+
headersFilters = headersFiltersProvider == null ? List.of() : headersFiltersProvider.getIfAvailable();
74+
}
75+
return headersFilters;
5776
}
5877

5978
@Override
@@ -96,9 +115,12 @@ public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
96115
log.trace("invalid media type", e);
97116
}
98117
}
99-
return (isStreamingMediaType(contentType)
118+
119+
HttpClientResponse httpClientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
120+
Mono<Void> write = (isStreamingMediaType(contentType)
100121
? response.writeAndFlushWith(body.map(Flux::just))
101122
: response.writeWith(body));
123+
return write.then(TrailerHeadersFilter.filter(getHeadersFilters(), exchange, httpClientResponse)).then();
102124
}))
103125
.doFinally(signalType -> {
104126
if (signalType == SignalType.CANCEL || signalType == SignalType.ON_ERROR) {

spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/headers/GRPCResponseHeadersFilter.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,25 +38,20 @@ public class GRPCResponseHeadersFilter implements HttpHeadersFilter, Ordered {
3838
@Override
3939
public HttpHeaders filter(HttpHeaders headers, ServerWebExchange exchange) {
4040
ServerHttpResponse response = exchange.getResponse();
41-
HttpHeaders responseHeaders = response.getHeaders();
4241
if (isGRPC(exchange)) {
43-
String trailerHeaderValue = GRPC_STATUS_HEADER + "," + GRPC_MESSAGE_HEADER;
44-
String originalTrailerHeaderValue = responseHeaders.getFirst(HttpHeaders.TRAILER);
45-
if (originalTrailerHeaderValue != null) {
46-
trailerHeaderValue += "," + originalTrailerHeaderValue;
47-
}
48-
responseHeaders.set(HttpHeaders.TRAILER, trailerHeaderValue);
49-
50-
while (response instanceof ServerHttpResponseDecorator) {
51-
response = ((ServerHttpResponseDecorator) response).getDelegate();
52-
}
53-
if (response instanceof AbstractServerHttpResponse) {
54-
String grpcStatus = getGrpcStatus(headers);
55-
String grpcMessage = getGrpcMessage(headers);
56-
((HttpServerResponse) ((AbstractServerHttpResponse) response).getNativeResponse()).trailerHeaders(h -> {
57-
h.set(GRPC_STATUS_HEADER, grpcStatus);
58-
h.set(GRPC_MESSAGE_HEADER, grpcMessage);
59-
});
42+
String grpcStatus = getGrpcStatus(headers);
43+
String grpcMessage = getGrpcMessage(headers);
44+
if (grpcStatus != null) {
45+
while (response instanceof ServerHttpResponseDecorator) {
46+
response = ((ServerHttpResponseDecorator) response).getDelegate();
47+
}
48+
if (response instanceof AbstractServerHttpResponse) {
49+
((HttpServerResponse) ((AbstractServerHttpResponse) response).getNativeResponse())
50+
.trailerHeaders(h -> {
51+
h.set(GRPC_STATUS_HEADER, grpcStatus);
52+
h.set(GRPC_MESSAGE_HEADER, grpcMessage);
53+
});
54+
}
6055
}
6156

6257
}
@@ -70,7 +65,7 @@ private boolean isGRPC(ServerWebExchange exchange) {
7065

7166
private String getGrpcStatus(HttpHeaders headers) {
7267
final String grpcStatusValue = headers.getFirst(GRPC_STATUS_HEADER);
73-
return StringUtils.hasText(grpcStatusValue) ? grpcStatusValue : "0";
68+
return StringUtils.hasText(grpcStatusValue) ? grpcStatusValue : null;
7469
}
7570

7671
private String getGrpcMessage(HttpHeaders headers) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2013-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.gateway.filter.headers;
18+
19+
import java.util.List;
20+
21+
import reactor.core.publisher.Mono;
22+
import reactor.netty.http.client.HttpClientResponse;
23+
import reactor.netty.http.server.HttpServerResponse;
24+
25+
import org.springframework.http.HttpHeaders;
26+
import org.springframework.http.server.reactive.AbstractServerHttpResponse;
27+
import org.springframework.http.server.reactive.ServerHttpResponse;
28+
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
29+
import org.springframework.web.server.ServerWebExchange;
30+
31+
public interface TrailerHeadersFilter {
32+
33+
static Mono<HttpHeaders> filter(List<HttpHeadersFilter> filters, ServerWebExchange exchange,
34+
HttpClientResponse response) {
35+
if (response == null) {
36+
return Mono.empty();
37+
}
38+
Mono<HttpHeaders> headers = response.trailerHeaders().map(input -> {
39+
HttpHeaders httpHeaders = new HttpHeaders();
40+
input.forEach(entry -> httpHeaders.add(entry.getKey(), entry.getValue()));
41+
return httpHeaders;
42+
});
43+
return filter(filters, headers, exchange);
44+
}
45+
46+
private static Mono<HttpHeaders> filter(List<HttpHeadersFilter> filters, Mono<HttpHeaders> input,
47+
ServerWebExchange exchange) {
48+
49+
Mono<HttpHeaders> filtered = input;
50+
if (filters != null) {
51+
for (int i = 0; i < filters.size(); i++) {
52+
if (filters.get(i) instanceof TrailerHeadersFilter filter) {
53+
filtered = filtered.map(headers -> filter.trailers(headers, exchange));
54+
}
55+
}
56+
}
57+
58+
return filtered.doOnSuccess(headers -> {
59+
60+
if (headers == null || headers.isEmpty()) {
61+
return;
62+
}
63+
64+
ServerHttpResponse response = exchange.getResponse();
65+
while (response instanceof ServerHttpResponseDecorator) {
66+
response = ((ServerHttpResponseDecorator) response).getDelegate();
67+
}
68+
if (response instanceof AbstractServerHttpResponse) {
69+
((HttpServerResponse) ((AbstractServerHttpResponse) response).getNativeResponse()).trailerHeaders(h -> {
70+
headers.forEach((key, values) -> {
71+
for (String value : values) {
72+
h.add(key, value);
73+
}
74+
});
75+
});
76+
}
77+
78+
});
79+
}
80+
81+
/**
82+
* Filters a set of Http Headers.
83+
* @param input Http Headers
84+
* @param exchange a {@link ServerWebExchange} that should be filtered
85+
* @return filtered Http Headers
86+
*/
87+
HttpHeaders trailers(HttpHeaders input, ServerWebExchange exchange);
88+
89+
}

spring-cloud-gateway-server/src/test/java/org/springframework/cloud/gateway/filter/NettyWriteResponseFilterTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void testWrap_DefaultDataBufferFactory() {
4646
}
4747

4848
private void doTestWrap(MockServerHttpResponse response) {
49-
NettyWriteResponseFilter filter = new NettyWriteResponseFilter(new ArrayList<>());
49+
NettyWriteResponseFilter filter = new NettyWriteResponseFilter(new ArrayList<>(), null);
5050

5151
ByteBuf buffer = DEFAULT.buffer();
5252
buffer.writeCharSequence("test", Charset.defaultCharset());

0 commit comments

Comments
 (0)