1818
1919public class PollingClient extends AbstractClient {
2020 TopicPoller topicPoller = null ;
21- private HashMap <List <String >, List <String >> users = new HashMap <>();
21+ // private HashMap<List<String>, List<String>> users = new HashMap<>();
2222
2323 private static final Logger log = LoggerFactory .getLogger (PollingClient .class );
2424
@@ -36,19 +36,46 @@ public PollingClient(String subscribeHost, int subscribePort) throws SocketExcep
3636
3737 @ Override
3838 protected boolean doReconnect (Site site ) {
39- try {
40- Thread .sleep (1000 );
41- BlockingQueue <List <IMessage >> queue = subscribeInternal (site .host , site .port , site .tableName , site .actionName , (MessageHandler ) null , site .msgId + 1 , true , site .filter , site .deserializer , site .allowExistTopic , site .userName , site .passWord , site .msgAstable );
42- log .info ("Successfully reconnected and subscribed " + site .host + ":" + site .port + ":" + site .tableName );
43- topicPoller .setQueue (queue );
44- return true ;
45- } catch (Exception ex ) {
46- log .error ("Unable to subscribe table. Will try again after 1 seconds." );
47- ex .printStackTrace ();
48- return false ;
39+ if (!AbstractClient .ifUseBackupSite ) {
40+ // not enable backupSite, use original logic.
41+ try {
42+ log .info ("PollingClient doReconnect: " + site .host + ":" + site .port );
43+ Thread .sleep (1000 );
44+ BlockingQueue <List <IMessage >> queue = subscribeInternal (site .host , site .port , site .tableName , site .actionName , (MessageHandler ) null , site .msgId + 1 , true , site .filter , site .deserializer , site .allowExistTopic , site .userName , site .passWord , site .msgAstable );
45+ log .info ("Successfully reconnected and subscribed " + site .host + ":" + site .port + ":" + site .tableName );
46+ topicPoller .setQueue (queue );
47+ return true ;
48+ } catch (Exception ex ) {
49+ log .error ("Unable to subscribe table. Will try again after 1 seconds." );
50+ ex .printStackTrace ();
51+ return false ;
52+ }
53+ } else {
54+ // enable backupSite, try to switch site and subscribe.
55+ try {
56+ log .info ("PollingClient doReconnect: " + site .host + ":" + site .port );
57+ Thread .sleep (1000 );
58+ subscribe (site .host , site .port , site .tableName , site .actionName , (MessageHandler ) null , site .msgId + 1 , true , site .filter , site .deserializer , site .allowExistTopic , site .userName , site .passWord , site .msgAstable , false );
59+ String topicStr = site .host + ":" + site .port + "/" + site .tableName + "/" + site .actionName ;
60+ String curTopic = tableNameToTrueTopic .get (topicStr );
61+ BlockingQueue <List <IMessage >> queue = queueManager .addQueue (curTopic );
62+ queueManager .changeQueue (curTopic , lastQueue );
63+
64+ log .info ("Successfully reconnected and subscribed " + site .host + ":" + site .port + ":" + site .tableName );
65+ topicPoller .setQueue (lastQueue );
66+ return true ;
67+ } catch (Exception ex ) {
68+ log .error ("Unable to subscribe table. Will try again after 1 seconds." );
69+ ex .printStackTrace ();
70+ return false ;
71+ }
4972 }
5073 }
5174
75+ protected void subscribe (String host , int port , String tableName , String actionName , MessageHandler handler , long offset , boolean reconnect , Vector filter , StreamDeserializer deserializer , boolean allowExistTopic , String userName , String password , boolean msgAsTable , boolean createSubInfo ) throws IOException {
76+ BlockingQueue <List <IMessage >> queue = subscribeInternal (host , port , tableName , actionName , handler , offset , reconnect , filter , deserializer , allowExistTopic , userName , password , false , createSubInfo );
77+ }
78+
5279 public TopicPoller subscribe (String host , int port , String tableName , String actionName , long offset , boolean reconnect , Vector filter , StreamDeserializer deserializer , String userName , String passWord ) throws IOException {
5380 return subscribe (host , port , tableName , actionName , offset , reconnect , filter , deserializer , userName , passWord , false );
5481 }
@@ -57,7 +84,23 @@ public TopicPoller subscribe(String host, int port, String tableName, String act
5784 BlockingQueue <List <IMessage >> queue = subscribeInternal (host , port , tableName , actionName , (MessageHandler ) null , offset , reconnect , filter , deserializer , false , userName , passWord , msgAsTable );
5885 List <String > tp = Arrays .asList (host , String .valueOf (port ), tableName , actionName );
5986 List <String > usr = Arrays .asList (userName , passWord );
60- users .put (tp , usr );
87+ // users.put(tp, usr);
88+ topicPoller = new TopicPoller (queue );
89+ return topicPoller ;
90+ }
91+
92+ public TopicPoller subscribe (String host , int port , String tableName , String actionName , long offset , boolean reconnect , Vector filter , StreamDeserializer deserializer , String userName , String passWord , boolean msgAsTable , List <String > backupSites , int resubTimeout , boolean subOnce ) throws IOException {
93+ if (resubTimeout < 0 )
94+ // resubTimeout default: 100ms
95+ resubTimeout = 100 ;
96+
97+ BlockingQueue <List <IMessage >> queue = subscribeInternal (host , port , tableName , actionName , (MessageHandler ) null , offset , reconnect , filter , deserializer , false , userName , passWord , msgAsTable , backupSites , resubTimeout , subOnce );
98+ topicPoller = new TopicPoller (queue );
99+ return topicPoller ;
100+ }
101+
102+ public TopicPoller subscribe (String host , int port , String tableName , String actionName , long offset , boolean reconnect , Vector filter , StreamDeserializer deserializer , String userName , String passWord , boolean msgAsTable , List <String > backupSites ) throws IOException {
103+ BlockingQueue <List <IMessage >> queue = subscribeInternal (host , port , tableName , actionName , (MessageHandler ) null , offset , reconnect , filter , deserializer , false , userName , passWord , msgAsTable , backupSites , 100 , false );
61104 topicPoller = new TopicPoller (queue );
62105 return topicPoller ;
63106 }
@@ -120,47 +163,59 @@ public void unsubscribe(String host, int port, String tableName) throws IOExcept
120163
121164 @ Override
122165 protected void unsubscribeInternal (String host , int port , String tableName , String actionName ) throws IOException {
123- DBConnection dbConn = new DBConnection ();
124- List <String > tp = Arrays .asList (host , String .valueOf (port ), tableName , actionName );
125- List <String > usr = users .get (tp );
126- String user = usr .get (0 );
127- String pwd = usr .get (1 );
128- if (!user .equals ("" ))
129- dbConn .connect (host , port , user , pwd );
130- else
131- dbConn .connect (host , port );
132- try {
133- String localIP = this .listeningHost ;
134- if (localIP .equals ("" ))
135- localIP = dbConn .getLocalAddress ().getHostAddress ();
136- List <Entity > params = new ArrayList <Entity >();
137- params .add (new BasicString (localIP ));
138- params .add (new BasicInt (this .listeningPort ));
139- params .add (new BasicString (tableName ));
140- params .add (new BasicString (actionName ));
141-
142- dbConn .run ("stopPublishTable" , params );
143- String topic = null ;
144- String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName ;
145- synchronized (tableNameToTrueTopic ) {
146- topic = tableNameToTrueTopic .get (fullTableName );
166+ synchronized (this ) {
167+ DBConnection dbConn = new DBConnection ();
168+ if (!currentSiteIndexMap .isEmpty ()) {
169+ String topic = tableNameToTrueTopic .get ( host + ":" + port + "/" + tableName + "/" + actionName );
170+ Integer currentSiteIndex = currentSiteIndexMap .get (topic );
171+ Site [] sites = trueTopicToSites .get (topic );
172+ host = sites [currentSiteIndex ].host ;
173+ port = sites [currentSiteIndex ].port ;
147174 }
148- synchronized (trueTopicToSites ) {
175+
176+ List <String > tp = Arrays .asList (host , String .valueOf (port ), tableName , actionName );
177+ List <String > usr = users .get (tp );
178+ String user = usr .get (0 );
179+ String pwd = usr .get (1 );
180+ if (!user .equals ("" ))
181+ dbConn .connect (host , port , user , pwd );
182+ else
183+ dbConn .connect (host , port );
184+ try {
185+ String localIP = this .listeningHost ;
186+ if (localIP .equals ("" ))
187+ localIP = dbConn .getLocalAddress ().getHostAddress ();
188+ List <Entity > params = new ArrayList <Entity >();
189+ params .add (new BasicString (localIP ));
190+ params .add (new BasicInt (this .listeningPort ));
191+ params .add (new BasicString (tableName ));
192+ params .add (new BasicString (actionName ));
193+
194+ dbConn .run ("stopPublishTable" , params );
195+ String topic = null ;
196+ String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName ;
197+ topic = tableNameToTrueTopic .get (fullTableName );
198+
149199 Site [] sites = trueTopicToSites .get (topic );
150200 if (sites == null || sites .length == 0 )
151201 ;
152202 for (int i = 0 ; i < sites .length ; i ++)
153203 sites [i ].closed = true ;
154- }
155- synchronized (queueManager ) {
204+
156205 queueManager .removeQueue (topic );
206+
207+ // init backupSites related params.
208+ if (AbstractClient .ifUseBackupSite ) {
209+ AbstractClient .ifUseBackupSite = false ;
210+ AbstractClient .subOnce = false ;
211+ AbstractClient .resubTimeout = 100 ;
212+ }
213+ log .info ("Successfully unsubscribed table " + fullTableName );
214+ } catch (Exception ex ) {
215+ throw ex ;
216+ } finally {
217+ dbConn .close ();
157218 }
158- log .info ("Successfully unsubscribed table " + fullTableName );
159- } catch (Exception ex ) {
160- throw ex ;
161- } finally {
162- dbConn .close ();
163219 }
164- return ;
165220 }
166221}
0 commit comments