Skip to content

Commit 84e01d0

Browse files
committed
AJ-951: fix unsubscribe race condition in ThreadPooledClient
1 parent 0635a2f commit 84e01d0

File tree

1 file changed

+71
-30
lines changed

1 file changed

+71
-30
lines changed

src/com/xxdb/streaming/client/ThreadPooledClient.java

Lines changed: 71 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
public class ThreadPooledClient extends AbstractClient {
2323
private static int CORES = Runtime.getRuntime().availableProcessors();
2424
private ExecutorService threadPool;
25-
// private HashMap<String, List<String>> users = new HashMap<>();
26-
private Object lock = new Object();
2725
private int threadCount = -1;
2826

2927
private static final Logger log = LoggerFactory.getLogger(ThreadPooledClient.class);
@@ -169,7 +167,7 @@ protected boolean doReconnect(Site site) {
169167
Thread.currentThread().interrupt();
170168
}
171169

172-
// after last threadPool completely shutdowncreate a new threadPool(because ExecutorService lifecycle is unidirectional).
170+
// After the previous threadPool has completely shut down, create a new threadPool (the ExecutorService lifecycle is unidirectional, so a terminated pool cannot be restarted).
173171
if (threadPool.isTerminated())
174172
threadPool = Executors.newFixedThreadPool(threadCount);
175173

@@ -333,6 +331,9 @@ public void unsubscribe(String host, int port, String tableName) throws IOExcept
333331

334332
@Override
335333
protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
334+
String originalFullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
335+
log.debug("Starting unsubscribe process for " + originalFullTableName);
336+
336337
if (!ifUseBackupSite) {
337338
// original logic:
338339
DBConnection dbConn = new DBConnection();
@@ -351,6 +352,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
351352
}
352353

353354
if (matchedInfo != null) {
355+
log.debug("Found HA matched info, switching from " + host + ":" + port + " to leader " + matchedInfo.getLeaderIp() + ":" + matchedInfo.getLeaderPort());
354356
host = matchedInfo.getLeaderIp();
355357
port = matchedInfo.getLeaderPort();
356358
}
@@ -361,11 +363,34 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
361363
List<String> usr = users.get(tp);
362364
String user = usr.get(0);
363365
String pwd = usr.get(1);
366+
log.debug("Connecting to server " + host + ":" + port + " to send unsubscribe signal for " + originalFullTableName);
364367
if (!user.equals(""))
365368
dbConn.connect(host, port, user, pwd);
366369
else
367370
dbConn.connect(host, port);
371+
log.debug("Connected to server " + host + ":" + port + " successfully");
368372
try {
373+
// Get topic and mark sites as closed BEFORE sending stopPublishTable
374+
// This prevents MessageParser from triggering reconnect when it receives EOF
375+
String topic;
376+
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
377+
synchronized (tableNameToTrueTopic) {
378+
topic = tableNameToTrueTopic.get(fullTableName);
379+
log.debug("Retrieved topic from tableNameToTrueTopic: " + topic + " for " + fullTableName);
380+
}
381+
synchronized (trueTopicToSites) {
382+
Site[] sites = trueTopicToSites.get(topic);
383+
if (sites == null || sites.length == 0) {
384+
log.warn("No sites found for topic: " + topic);
385+
} else {
386+
log.info("Marking " + sites.length + " site(s) as closed for topic: " + topic + " BEFORE sending stopPublishTable");
387+
for (int i = 0; i < sites.length; i++) {
388+
sites[i].closed = true;
389+
log.debug("Site " + i + " marked as closed: " + sites[i].host + ":" + sites[i].port);
390+
}
391+
}
392+
}
393+
369394
String localIP = this.listeningHost;
370395
if(localIP.equals(""))
371396
localIP = dbConn.getLocalAddress().getHostAddress();
@@ -375,37 +400,31 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
375400
params.add(new BasicString(tableName));
376401
params.add(new BasicString(actionName));
377402

403+
log.debug("Sending stopPublishTable command with params: localIP=" + localIP + ", listeningPort=" + this.listeningPort + ", tableName=" + tableName + ", actionName=" + actionName);
378404
dbConn.run("stopPublishTable", params);
379-
String topic = null;
380-
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
381-
synchronized (tableNameToTrueTopic) {
382-
topic = tableNameToTrueTopic.get(fullTableName);
383-
}
384-
synchronized (trueTopicToSites) {
385-
Site[] sites = trueTopicToSites.get(topic);
386-
if (sites == null || sites.length == 0)
387-
;
388-
for (int i = 0; i < sites.length; i++)
389-
sites[i].closed = true;
390-
}
405+
log.debug("stopPublishTable command executed successfully for " + originalFullTableName);
391406
synchronized (queueManager) {
392407
queueManager.removeQueue(topic);
408+
log.debug("Queue removed from queueManager for topic: " + topic);
393409
}
394410

395-
log.info("Successfully unsubscribed table " + fullTableName);
411+
log.debug("Successfully unsubscribed table " + fullTableName);
396412
} catch (Exception ex) {
413+
log.error("Error occurred during unsubscribe for " + originalFullTableName, ex);
397414
throw ex;
398415
} finally {
399416
dbConn.close();
417+
log.debug("DBConnection closed for " + originalFullTableName);
418+
400419
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
401-
QueueHandlerBinder queueHandler =null;
402420
synchronized (queueHandlers){
403-
queueHandler = queueHandlers.get(topicStr);
404421
queueHandlers.remove(topicStr);
405422
}
423+
log.debug("Unsubscribe process completed for " + originalFullTableName);
406424
}
407425
} else {
408426
// use backupSite
427+
log.debug("Using backupSite mode for unsubscribe");
409428
String originHost = host;
410429
int originPort = port;
411430

@@ -415,6 +434,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
415434
String topic = tableNameToTrueTopic.get( host + ":" + port + "/" + tableName + "/" + actionName);
416435
Integer currentSiteIndex = currentSiteIndexMap.get(topic);
417436
Site[] sites = trueTopicToSites.get(topic);
437+
log.debug("Switching to current site index " + currentSiteIndex + ": " + sites[currentSiteIndex].host + ":" + sites[currentSiteIndex].port);
418438
host = sites[currentSiteIndex].host;
419439
port = sites[currentSiteIndex].port;
420440
}
@@ -433,6 +453,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
433453
}
434454

