16
16
17
17
package org .springframework .cloud .gateway .filter .headers ;
18
18
19
- import reactor .netty .http .server .HttpServerResponse ;
20
-
21
19
import org .springframework .core .Ordered ;
22
20
import org .springframework .http .HttpHeaders ;
23
21
import org .springframework .http .server .reactive .AbstractServerHttpResponse ;
24
22
import org .springframework .http .server .reactive .ServerHttpResponse ;
25
23
import org .springframework .http .server .reactive .ServerHttpResponseDecorator ;
26
24
import org .springframework .util .StringUtils ;
27
25
import org .springframework .web .server .ServerWebExchange ;
26
+ import reactor .netty .http .client .HttpClientResponse ;
27
+ import reactor .netty .http .server .HttpServerResponse ;
28
+
29
+ import static org .springframework .cloud .gateway .support .ServerWebExchangeUtils .CLIENT_RESPONSE_ATTR ;
28
30
29
31
/**
30
32
* @author Alberto C. Ríos
@@ -37,45 +39,61 @@ public class GRPCResponseHeadersFilter implements HttpHeadersFilter, Ordered {
37
39
38
40
@ Override
39
41
public HttpHeaders filter (HttpHeaders headers , ServerWebExchange exchange ) {
40
- ServerHttpResponse response = exchange .getResponse ();
41
- HttpHeaders responseHeaders = response .getHeaders ();
42
42
if (isGRPC (exchange )) {
43
- String trailerHeaderValue = GRPC_STATUS_HEADER + "," + GRPC_MESSAGE_HEADER ;
44
- String originalTrailerHeaderValue = responseHeaders .getFirst (HttpHeaders .TRAILER );
45
- if (originalTrailerHeaderValue != null ) {
46
- trailerHeaderValue += "," + originalTrailerHeaderValue ;
47
- }
48
- responseHeaders .set (HttpHeaders .TRAILER , trailerHeaderValue );
43
+ ServerHttpResponse response = exchange .getResponse ();
44
+ HttpHeaders responseHeaders = response .getHeaders ();
49
45
50
- while (response instanceof ServerHttpResponseDecorator ) {
51
- response = ((ServerHttpResponseDecorator ) response ).getDelegate ();
46
+ if (headers .containsKey (GRPC_STATUS_HEADER )) {
47
+ if (!"0" .equals (headers .getFirst (GRPC_STATUS_HEADER ))) {
48
+ response .setComplete (); // avoid empty DATA frame
49
+ }
52
50
}
53
- if (response instanceof AbstractServerHttpResponse ) {
54
- String grpcStatus = getGrpcStatus (headers );
55
- String grpcMessage = getGrpcMessage (headers );
56
- ((HttpServerResponse ) ((AbstractServerHttpResponse ) response ).getNativeResponse ()).trailerHeaders (h -> {
57
- h .set (GRPC_STATUS_HEADER , grpcStatus );
58
- h .set (GRPC_MESSAGE_HEADER , grpcMessage );
51
+
52
+ HttpClientResponse nettyInResponse = exchange .getAttribute (CLIENT_RESPONSE_ATTR );
53
+ if (nettyInResponse != null ) {
54
+ nettyInResponse .trailerHeaders ().subscribe (entries -> {
55
+ if (entries .contains (GRPC_STATUS_HEADER )) {
56
+ addTrailingHeader (entries , response , responseHeaders );
57
+ }
59
58
});
60
59
}
61
-
62
60
}
61
+
63
62
return headers ;
64
63
}
65
64
66
- private boolean isGRPC (ServerWebExchange exchange ) {
67
- String contentTypeValue = exchange .getRequest ().getHeaders ().getFirst (HttpHeaders .CONTENT_TYPE );
68
- return StringUtils .startsWithIgnoreCase (contentTypeValue , "application/grpc" );
65
+ private void addTrailingHeader (io .netty .handler .codec .http .HttpHeaders sourceHeaders , ServerHttpResponse response , HttpHeaders responseHeaders ) {
66
+ String trailerHeaderValue = GRPC_STATUS_HEADER + "," + GRPC_MESSAGE_HEADER ;
67
+ String originalTrailerHeaderValue = responseHeaders .getFirst (HttpHeaders .TRAILER );
68
+ if (originalTrailerHeaderValue != null ) {
69
+ trailerHeaderValue += "," + originalTrailerHeaderValue ;
70
+ }
71
+ responseHeaders .set (HttpHeaders .TRAILER , trailerHeaderValue );
72
+
73
+ HttpServerResponse nettyOutResponse = getNettyResponse (response );
74
+ if (nettyOutResponse != null ) {
75
+ String grpcStatus = sourceHeaders .get (GRPC_STATUS_HEADER , "0" );
76
+ String grpcMessage = sourceHeaders .get (GRPC_MESSAGE_HEADER , "" );
77
+ nettyOutResponse .trailerHeaders (h -> {
78
+ h .set (GRPC_STATUS_HEADER , grpcStatus );
79
+ h .set (GRPC_MESSAGE_HEADER , grpcMessage );
80
+ });
81
+ }
69
82
}
70
83
71
- private String getGrpcStatus (HttpHeaders headers ) {
72
- final String grpcStatusValue = headers .getFirst (GRPC_STATUS_HEADER );
73
- return StringUtils .hasText (grpcStatusValue ) ? grpcStatusValue : "0" ;
84
+ private HttpServerResponse getNettyResponse (ServerHttpResponse response ) {
85
+ while (response instanceof ServerHttpResponseDecorator ) {
86
+ response = ((ServerHttpResponseDecorator ) response ).getDelegate ();
87
+ }
88
+ if (response instanceof AbstractServerHttpResponse ) {
89
+ return ((AbstractServerHttpResponse ) response ).getNativeResponse ();
90
+ }
91
+ return null ;
74
92
}
75
93
76
- private String getGrpcMessage ( HttpHeaders headers ) {
77
- final String grpcStatusValue = headers . getFirst (GRPC_MESSAGE_HEADER );
78
- return StringUtils .hasText ( grpcStatusValue ) ? grpcStatusValue : "" ;
94
+ private boolean isGRPC ( ServerWebExchange exchange ) {
95
+ String contentTypeValue = exchange . getRequest (). getHeaders (). getFirst (HttpHeaders . CONTENT_TYPE );
96
+ return StringUtils .startsWithIgnoreCase ( contentTypeValue , "application/grpc" ) ;
79
97
}
80
98
81
99
@ Override
0 commit comments