77import io .clientcore .core .http .models .HttpHeaderName ;
88import io .clientcore .core .http .models .HttpRequest ;
99import io .clientcore .core .http .models .Response ;
10+ import io .clientcore .core .http .models .ServerSentEventListener ;
1011import io .clientcore .core .instrumentation .logging .ClientLogger ;
1112import io .clientcore .core .models .CoreException ;
13+ import io .clientcore .core .models .ServerSentResult ;
1214import io .clientcore .core .models .binarydata .BinaryData ;
1315import io .clientcore .core .models .binarydata .FileBinaryData ;
1416import io .clientcore .core .models .binarydata .InputStreamBinaryData ;
1517import io .clientcore .core .utils .AuthenticateChallenge ;
1618import io .clientcore .core .utils .ProgressReporter ;
19+ import io .clientcore .core .utils .ServerSentEventUtils ;
1720import io .clientcore .http .netty4 .implementation .ChannelInitializationProxyHandler ;
1821import io .clientcore .http .netty4 .implementation .Netty4ProgressAndTimeoutHandler ;
1922import io .clientcore .http .netty4 .implementation .Netty4ResponseHandler ;
3134import io .netty .handler .codec .http .DefaultHttpRequest ;
3235import io .netty .handler .codec .http .HttpMethod ;
3336import io .netty .handler .codec .http .HttpVersion ;
37+ import io .netty .handler .proxy .ProxyHandler ;
3438import io .netty .handler .ssl .SslContext ;
3539import io .netty .handler .ssl .SslContextBuilder ;
3640import io .netty .handler .stream .ChunkedInput ;
4751import java .util .concurrent .CountDownLatch ;
4852import java .util .concurrent .atomic .AtomicReference ;
4953
54+ import static io .clientcore .core .utils .ServerSentEventUtils .attemptRetry ;
55+ import static io .clientcore .core .utils .ServerSentEventUtils .processTextEventStream ;
5056import static io .clientcore .http .netty4 .implementation .Netty4Utility .PROGRESS_AND_TIMEOUT_HANDLER_NAME ;
5157import static io .clientcore .http .netty4 .implementation .Netty4Utility .createCodec ;
58+ import static io .clientcore .http .netty4 .implementation .Netty4Utility .setOrSuppressError ;
5259import static io .netty .handler .codec .http .DefaultHttpHeadersFactory .trailersFactory ;
5360
5461/**
5764class NettyHttpClient implements HttpClient {
5865 private static final ClientLogger LOGGER = new ClientLogger (NettyHttpClient .class );
5966
67+ /**
68+ * Error message for when no {@link ServerSentEventListener} is attached to the {@link HttpRequest}.
69+ */
70+ private static final String NO_LISTENER_ERROR_MESSAGE
71+ = "No ServerSentEventListener attached to HttpRequest to handle the text/event-stream response" ;
72+
6073 private final Bootstrap bootstrap ;
6174 private final SslContext sslContext ;
6275 private final ChannelInitializationProxyHandler channelInitializationProxyHandler ;
@@ -93,14 +106,26 @@ public Response<BinaryData> send(HttpRequest request) {
93106 boolean addProgressAndTimeoutHandler
94107 = progressReporter != null || writeTimeoutMillis > 0 || responseTimeoutMillis > 0 || readTimeoutMillis > 0 ;
95108
109+ AtomicReference <Response <BinaryData >> responseReference = new AtomicReference <>();
110+ AtomicReference <Throwable > errorReference = new AtomicReference <>();
111+ CountDownLatch latch = new CountDownLatch (1 );
112+
96113 // Configure an immutable ChannelInitializer in the builder with everything that can be added on a non-per
97114 // request basis.
98115 bootstrap .handler (new ChannelInitializer <Channel >() {
99116 @ Override
100117 protected void initChannel (Channel ch ) throws SSLException {
101118 // Test whether proxying should be applied to this Channel. If so, add it.
102- if (channelInitializationProxyHandler .test (ch .remoteAddress ())) {
103- ch .pipeline ().addFirst (channelInitializationProxyHandler .createProxy (proxyChallenges ));
119+ boolean hasProxy = channelInitializationProxyHandler .test (ch .remoteAddress ());
120+ if (hasProxy ) {
121+ ProxyHandler proxyHandler = channelInitializationProxyHandler .createProxy (proxyChallenges );
122+ proxyHandler .connectFuture ().addListener (future -> {
123+ if (!future .isSuccess ()) {
124+ setOrSuppressError (errorReference , future .cause ());
125+ }
126+ });
127+
128+ ch .pipeline ().addFirst (proxyHandler );
104129 }
105130
106131 // Add SSL handling if the request is HTTPS.
@@ -118,35 +143,56 @@ protected void initChannel(Channel ch) throws SSLException {
118143 }
119144 });
120145
121- AtomicReference <Response <BinaryData >> responseReference = new AtomicReference <>();
122- AtomicReference <Throwable > errorReference = new AtomicReference <>();
123- CountDownLatch latch = new CountDownLatch (1 );
124-
125146 try {
126- Channel channel = bootstrap .connect (host , port ).sync ().channel ();
127-
128- // Only add CoreProgressAndTimeoutHandler if it will do anything, ex it is reporting progress or is
129- // applying timeouts.
130- // This is done to keep the ChannelPipeline shorter, therefore more performant, if this would
131- // effectively be a no-op.
132- if (addProgressAndTimeoutHandler ) {
133- channel .pipeline ()
134- .addLast (PROGRESS_AND_TIMEOUT_HANDLER_NAME , new Netty4ProgressAndTimeoutHandler (progressReporter ,
135- writeTimeoutMillis , responseTimeoutMillis , readTimeoutMillis ));
136- }
147+ bootstrap .connect (host , port ).addListener ((ChannelFutureListener ) connectListener -> {
148+ if (!connectListener .isSuccess ()) {
149+ LOGGER .atError ().setThrowable (connectListener .cause ()).log ("Failed to send request" );
150+ errorReference .set (connectListener .cause ());
151+ connectListener .channel ().close ();
152+ latch .countDown ();
153+ return ;
154+ }
137155
138- Netty4ResponseHandler responseHandler = new Netty4ResponseHandler (request , responseReference , latch );
139- channel .pipeline ().addLast (responseHandler );
156+ Channel channel = connectListener .channel ();
157+ channel .closeFuture ().addListener (closeListener -> {
158+ if (!closeListener .isSuccess ()) {
159+ LOGGER .atError ().setThrowable (closeListener .cause ()).log ("Channel closed with error" );
160+ setOrSuppressError (errorReference , closeListener .cause ());
161+ }
162+ });
163+
164+ // Only add CoreProgressAndTimeoutHandler if it will do anything, ex it is reporting progress or is
165+ // applying timeouts.
166+ // This is done to keep the ChannelPipeline shorter, therefore more performant, if this would
167+ // effectively be a no-op.
168+ if (addProgressAndTimeoutHandler ) {
169+ channel .pipeline ()
170+ .addLast (PROGRESS_AND_TIMEOUT_HANDLER_NAME , new Netty4ProgressAndTimeoutHandler (
171+ progressReporter , writeTimeoutMillis , responseTimeoutMillis , readTimeoutMillis ));
172+ }
140173
141- sendRequest (request , channel , addProgressAndTimeoutHandler ).addListener ((ChannelFutureListener ) future -> {
142- if (!future .isSuccess ()) {
143- LOGGER .atError ().setThrowable (future .cause ()).log ("Failed to send request" );
144- errorReference .set (future .cause ());
145- future .channel ().close ();
174+ Netty4ResponseHandler responseHandler
175+ = new Netty4ResponseHandler (request , responseReference , errorReference , latch );
176+ channel .pipeline ().addLast (responseHandler );
177+
178+ Throwable earlyError = errorReference .get ();
179+ if (earlyError != null ) {
180+ // If an error occurred between the connect and the request being sent, don't proceed with sending
181+ // the request.
146182 latch .countDown ();
147- } else {
148- future .channel ().read ();
183+ return ;
149184 }
185+
186+ sendRequest (request , channel , addProgressAndTimeoutHandler , errorReference )
187+ .addListener ((ChannelFutureListener ) sendListener -> {
188+ if (!sendListener .isSuccess ()) {
189+ setOrSuppressError (errorReference , sendListener .cause ());
190+ sendListener .channel ().close ();
191+ latch .countDown ();
192+ } else {
193+ sendListener .channel ().read ();
194+ }
195+ });
150196 });
151197
152198 latch .await ();
@@ -159,10 +205,42 @@ protected void initChannel(Channel ch) throws SSLException {
159205 if (response == null ) {
160206 throw CoreException .from (errorReference .get ());
161207 }
208+
209+ if (response .getValue () != BinaryData .empty ()
210+ && ServerSentEventUtils
211+ .isTextEventStreamContentType (response .getHeaders ().getValue (HttpHeaderName .CONTENT_TYPE ))) {
212+ ServerSentEventListener listener = request .getServerSentEventListener ();
213+
214+ if (listener != null ) {
215+ try {
216+ ServerSentResult serverSentResult
217+ = processTextEventStream (response .getValue ().toStream (), listener );
218+
219+ if (serverSentResult .getException () != null ) {
220+ // If an exception occurred while processing the text event stream, emit listener onError.
221+ listener .onError (serverSentResult .getException ());
222+ }
223+
224+ // If an error occurred or we want to reconnect
225+ if (!Thread .currentThread ().isInterrupted () && attemptRetry (serverSentResult , request )) {
226+ return this .send (request );
227+ }
228+
229+ response = new Response <>(response .getRequest (), response .getStatusCode (), response .getHeaders (),
230+ createBodyFromServerSentResult (serverSentResult ));
231+ } catch (IOException ex ) {
232+ throw LOGGER .logThrowableAsError (CoreException .from (ex ));
233+ }
234+ } else {
235+ throw LOGGER .logThrowableAsError (new IllegalStateException (NO_LISTENER_ERROR_MESSAGE ));
236+ }
237+ }
238+
162239 return response ;
163240 }
164241
165- private ChannelFuture sendRequest (HttpRequest request , Channel channel , boolean progressAndTimeoutHandlerAdded ) {
242+ private ChannelFuture sendRequest (HttpRequest request , Channel channel , boolean progressAndTimeoutHandlerAdded ,
243+ AtomicReference <Throwable > errorReference ) {
166244 HttpMethod nettyMethod = HttpMethod .valueOf (request .getHttpMethod ().toString ());
167245 String uri = request .getUri ().toString ();
168246 WrappedHttpHeaders wrappedHttpHeaders = new WrappedHttpHeaders (request .getHeaders ());
@@ -179,35 +257,41 @@ private ChannelFuture sendRequest(HttpRequest request, Channel channel, boolean
179257 new ChunkedNioFile (FileChannel .open (fileBinaryData .getFile (), StandardOpenOption .READ ),
180258 fileBinaryData .getPosition (), fileBinaryData .getLength (), 8192 ),
181259 new DefaultHttpRequest (HttpVersion .HTTP_1_1 , nettyMethod , uri , wrappedHttpHeaders ),
182- progressAndTimeoutHandlerAdded );
260+ progressAndTimeoutHandlerAdded , errorReference );
183261 } catch (IOException ex ) {
184262 return channel .newFailedFuture (ex );
185263 }
186264 } else if (requestBody instanceof InputStreamBinaryData ) {
187265 return sendChunked (channel , new ChunkedStream (requestBody .toStream ()),
188266 new DefaultHttpRequest (HttpVersion .HTTP_1_1 , nettyMethod , uri , wrappedHttpHeaders ),
189- progressAndTimeoutHandlerAdded );
190- }
267+ progressAndTimeoutHandlerAdded , errorReference );
268+ } else {
269+ ByteBuf body = Unpooled .EMPTY_BUFFER ;
270+ if (requestBody != null && requestBody != BinaryData .empty ()) {
271+ // Longer term, see if there is a way to have BinaryData act as the ByteBuf body to further eliminate
272+ // copying of byte[]s.
273+ body = Unpooled .wrappedBuffer (requestBody .toBytes ());
274+ }
275+ if (body .readableBytes () > 0 ) {
276+ // TODO (alzimmer): Should we be setting Content-Length here again? Shouldn't this be handled externally
277+ // by the creator of the HttpRequest?
278+ wrappedHttpHeaders .getCoreHeaders ()
279+ .set (HttpHeaderName .CONTENT_LENGTH , String .valueOf (body .readableBytes ()));
280+ }
191281
192- ByteBuf body = Unpooled .EMPTY_BUFFER ;
193- if (requestBody != null && requestBody != BinaryData .empty ()) {
194- // Longer term, see if there is a way to have BinaryData act as the ByteBuf body to further eliminate
195- // copying of byte[]s.
196- body = Unpooled .wrappedBuffer (requestBody .toBytes ());
197- }
198- if (body .readableBytes () > 0 ) {
199- // TODO (alzimmer): Should we be setting Content-Length here again? Shouldn't this be handled externally
200- // by the creator of the HttpRequest?
201- wrappedHttpHeaders .getCoreHeaders ()
202- .set (HttpHeaderName .CONTENT_LENGTH , String .valueOf (body .readableBytes ()));
203- }
282+ Throwable error = errorReference .get ();
283+ if (error != null ) {
284+ return channel .newFailedFuture (error );
285+ }
204286
205- return channel .writeAndFlush (new DefaultFullHttpRequest (HttpVersion .HTTP_1_1 , nettyMethod , uri , body ,
206- wrappedHttpHeaders , trailersFactory ().newHeaders ()));
287+ return channel .writeAndFlush (new DefaultFullHttpRequest (HttpVersion .HTTP_1_1 , nettyMethod , uri , body ,
288+ wrappedHttpHeaders , trailersFactory ().newHeaders ()));
289+ }
207290 }
208291
209292 private <T > ChannelFuture sendChunked (Channel channel , ChunkedInput <T > chunkedInput ,
210- io .netty .handler .codec .http .HttpRequest initialLineAndHeaders , boolean progressAndTimeoutHandlerAdded ) {
293+ io .netty .handler .codec .http .HttpRequest initialLineAndHeaders , boolean progressAndTimeoutHandlerAdded ,
294+ AtomicReference <Throwable > errorReference ) {
211295 if (channel .pipeline ().get (ChunkedWriteHandler .class ) == null ) {
212296 // Add the ChunkedWriteHandler which will handle sending the chunkedInput.
213297 ChunkedWriteHandler chunkedWriteHandler = new ChunkedWriteHandler ();
@@ -218,6 +302,11 @@ private <T> ChannelFuture sendChunked(Channel channel, ChunkedInput<T> chunkedIn
218302 }
219303 }
220304
305+ Throwable error = errorReference .get ();
306+ if (error != null ) {
307+ return channel .newFailedFuture (error );
308+ }
309+
221310 channel .write (initialLineAndHeaders );
222311 return channel .writeAndFlush (chunkedInput );
223312 }
@@ -228,4 +317,10 @@ public void close() {
228317 group .shutdownGracefully ();
229318 }
230319 }
320+
321+ private static BinaryData createBodyFromServerSentResult (ServerSentResult serverSentResult ) {
322+ return (serverSentResult != null && serverSentResult .getData () != null )
323+ ? BinaryData .fromString (String .join ("\n " , serverSentResult .getData ()))
324+ : BinaryData .empty ();
325+ }
231326}
0 commit comments