435455
if (matchedInfo != null) {
456+
log.debug("Found HA matched info, switching from " + host + ":" + port + " to leader " + matchedInfo.getLeaderIp() + ":" + matchedInfo.getLeaderPort());
436457
host = matchedInfo.getLeaderIp();
437458
port = matchedInfo.getLeaderPort();
438459
}
@@ -443,11 +464,34 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
443464
List<String> usr = users.get(tp);
444465
String user = usr.get(0);
445466
String pwd = usr.get(1);
467+
468+
log.debug("Connecting to server " + host + ":" + port + " to send unsubscribe signal (backupSite mode) for " + originalFullTableName);
446469
if (!user.equals(""))
447470
dbConn.connect(host, port, user, pwd);
448471
else
449472
dbConn.connect(host, port);
473+
474+
log.debug("Connected to server " + host + ":" + port + " successfully (backupSite mode)");
475+
450476
try {
477+
// Get topic and mark sites as closed BEFORE sending stopPublishTable
478+
// This prevents MessageParser from triggering reconnect when it receives EOF
479+
String topic;
480+
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
481+
topic = tableNameToTrueTopic.get(fullTableName);
482+
log.debug("Retrieved topic from tableNameToTrueTopic: " + topic + " for " + fullTableName);
483+
484+
Site[] sites = trueTopicToSites.get(topic);
485+
if (sites == null || sites.length == 0) {
486+
log.warn("No sites found for topic: " + topic);
487+
} else {
488+
log.debug("Marking " + sites.length + " site(s) as closed for topic: " + topic + " BEFORE sending stopPublishTable (backupSite mode)");
489+
for (int i = 0; i < sites.length; i++) {
490+
sites[i].closed = true;
491+
log.debug("Site " + i + " marked as closed: " + sites[i].host + ":" + sites[i].port);
492+
}
493+
}
494+
451495
String localIP = this.listeningHost;
452496
if(localIP.equals(""))
453497
localIP = dbConn.getLocalAddress().getHostAddress();
@@ -457,36 +501,33 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
457501
params.add(new BasicString(tableName));
458502
params.add(new BasicString(actionName));
459503

504+
log.debug("Sending stopPublishTable command (backupSite mode) with params: localIP=" + localIP + ", listeningPort=" + this.listeningPort + ", tableName=" + tableName + ", actionName=" + actionName);
460505
dbConn.run("stopPublishTable", params);
461-
String topic = null;
462-
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
463-
topic = tableNameToTrueTopic.get(fullTableName);
464-
465-
Site[] sites = trueTopicToSites.get(topic);
466-
if (sites == null || sites.length == 0)
467-
;
468-
for (int i = 0; i < sites.length; i++)
469-
sites[i].closed = true;
506+
log.debug("stopPublishTable command executed successfully (backupSite mode) for " + originalFullTableName);
470507

471508
queueManager.removeQueue(topic);
509+
log.debug("Queue removed from queueManager for topic: " + topic);
472510

473511
// init backupSites related params.
474512
if (AbstractClient.ifUseBackupSite) {
513+
log.debug("Resetting backupSite related parameters");
475514
AbstractClient.ifUseBackupSite = false;
476515
AbstractClient.subOnce = false;
477516
AbstractClient.resubscribeInterval = 100;
478517
}
479-
log.info("Successfully unsubscribed table " + fullTableName);
518+
log.debug("Successfully unsubscribed table " + fullTableName);
480519
} catch (Exception ex) {
520+
log.error("Error occurred during unsubscribe (backupSite mode) for " + originalFullTableName, ex);
481521
throw ex;
482522
} finally {
483523
dbConn.close();
524+
log.debug("DBConnection closed (backupSite mode) for " + originalFullTableName);
525+
484526
String topicStr = originHost + ":" + originPort + "/" + tableName + "/" + actionName;
485-
QueueHandlerBinder queueHandler =null;
486527
synchronized (queueHandlers){
487-
queueHandler = queueHandlers.get(topicStr);
488528
queueHandlers.remove(topicStr);
489529
}
530+
log.debug("Unsubscribe process completed (backupSite mode) for " + originalFullTableName);
490531
}
491532
}
492533
}

0 commit comments

Comments
 (0)