7777import io .netty .handler .codec .http .HttpUtil ;
7878import io .netty .handler .codec .http .HttpVersion ;
7979import io .netty .handler .proxy .HttpProxyHandler ;
80- import io .netty .handler .ssl .SslContextBuilder ;
80+ import io .netty .handler .ssl .ClientAuth ;
81+ import io .netty .handler .ssl .JdkSslContext ;
8182import io .netty .handler .stream .ChunkedWriteHandler ;
83+ import io .netty .util .concurrent .GenericFutureListener ;
8284import jersey .repackaged .com .google .common .util .concurrent .SettableFuture ;
8385import org .glassfish .jersey .client .ClientProperties ;
8486import org .glassfish .jersey .client .ClientRequest ;
@@ -97,6 +99,7 @@ class NettyConnector implements Connector {
9799
98100 final ExecutorService executorService ;
99101 final EventLoopGroup group ;
102+ final Client client ;
100103
101104 NettyConnector (Client client ) {
102105
@@ -109,6 +112,7 @@ class NettyConnector implements Connector {
109112 }
110113
111114 this .group = new NioEventLoopGroup ();
115+ this .client = client ;
112116 }
113117
114118 @ Override
@@ -173,8 +177,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
173177
174178 // Enable HTTPS if necessary.
175179 if ("https" .equals (requestUri .getScheme ())) {
176- // TODO how to transform client.getSslContext (JDK) to Netty SslContext?
177- p .addLast (SslContextBuilder .forClient ().build ().newHandler (ch .alloc ()));
180+ // making client authentication optional for now; it could be extracted to configurable property
181+ JdkSslContext jdkSslContext = new JdkSslContext (client .getSslContext (), true , ClientAuth .NONE );
182+ p .addLast (jdkSslContext .newHandler (ch .alloc ()));
178183 }
179184
180185 // http proxy
@@ -208,7 +213,20 @@ protected void initChannel(SocketChannel ch) throws Exception {
208213 }
209214
210215 // Make the connection attempt.
211- Channel ch = b .connect (host , port ).sync ().channel ();
216+ final Channel ch = b .connect (host , port ).sync ().channel ();
217+
218+ // guard against prematurely closed channel
219+ final GenericFutureListener <io .netty .util .concurrent .Future <? super Void >> closeListener =
220+ new GenericFutureListener <io .netty .util .concurrent .Future <? super Void >>() {
221+ @ Override
222+ public void operationComplete (io .netty .util .concurrent .Future <? super Void > future ) throws Exception {
223+ if (!settableFuture .isDone ()) {
224+ settableFuture .setException (new IOException ("Channel closed." ));
225+ }
226+ }
227+ };
228+
229+ ch .closeFuture ().addListener (closeListener );
212230
213231 HttpRequest nettyRequest ;
214232
@@ -238,10 +256,9 @@ protected void initChannel(SocketChannel ch) throws Exception {
238256 }
239257 }
240258
241- // Send the HTTP request.
242- ch .writeAndFlush (nettyRequest );
243-
244259 if (jerseyRequest .hasEntity ()) {
260+ // Send the HTTP request.
261+ ch .writeAndFlush (nettyRequest );
245262
246263 final JerseyChunkedInput jerseyChunkedInput = new JerseyChunkedInput (ch );
247264 jerseyRequest .setStreamProvider (new OutboundMessageContext .StreamProvider () {
@@ -260,6 +277,9 @@ public OutputStream getOutputStream(int contentLength) throws IOException {
260277 executorService .execute (new Runnable () {
261278 @ Override
262279 public void run () {
280+ // close listener is not needed any more.
281+ ch .closeFuture ().removeListener (closeListener );
282+
263283 try {
264284 jerseyRequest .writeEntity ();
265285 } catch (IOException e ) {
@@ -270,10 +290,14 @@ public void run() {
270290 });
271291
272292 ch .flush ();
293+ } else {
294+ // close listener is not needed any more.
295+ ch .closeFuture ().removeListener (closeListener );
296+
297+ // Send the HTTP request.
298+ ch .writeAndFlush (nettyRequest );
273299 }
274300
275- // Wait for the server to close the connection.
276- // ch.closeFuture().sync();
277301 } catch (InterruptedException e ) {
278302 settableFuture .setException (e );
279303 return settableFuture ;
0 commit comments