@@ -59,8 +59,9 @@ public final class BasicClientExchangeHandler<T> implements AsyncClientExchangeH
59
59
private final AsyncRequestProducer requestProducer ;
60
60
private final AsyncResponseConsumer <T > responseConsumer ;
61
61
private final AtomicBoolean completed ;
62
- private final FutureCallback <T > resultCallback ;
63
62
private final AtomicBoolean outputTerminated ;
63
+ private final AtomicBoolean inputTerminated ;
64
+ private final FutureCallback <T > resultCallback ;
64
65
65
66
public BasicClientExchangeHandler (
66
67
final AsyncRequestProducer requestProducer ,
@@ -71,6 +72,7 @@ public BasicClientExchangeHandler(
71
72
this .completed = new AtomicBoolean ();
72
73
this .resultCallback = resultCallback ;
73
74
this .outputTerminated = new AtomicBoolean ();
75
+ this .inputTerminated = new AtomicBoolean ();
74
76
}
75
77
76
78
@ Override
@@ -100,64 +102,31 @@ public void consumeInformation(final HttpResponse response, final HttpContext ht
100
102
@ Override
101
103
public void consumeResponse (final HttpResponse response , final EntityDetails entityDetails , final HttpContext httpContext ) throws HttpException , IOException {
102
104
if (response .getCode () >= HttpStatus .SC_CLIENT_ERROR ) {
103
- outputTerminated .set (true );
104
- requestProducer .releaseResources ();
105
+ releaseRequestProducer ();
105
106
}
106
107
responseConsumer .consumeResponse (response , entityDetails , httpContext , new FutureCallback <T >() {
107
108
108
109
@ Override
109
110
public void completed (final T result ) {
110
- if (completed .compareAndSet (false , true )) {
111
- try {
112
- if (resultCallback != null ) {
113
- resultCallback .completed (result );
114
- }
115
- } finally {
116
- internalReleaseResources ();
117
- }
118
- }
111
+ completedInternal (result );
119
112
}
120
113
121
114
@ Override
122
115
public void failed (final Exception ex ) {
123
- if (completed .compareAndSet (false , true )) {
124
- try {
125
- if (resultCallback != null ) {
126
- resultCallback .failed (ex );
127
- }
128
- } finally {
129
- internalReleaseResources ();
130
- }
131
- }
116
+ failedInternal (ex );
132
117
}
133
118
134
119
@ Override
135
120
public void cancelled () {
136
- if (completed .compareAndSet (false , true )) {
137
- try {
138
- if (resultCallback != null ) {
139
- resultCallback .cancelled ();
140
- }
141
- } finally {
142
- internalReleaseResources ();
143
- }
144
- }
121
+ cancelledInternal ();
145
122
}
146
123
147
124
});
148
125
}
149
126
150
127
@ Override
151
128
public void cancel () {
152
- if (completed .compareAndSet (false , true )) {
153
- try {
154
- if (resultCallback != null ) {
155
- resultCallback .cancelled ();
156
- }
157
- } finally {
158
- internalReleaseResources ();
159
- }
160
- }
129
+ cancelledInternal ();
161
130
}
162
131
163
132
@ Override
@@ -178,28 +147,77 @@ public void streamEnd(final List<? extends Header> trailers) throws HttpExceptio
178
147
@ Override
179
148
public void failed (final Exception cause ) {
180
149
try {
181
- requestProducer .failed (cause );
182
- responseConsumer .failed (cause );
150
+ if (inputTerminated .get ()) {
151
+ responseConsumer .failed (cause );
152
+ }
153
+ if (!outputTerminated .get ()) {
154
+ requestProducer .failed (cause );
155
+ }
183
156
} finally {
184
- if (completed .compareAndSet (false , true )) {
185
- try {
186
- if (resultCallback != null ) {
187
- resultCallback .failed (cause );
188
- }
189
- } finally {
190
- internalReleaseResources ();
157
+ failedInternal (cause );
158
+ }
159
+ }
160
+
161
+ private void completedInternal (final T result ) {
162
+ if (completed .compareAndSet (false , true )) {
163
+ try {
164
+ if (resultCallback != null ) {
165
+ resultCallback .completed (result );
191
166
}
167
+ } finally {
168
+ releaseResourcesInternal ();
192
169
}
193
170
}
194
171
}
195
172
196
- private void internalReleaseResources () {
197
- requestProducer .releaseResources ();
198
- responseConsumer .releaseResources ();
173
+ private void failedInternal (final Exception ex ) {
174
+ if (completed .compareAndSet (false , true )) {
175
+ try {
176
+ if (resultCallback != null ) {
177
+ resultCallback .failed (ex );
178
+ }
179
+ } finally {
180
+ releaseResourcesInternal ();
181
+ }
182
+ }
183
+ }
184
+
185
+ private void cancelledInternal () {
186
+ if (completed .compareAndSet (false , true )) {
187
+ try {
188
+ if (resultCallback != null ) {
189
+ resultCallback .cancelled ();
190
+ }
191
+ } finally {
192
+ releaseResourcesInternal ();
193
+ }
194
+ }
195
+ }
196
+
197
+ private void releaseResponseConsumer () {
198
+ if (inputTerminated .compareAndSet (false , true )) {
199
+ responseConsumer .releaseResources ();
200
+ }
201
+ }
202
+
203
+ private void releaseRequestProducer () {
204
+ if (outputTerminated .compareAndSet (false , true )) {
205
+ requestProducer .releaseResources ();
206
+ }
207
+ }
208
+
209
+ private void releaseResourcesInternal () {
210
+ releaseRequestProducer ();
211
+ releaseResponseConsumer ();
199
212
}
200
213
201
214
@ Override
202
215
public void releaseResources () {
216
+ // Note even though the message exchange has been fully
217
+ // completed on the transport level, the response
218
+ // consumer may still be busy consuming and digesting
219
+ // the response message
220
+ releaseRequestProducer ();
203
221
}
204
222
205
223
}
0 commit comments