200200import org .apache .commons .pool2 .impl .DefaultPooledObject ;
201201import org .apache .thrift .TException ;
202202import org .apache .thrift .transport .TTransport ;
203- import org .apache .thrift .transport .TTransportException ;
204203import org .slf4j .Logger ;
205204import org .slf4j .LoggerFactory ;
206205
206+ import javax .net .ssl .SSLHandshakeException ;
207+
208+ import java .io .IOException ;
207209import java .nio .ByteBuffer ;
208210import java .util .ArrayList ;
209211import java .util .List ;
@@ -269,27 +271,23 @@ public ConfigNodeClient(
269271 }
270272
271273 public void connect (TEndPoint endpoint , int timeoutMs ) throws TException {
272- try {
273- transport =
274- commonConfig .isEnableInternalSSL ()
275- ? DeepCopyRpcTransportFactory .INSTANCE .getTransport (
276- endpoint .getIp (),
277- endpoint .getPort (),
278- timeoutMs ,
279- commonConfig .getTrustStorePath (),
280- commonConfig .getTrustStorePwd (),
281- commonConfig .getKeyStorePath (),
282- commonConfig .getKeyStorePwd ())
283- : DeepCopyRpcTransportFactory .INSTANCE .getTransport (
284- // As there is a try-catch already, we do not need to use TSocket.wrap
285- endpoint .getIp (), endpoint .getPort (), timeoutMs );
286- if (!transport .isOpen ()) {
287- transport .open ();
288- }
289- configNode = endpoint ;
290- } catch (TTransportException e ) {
291- throw new TException (e );
274+ transport =
275+ commonConfig .isEnableInternalSSL ()
276+ ? DeepCopyRpcTransportFactory .INSTANCE .getTransport (
277+ endpoint .getIp (),
278+ endpoint .getPort (),
279+ timeoutMs ,
280+ commonConfig .getTrustStorePath (),
281+ commonConfig .getTrustStorePwd (),
282+ commonConfig .getKeyStorePath (),
283+ commonConfig .getKeyStorePwd ())
284+ : DeepCopyRpcTransportFactory .INSTANCE .getTransport (
285+ // As there is a try-catch already, we do not need to use TSocket.wrap
286+ endpoint .getIp (), endpoint .getPort (), timeoutMs );
287+ if (!transport .isOpen ()) {
288+ transport .open ();
292289 }
290+ configNode = endpoint ;
293291
294292 client = new IConfigNodeRPCService .Client (property .getProtocolFactory ().getProtocol (transport ));
295293 }
@@ -315,13 +313,15 @@ private void connectAndSync(int timeoutMs) throws TException {
315313 }
316314
317315 private void tryToConnect (int timeoutMs ) throws TException {
316+ TException exception = null ;
318317 if (configLeader != null ) {
319318 try {
320319 connect (configLeader , timeoutMs );
321320 return ;
322- } catch (TException ignore ) {
321+ } catch (TException e ) {
323322 logger .warn ("The current node leader may have been down {}, try next node" , configLeader );
324323 configLeader = null ;
324+ exception = e ;
325325 }
326326 } else {
327327 try {
@@ -344,10 +344,17 @@ private void tryToConnect(int timeoutMs) throws TException {
344344 try {
345345 connect (tryEndpoint , timeoutMs );
346346 return ;
347- } catch (TException ignore ) {
347+ } catch (TException e ) {
348348 logger .warn ("The current node may have been down {},try next node" , tryEndpoint );
349+ exception = e ;
349350 }
350351 }
352+ if (exception != null
353+ && exception .getCause () != null
354+ && exception .getCause ().getCause () != null
355+ && exception .getCause ().getCause () instanceof IOException ) {
356+ throw new TException (exception .getCause ().getCause ());
357+ }
351358
352359 throw new TException (MSG_RECONNECTION_FAIL );
353360 }
@@ -433,6 +440,9 @@ private <T> T executeRemoteCallWithRetry(final Operation<T> call, final Predicat
433440 Thread .currentThread ().getStackTrace ()[2 ].getMethodName ());
434441 logger .warn (message , e );
435442 configLeader = null ;
443+ if (e .getCause () != null && e .getCause () instanceof SSLHandshakeException ) {
444+ throw e ;
445+ }
436446 }
437447
438448 // If we have detected all configNodes and still not return
0 commit comments