|
15 | 15 | import io.clientcore.core.models.binarydata.FileBinaryData; |
16 | 16 | import io.clientcore.core.models.binarydata.InputStreamBinaryData; |
17 | 17 | import io.clientcore.core.utils.AuthenticateChallenge; |
| 18 | +import io.clientcore.core.utils.CoreUtils; |
18 | 19 | import io.clientcore.core.utils.ProgressReporter; |
19 | 20 | import io.clientcore.core.utils.ServerSentEventUtils; |
20 | 21 | import io.clientcore.http.netty4.implementation.ChannelInitializationProxyHandler; |
| 22 | +import io.clientcore.http.netty4.implementation.Netty4ChannelBinaryData; |
| 23 | +import io.clientcore.http.netty4.implementation.Netty4EagerConsumeChannelHandler; |
21 | 24 | import io.clientcore.http.netty4.implementation.Netty4ProgressAndTimeoutHandler; |
22 | 25 | import io.clientcore.http.netty4.implementation.Netty4ResponseHandler; |
23 | 26 | import io.clientcore.http.netty4.implementation.Netty4SslInitializationHandler; |
| 27 | +import io.clientcore.http.netty4.implementation.ResponseBodyHandling; |
| 28 | +import io.clientcore.http.netty4.implementation.ResponseStateInfo; |
24 | 29 | import io.clientcore.http.netty4.implementation.WrappedHttpHeaders; |
25 | 30 | import io.netty.bootstrap.Bootstrap; |
26 | 31 | import io.netty.buffer.ByteBuf; |
|
43 | 48 | import io.netty.handler.stream.ChunkedWriteHandler; |
44 | 49 |
|
45 | 50 | import javax.net.ssl.SSLException; |
| 51 | +import java.io.ByteArrayOutputStream; |
46 | 52 | import java.io.IOException; |
47 | 53 | import java.net.URI; |
48 | 54 | import java.nio.channels.FileChannel; |
|
54 | 60 | import static io.clientcore.core.utils.ServerSentEventUtils.attemptRetry; |
55 | 61 | import static io.clientcore.core.utils.ServerSentEventUtils.processTextEventStream; |
56 | 62 | import static io.clientcore.http.netty4.implementation.Netty4Utility.PROGRESS_AND_TIMEOUT_HANDLER_NAME; |
| 63 | +import static io.clientcore.http.netty4.implementation.Netty4Utility.awaitLatch; |
57 | 64 | import static io.clientcore.http.netty4.implementation.Netty4Utility.createCodec; |
58 | 65 | import static io.clientcore.http.netty4.implementation.Netty4Utility.setOrSuppressError; |
59 | 66 | import static io.netty.handler.codec.http.DefaultHttpHeadersFactory.trailersFactory; |
@@ -106,7 +113,7 @@ public Response<BinaryData> send(HttpRequest request) { |
106 | 113 | boolean addProgressAndTimeoutHandler |
107 | 114 | = progressReporter != null || writeTimeoutMillis > 0 || responseTimeoutMillis > 0 || readTimeoutMillis > 0; |
108 | 115 |
|
109 | | - AtomicReference<Response<BinaryData>> responseReference = new AtomicReference<>(); |
| 116 | + AtomicReference<ResponseStateInfo> responseReference = new AtomicReference<>(); |
110 | 117 | AtomicReference<Throwable> errorReference = new AtomicReference<>(); |
111 | 118 | CountDownLatch latch = new CountDownLatch(1); |
112 | 119 |
|
@@ -143,67 +150,123 @@ protected void initChannel(Channel ch) throws SSLException { |
143 | 150 | } |
144 | 151 | }); |
145 | 152 |
|
146 | | - try { |
147 | | - bootstrap.connect(host, port).addListener((ChannelFutureListener) connectListener -> { |
148 | | - if (!connectListener.isSuccess()) { |
149 | | - LOGGER.atError().setThrowable(connectListener.cause()).log("Failed to send request"); |
150 | | - errorReference.set(connectListener.cause()); |
151 | | - connectListener.channel().close(); |
152 | | - latch.countDown(); |
153 | | - return; |
| 153 | + bootstrap.connect(host, port).addListener((ChannelFutureListener) connectListener -> { |
| 154 | + if (!connectListener.isSuccess()) { |
| 155 | + LOGGER.atError().setThrowable(connectListener.cause()).log("Failed to send request"); |
| 156 | + errorReference.set(connectListener.cause()); |
| 157 | + connectListener.channel().close(); |
| 158 | + latch.countDown(); |
| 159 | + return; |
| 160 | + } |
| 161 | + |
| 162 | + Channel channel = connectListener.channel(); |
| 163 | + channel.closeFuture().addListener(closeListener -> { |
| 164 | + if (!closeListener.isSuccess()) { |
| 165 | + LOGGER.atError().setThrowable(closeListener.cause()).log("Channel closed with error"); |
| 166 | + setOrSuppressError(errorReference, closeListener.cause()); |
154 | 167 | } |
| 168 | + }); |
| 169 | + |
| 170 | + // Only add CoreProgressAndTimeoutHandler if it will do anything, ex it is reporting progress or is |
| 171 | + // applying timeouts. |
| 172 | + // This is done to keep the ChannelPipeline shorter, therefore more performant, if this would |
| 173 | + // effectively be a no-op. |
| 174 | + if (addProgressAndTimeoutHandler) { |
| 175 | + channel.pipeline() |
| 176 | + .addLast(PROGRESS_AND_TIMEOUT_HANDLER_NAME, new Netty4ProgressAndTimeoutHandler(progressReporter, |
| 177 | + writeTimeoutMillis, responseTimeoutMillis, readTimeoutMillis)); |
| 178 | + } |
| 179 | + |
| 180 | + Netty4ResponseHandler responseHandler |
| 181 | + = new Netty4ResponseHandler(request, responseReference, errorReference, latch); |
| 182 | + channel.pipeline().addLast(responseHandler); |
155 | 183 |
|
156 | | - Channel channel = connectListener.channel(); |
157 | | - channel.closeFuture().addListener(closeListener -> { |
158 | | - if (!closeListener.isSuccess()) { |
159 | | - LOGGER.atError().setThrowable(closeListener.cause()).log("Channel closed with error"); |
160 | | - setOrSuppressError(errorReference, closeListener.cause()); |
| 184 | + Throwable earlyError = errorReference.get(); |
| 185 | + if (earlyError != null) { |
| 186 | + // If an error occurred between the connect and the request being sent, don't proceed with sending |
| 187 | + // the request. |
| 188 | + latch.countDown(); |
| 189 | + return; |
| 190 | + } |
| 191 | + |
| 192 | + sendRequest(request, channel, addProgressAndTimeoutHandler, errorReference) |
| 193 | + .addListener((ChannelFutureListener) sendListener -> { |
| 194 | + if (!sendListener.isSuccess()) { |
| 195 | + setOrSuppressError(errorReference, sendListener.cause()); |
| 196 | + sendListener.channel().close(); |
| 197 | + latch.countDown(); |
| 198 | + } else { |
| 199 | + sendListener.channel().read(); |
161 | 200 | } |
162 | 201 | }); |
| 202 | + }); |
163 | 203 |
|
164 | | - // Only add CoreProgressAndTimeoutHandler if it will do anything, ex it is reporting progress or is |
165 | | - // applying timeouts. |
166 | | - // This is done to keep the ChannelPipeline shorter, therefore more performant, if this would |
167 | | - // effectively be a no-op. |
168 | | - if (addProgressAndTimeoutHandler) { |
169 | | - channel.pipeline() |
170 | | - .addLast(PROGRESS_AND_TIMEOUT_HANDLER_NAME, new Netty4ProgressAndTimeoutHandler( |
171 | | - progressReporter, writeTimeoutMillis, responseTimeoutMillis, readTimeoutMillis)); |
172 | | - } |
| 204 | + awaitLatch(latch); |
| 205 | + |
| 206 | + ResponseStateInfo info = responseReference.get(); |
| 207 | + if (info == null) { |
| 208 | + throw LOGGER.throwableAtError().log(errorReference.get(), CoreException::from); |
| 209 | + } |
173 | 210 |
|
174 | | - Netty4ResponseHandler responseHandler |
175 | | - = new Netty4ResponseHandler(request, responseReference, errorReference, latch); |
176 | | - channel.pipeline().addLast(responseHandler); |
| 211 | + Response<BinaryData> response; |
| 212 | + if (info.isChannelConsumptionComplete()) { |
| 213 | + // The network response is already complete, handle creating our Response based on the request method and |
| 214 | + // response headers. |
| 215 | + BinaryData body = BinaryData.empty(); |
| 216 | + ByteArrayOutputStream eagerContent = info.getEagerContent(); |
| 217 | + if (info.getResponseBodyHandling() != ResponseBodyHandling.IGNORE && eagerContent.size() > 0) { |
| 218 | + // Set the response body as the first HttpContent received if the request wasn't a HEAD request and |
| 219 | + // there was body content. |
| 220 | + body = BinaryData.fromBytes(eagerContent.toByteArray()); |
| 221 | + } |
177 | 222 |
|
178 | | - Throwable earlyError = errorReference.get(); |
179 | | - if (earlyError != null) { |
180 | | - // If an error occurred between the connect and the request being sent, don't proceed with sending |
181 | | - // the request. |
182 | | - latch.countDown(); |
183 | | - return; |
| 223 | + response = new Response<>(request, info.getStatusCode(), info.getHeaders(), body); |
| 224 | + } else { |
| 225 | + // Otherwise we aren't finished, handle the remaining content according to the documentation in |
| 226 | + // 'channelRead()'. |
| 227 | + BinaryData body = BinaryData.empty(); |
| 228 | + ResponseBodyHandling bodyHandling = info.getResponseBodyHandling(); |
| 229 | + Channel channel = info.getResponseChannel(); |
| 230 | + if (bodyHandling == ResponseBodyHandling.IGNORE) { |
| 231 | + // We're ignoring the response content. |
| 232 | + CountDownLatch drainLatch = new CountDownLatch(1); |
| 233 | + channel.pipeline().addLast(new Netty4EagerConsumeChannelHandler(drainLatch, ignored -> { |
| 234 | + })); |
| 235 | + channel.config().setAutoRead(true); |
| 236 | + awaitLatch(drainLatch); |
| 237 | + } else if (bodyHandling == ResponseBodyHandling.STREAM) { |
| 238 | + // Body streaming uses a special BinaryData that tracks the firstContent read and the Channel it came |
| 239 | + // from so it can be consumed when the BinaryData is being used. |
| 240 | + // autoRead should have been disabled already but lets make sure that it is. |
| 241 | + channel.config().setAutoRead(false); |
| 242 | + String contentLength = info.getHeaders().getValue(HttpHeaderName.CONTENT_LENGTH); |
| 243 | + Long length = null; |
| 244 | + if (!CoreUtils.isNullOrEmpty(contentLength)) { |
| 245 | + try { |
| 246 | + length = Long.parseLong(contentLength); |
| 247 | + } catch (NumberFormatException ignored) { |
| 248 | + // Ignore, we'll just read until the channel is closed. |
| 249 | + } |
184 | 250 | } |
185 | 251 |
|
186 | | - sendRequest(request, channel, addProgressAndTimeoutHandler, errorReference) |
187 | | - .addListener((ChannelFutureListener) sendListener -> { |
188 | | - if (!sendListener.isSuccess()) { |
189 | | - setOrSuppressError(errorReference, sendListener.cause()); |
190 | | - sendListener.channel().close(); |
191 | | - latch.countDown(); |
192 | | - } else { |
193 | | - sendListener.channel().read(); |
194 | | - } |
195 | | - }); |
196 | | - }); |
| 252 | + body = new Netty4ChannelBinaryData(info.getEagerContent(), channel, length); |
| 253 | + } else { |
| 254 | + // All cases otherwise assume BUFFER. |
| 255 | + CountDownLatch drainLatch = new CountDownLatch(1); |
| 256 | + channel.pipeline().addLast(new Netty4EagerConsumeChannelHandler(drainLatch, buf -> { |
| 257 | + try { |
| 258 | + buf.readBytes(info.getEagerContent(), buf.readableBytes()); |
| 259 | + } catch (IOException ex) { |
| 260 | + throw LOGGER.throwableAtError().log(ex, CoreException::from); |
| 261 | + } |
| 262 | + })); |
| 263 | + channel.config().setAutoRead(true); |
| 264 | + awaitLatch(drainLatch); |
197 | 265 |
|
198 | | - latch.await(); |
199 | | - } catch (InterruptedException e) { |
200 | | - Thread.currentThread().interrupt(); |
201 | | - throw LOGGER.throwableAtError().log("Request interrupted.", e, CoreException::from); |
202 | | - } |
| 266 | + body = BinaryData.fromBytes(info.getEagerContent().toByteArray()); |
| 267 | + } |
203 | 268 |
|
204 | | - Response<BinaryData> response = responseReference.get(); |
205 | | - if (response == null) { |
206 | | - throw LOGGER.throwableAtError().log(errorReference.get(), CoreException::from); |
| 269 | + response = new Response<>(request, info.getStatusCode(), info.getHeaders(), body); |
207 | 270 | } |
208 | 271 |
|
209 | 272 | if (response.getValue() != BinaryData.empty() |
|
0 commit comments