20
20
import java .util .concurrent .atomic .AtomicInteger ;
21
21
import java .util .function .BiFunction ;
22
22
23
+ import io .netty .buffer .ByteBufAllocator ;
24
+ import org .apache .commons .logging .Log ;
25
+ import org .apache .commons .logging .LogFactory ;
23
26
import reactor .core .publisher .Flux ;
24
27
import reactor .netty .Connection ;
25
28
import reactor .netty .NettyInbound ;
28
31
import org .springframework .core .io .buffer .DataBuffer ;
29
32
import org .springframework .core .io .buffer .NettyDataBufferFactory ;
30
33
import org .springframework .http .HttpHeaders ;
34
+ import org .springframework .http .HttpMethod ;
31
35
import org .springframework .http .HttpStatus ;
32
36
import org .springframework .http .ResponseCookie ;
33
37
import org .springframework .util .CollectionUtils ;
43
47
*/
44
48
class ReactorClientHttpResponse implements ClientHttpResponse {
45
49
50
+ private static final Log logger = LogFactory .getLog (ReactorClientHttpResponse .class );
51
+
46
52
private final HttpClientResponse response ;
47
53
54
+ private final HttpHeaders headers ;
55
+
48
56
private final NettyInbound inbound ;
49
57
50
58
private final NettyDataBufferFactory bufferFactory ;
51
59
52
- private final Connection connection ;
53
-
54
- private final HttpHeaders headers ;
55
-
56
- // 0 - not subscribed, 1 - subscribed, 2 - cancelled
60
+ // 0 - not subscribed, 1 - subscribed, 2 - cancelled, 3 - cancelled via connector (before subscribe)
57
61
private final AtomicInteger state = new AtomicInteger (0 );
58
62
63
+ private final String logPrefix ;
64
+
59
65
60
66
/**
61
67
* Constructor that matches the inputs from
@@ -64,26 +70,46 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
64
70
*/
65
71
public ReactorClientHttpResponse (HttpClientResponse response , Connection connection ) {
66
72
this .response = response ;
73
+ MultiValueMap <String , String > adapter = new NettyHeadersAdapter (response .responseHeaders ());
74
+ this .headers = HttpHeaders .readOnlyHttpHeaders (adapter );
67
75
this .inbound = connection .inbound ();
68
76
this .bufferFactory = new NettyDataBufferFactory (connection .outbound ().alloc ());
69
- this .connection = connection ;
77
+ this .logPrefix = (logger .isDebugEnabled () ? "[" + connection .channel ().id ().asShortText () + "] " : "" );
78
+ }
79
+
80
+ /**
81
+ * Constructor with inputs extracted from a {@link Connection}.
82
+ * @deprecated as of 5.2.8
83
+ */
84
+ @ Deprecated
85
+ public ReactorClientHttpResponse (HttpClientResponse response , NettyInbound inbound , ByteBufAllocator alloc ) {
86
+ this .response = response ;
70
87
MultiValueMap <String , String > adapter = new NettyHeadersAdapter (response .responseHeaders ());
71
88
this .headers = HttpHeaders .readOnlyHttpHeaders (adapter );
89
+ this .inbound = inbound ;
90
+ this .bufferFactory = new NettyDataBufferFactory (alloc );
91
+ this .logPrefix = "" ;
72
92
}
73
93
74
94
75
95
@ Override
76
96
public Flux <DataBuffer > getBody () {
77
97
return this .inbound .receive ()
78
98
.doOnSubscribe (s -> {
79
- if (!this .state .compareAndSet (0 , 1 )) {
80
- // https://github.com/reactor/reactor-netty/issues/503
81
- // FluxReceive rejects multiple subscribers, but not after a cancel().
82
- // Subsequent subscribers after cancel() will not be rejected, but will hang instead.
83
- // So we need to reject once in cancelled state.
84
- if (this .state .get () == 2 ) {
85
- throw new IllegalStateException ("The client response body can only be consumed once." );
86
- }
99
+ if (this .state .compareAndSet (0 , 1 )) {
100
+ return ;
101
+ }
102
+ // https://github.com/reactor/reactor-netty/issues/503
103
+ // FluxReceive rejects multiple subscribers, but not after a cancel().
104
+ // Subsequent subscribers after cancel() will not be rejected, but will hang instead.
105
+ // So we need to reject once in cancelled state.
106
+ if (this .state .get () == 2 ) {
107
+ throw new IllegalStateException (
108
+ "The client response body can only be consumed once." );
109
+ }
110
+ else if (this .state .get () == 3 ) {
111
+ throw new IllegalStateException (
112
+ "The client response body has been released already due to cancellation." );
87
113
}
88
114
})
89
115
.doOnCancel (() -> this .state .compareAndSet (1 , 2 ))
@@ -113,6 +139,7 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
113
139
MultiValueMap <String , ResponseCookie > result = new LinkedMultiValueMap <>();
114
140
this .response .cookies ().values ().stream ().flatMap (Collection ::stream )
115
141
.forEach (c ->
142
+
116
143
result .add (c .name (), ResponseCookie .fromClientResponse (c .name (), c .value ())
117
144
.domain (c .domain ())
118
145
.path (c .path ())
@@ -124,17 +151,25 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
124
151
}
125
152
126
153
/**
127
- * For use by {@link ReactorClientHttpConnector}.
154
+ * Called by {@link ReactorClientHttpConnector} when a cancellation is detected
155
+ * but the content has not been subscribed to. If the subscription never
156
+ * materializes then the content will remain not drained. Or it could still
157
+ * materialize if the cancellation happened very early, or the response
158
+ * reading was delayed for some reason.
128
159
*/
129
- boolean bodyNotSubscribed () {
130
- return this .state .get () == 0 ;
160
+ void releaseAfterCancel (HttpMethod method ) {
161
+ if (mayHaveBody (method ) && this .state .compareAndSet (0 , 3 )) {
162
+ if (logger .isDebugEnabled ()) {
163
+ logger .debug (this .logPrefix + "Releasing body, not yet subscribed." );
164
+ }
165
+ this .inbound .receive ().doOnNext (byteBuf -> {}).subscribe (byteBuf -> {}, ex -> {});
166
+ }
131
167
}
132
168
133
- /**
134
- * For use by {@link ReactorClientHttpConnector}.
135
- */
136
- Connection getConnection () {
137
- return this .connection ;
169
+ private boolean mayHaveBody (HttpMethod method ) {
170
+ int code = this .getRawStatusCode ();
171
+ return !((code >= 100 && code < 200 ) || code == 204 || code == 205 ||
172
+ method .equals (HttpMethod .HEAD ) || getHeaders ().getContentLength () == 0 );
138
173
}
139
174
140
175
@ Override
0 commit comments