55import java .net .InetAddress ;
66import java .net .InetSocketAddress ;
77import java .net .Socket ;
8+ import java .rmi .RemoteException ;
89import java .security .PublicKey ;
910import java .security .cert .CertificateException ;
1011import java .security .cert .X509Certificate ;
@@ -55,6 +56,7 @@ public class DBConnection {
5556 private long runSeqNo_ = 0 ;
5657 private int [] serverVersion_ ;
5758 private boolean isReverseStreaming_ = false ;
59+ private int tryReconnectNums = -1 ;
5860
5961 private static final Logger log = LoggerFactory .getLogger (DBConnection .class );
6062
@@ -189,7 +191,6 @@ private boolean connect()throws IOException{
189191 socket_ .connect (new InetSocketAddress (hostName_ ,port_ ), 3000 );
190192 }
191193 } catch (ConnectException ex ) {
192- log .error ("com.xxdb.DBConnection.DBConnectionImpl.connect() has exception. Current node hostName: " + this .hostName_ + ", port: " + this .port_ );
193194 log .error ("Connect to " + this .hostName_ + ":" + this .port_ + " failed." );
194195 throw ex ;
195196 }
@@ -668,6 +669,11 @@ public boolean connect(String hostName, int port, int timeout, boolean reconnect
668669 return connect (hostName , port , "" , "" , null , false , null , reconnect );
669670 }
670671
672+ public boolean connect (String hostName , int port , int timeout , boolean reconnect , int tryReconnectNums ) throws IOException {
673+ this .connTimeout_ = timeout ;
674+ return connect (hostName , port , "" , "" , null , false , null , reconnect , tryReconnectNums );
675+ }
676+
671677 public boolean connect (String hostName , int port , String initialScript ) throws IOException {
672678 return connect (hostName , port , "" , "" , initialScript , false , null );
673679 }
@@ -723,14 +729,32 @@ public boolean connect(String hostName, int port, String userId, String password
723729 return connect (hostName , port , userId , password , initialScript , enableHighAvailability , highAvailabilitySites , reconnect , false );
724730 }
725731
732+ public boolean connect (String hostName , int port , String userId , String password , String initialScript , boolean enableHighAvailability , String [] highAvailabilitySites , boolean reconnect , int tryReconnectNums ) throws IOException {
733+ if (enableHighAvailability )
734+ return connect (hostName , port , userId , password , initialScript , enableHighAvailability , highAvailabilitySites , reconnect , true , tryReconnectNums );
735+ else
736+ return connect (hostName , port , userId , password , initialScript , enableHighAvailability , highAvailabilitySites , reconnect , false , tryReconnectNums );
737+ }
738+
726739 public boolean connect (String hostName , int port , String userId , String password , String initialScript , boolean enableHighAvailability , String [] highAvailabilitySites , boolean reconnect , boolean enableLoadBalance ) throws IOException {
740+ return connect (hostName , port , userId , password , initialScript , enableHighAvailability , highAvailabilitySites , reconnect , enableLoadBalance , -1 );
741+ }
742+
743+ public boolean connect (String hostName , int port , String userId , String password , String initialScript , boolean enableHighAvailability , String [] highAvailabilitySites , boolean reconnect , boolean enableLoadBalance , int tryReconnectNums ) throws IOException {
727744 mutex_ .lock ();
728745 try {
729746 this .uid_ = userId ;
730747 this .pwd_ = password ;
731748 this .initialScript_ = initialScript ;
732749 this .enableHighAvailability_ = enableHighAvailability ;
733750 this .loadBalance_ = enableLoadBalance ;
751+ if (tryReconnectNums <= 0 ) {
752+ this .tryReconnectNums = -1 ;
753+ log .warn ("If the param 'tryReconnectNums' less than or equal to 0, when reconnect will be unlimited attempts." );
754+ } else {
755+ this .tryReconnectNums = tryReconnectNums ;
756+ }
757+
734758 if (this .loadBalance_ && !this .enableHighAvailability_ )
735759 throw new RuntimeException ("Cannot only enable loadbalance but not enable highAvailablity." );
736760
@@ -746,20 +770,48 @@ public boolean connect(String hostName, int port, String userId, String password
746770
747771 Node connectedNode = new Node ();
748772 BasicTable bt = null ;
773+
749774 while (!closed_ ) {
775+ int totalConnectAttemptNums = this .tryReconnectNums * nodes_ .size ();
776+ int attempt = 0 ;
750777 while (!conn_ .isConnected () && !closed_ ) {
751- for (Node one : nodes_ ) {
752- if (connectNode (one )) {
753- connectedNode = one ;
754- break ;
778+ if (this .tryReconnectNums > 0 ) {
779+ // finite try to connect.
780+ for (Node one : nodes_ ) {
781+ attempt ++;
782+ // System.out.println("Current init connect node: " + one.hostName + ":" + one.port);
783+ if (connectNode (one )) {
784+ connectedNode = one ;
785+ break ;
786+ }
787+
788+ try {
789+ Thread .sleep (100 );
790+ } catch (Exception e ){
791+ e .printStackTrace ();
792+ return false ;
793+ }
755794 }
756795
757- try {
758- Thread .sleep (100 );
759- } catch (Exception e ){
760- e .printStackTrace ();
796+ if (attempt == totalConnectAttemptNums ) {
797+ log .error ("Connect failed after " + tryReconnectNums + " reconnect attemps for every node in high availability sites." );
761798 return false ;
762799 }
800+ } else {
801+ // infinite try to connect.
802+ for (Node one : nodes_ ) {
803+ if (connectNode (one )) {
804+ connectedNode = one ;
805+ break ;
806+ }
807+
808+ try {
809+ Thread .sleep (100 );
810+ } catch (Exception e ){
811+ e .printStackTrace ();
812+ return false ;
813+ }
814+ }
763815 }
764816 }
765817
@@ -882,30 +934,44 @@ private void initConnection() throws IOException{
882934 }
883935
884936 public void switchDataNode (Node node ) throws IOException {
937+ int attempt = 0 ;
938+ boolean isConnected = false ;
885939 do {
886- if (node .hostName != null && node .hostName .length () > 0 ){
887- if (connectNode (node )){
940+ attempt ++;
941+ if (node .hostName != null && node .hostName .length () > 0 ) {
942+ if (connectNode (node )) {
888943 log .info ("Switch to node: " + node .hostName + ":" + node .port + " successfully." );
944+ isConnected = true ;
889945 break ;
890946 }
891947 }
948+
892949 if (nodes_ .isEmpty ()){
893950 log .error ("com.xxdb.DBConnection.switchDataNode nodes_ is empty. Current node hostName: " + node .hostName + ", port: " + node .port );
894951 log .error ("Connect to " + node .hostName + ":" + node .port + " failed." );
895952 throw new RuntimeException ("Connect to " + node .hostName + ":" + node .port + " failed." );
896953 }
897- int index = nodeRandom_ .nextInt (nodes_ .size ());
898- if (connectNode (nodes_ .get (index ))){
899- log .info ("Switch to node: " + nodes_ .get (index ).hostName + ":" + nodes_ .get (index ).port + " successfully." );
900- break ;
954+
955+ if (nodes_ .size () > 1 ) {
956+ int index = nodeRandom_ .nextInt (nodes_ .size ());
957+ if (connectNode (nodes_ .get (index ))){
958+ log .info ("Switch to node: " + nodes_ .get (index ).hostName + ":" + nodes_ .get (index ).port + " successfully." );
959+ isConnected = true ;
960+ break ;
961+ }
901962 }
963+
902964 try {
903965 Thread .sleep (1000 );
904- }catch (Exception e ){
966+ } catch (Exception e ){
905967 e .printStackTrace ();
906968 return ;
907969 }
908- }while (!closed_ );
970+ } while (!closed_ && (tryReconnectNums == -1 || attempt < tryReconnectNums ));
971+
972+ if (!closed_ && !isConnected )
973+ throw new RuntimeException ("Connect to " + node .hostName + ":" + node .port + " failed after " + attempt + " reconnect attemps." );
974+
909975 if (initialScript_ !=null && initialScript_ .length () > 0 ){
910976 run (initialScript_ );
911977 }
0 commit comments