Skip to content

Commit 133f6f0

Browse files
author
Myron Scott
committed
Move to a single health-check timer per classloader. Add healthchecker to diagnostic connections
1 parent 2b1d369 commit 133f6f0

File tree

4 files changed

+99
-58
lines changed

4 files changed

+99
-58
lines changed

common/src/main/java/com/tc/net/protocol/tcm/CommunicationsManagerImpl.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import java.util.concurrent.ConcurrentHashMap;
7474
import java.util.concurrent.locks.ReentrantLock;
7575
import java.util.function.Predicate;
76+
import java.util.function.Supplier;
7677

7778
/**
7879
* Communications manager for setting up listeners and creating client connections
@@ -96,7 +97,7 @@ public class CommunicationsManagerImpl implements CommunicationsManager {
9697
protected final ConcurrentHashMap<TCMessageType, Class<? extends TCAction>> messageTypeClassMapping = new ConcurrentHashMap<TCMessageType, Class<? extends TCAction>>();
9798
protected final ConcurrentHashMap<TCMessageType, GeneratedMessageFactory> messageTypeFactoryMapping = new ConcurrentHashMap<TCMessageType, GeneratedMessageFactory>();
9899

99-
private ConnectionHealthChecker connectionHealthChecker;
100+
private final ConnectionHealthChecker connectionHealthChecker;
100101
private ServerID serverID = ServerID.NULL_ID;
101102
private int callbackPort = TransportHandshakeMessage.NO_CALLBACK_PORT;
102103
private final TransportHandshakeErrorHandler handshakeErrHandler;
@@ -177,15 +178,22 @@ public CommunicationsManagerImpl(MessageMonitor monitor, TCMessageRouter message
177178
this.connectionManager = connMgr;
178179

179180
Assert.eval(healthCheckerConfig != null);
180-
if (healthCheckerConfig.isHealthCheckerEnabled()) {
181+
connectionHealthChecker = healthCheckerConfig.isHealthCheckerEnabled() ?
182+
new ConnectionHealthCheckerImpl(healthCheckerConfig, connectionManager, createReferenceCheck()) :
183+
new ConnectionHealthCheckerEchoImpl();
184+
}
185+
186+
private Supplier<Boolean> createReferenceCheck() {
181187
// reference this manager. If it not reachable, all connections and threads associated need to be cleaned up.
182-
ReferenceQueue<Object> gc = new ReferenceQueue<>();
183-
PhantomReference<Object> ref = new PhantomReference<>(this, gc);
184-
connectionHealthChecker = new ConnectionHealthCheckerImpl(healthCheckerConfig, connectionManager, ()->gc.poll() != ref);
185-
} else {
186-
connectionHealthChecker = new ConnectionHealthCheckerEchoImpl();
187-
}
188-
connectionHealthChecker.start();
188+
ReferenceQueue<Object> gc = new ReferenceQueue<>();
189+
PhantomReference<Object> ref = new PhantomReference<>(this, gc);
190+
SetOnceFlag gcd = new SetOnceFlag();
191+
return ()-> {
192+
if (gc.poll() == ref) {
193+
gcd.set();
194+
}
195+
return !gcd.isSet();
196+
};
189197
}
190198

191199
@Override
@@ -392,12 +400,6 @@ public ConnectionHealthChecker getConnHealthChecker() {
392400
return this.connectionHealthChecker;
393401
}
394402

395-
public void setConnHealthChecker(ConnectionHealthChecker checker) {
396-
this.connectionHealthChecker.stop();
397-
this.connectionHealthChecker = checker;
398-
this.connectionHealthChecker.start();
399-
}
400-
401403
public TCMessageRouter getMessageRouter() {
402404
return this.messageRouter;
403405
}

common/src/main/java/com/tc/net/protocol/transport/ConnectionHealthCheckerImpl.java

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,22 @@
1818
*/
1919
package com.tc.net.protocol.transport;
2020

21+
import com.tc.lang.TCThreadGroup;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324

2425
import com.tc.net.core.TCConnection;
2526
import com.tc.net.core.TCConnectionManager;
2627
import com.tc.util.Assert;
27-
import com.tc.util.concurrent.SetOnceFlag;
2828
import java.net.InetSocketAddress;
2929

3030
import java.util.Iterator;
31-
import java.util.Timer;
32-
import java.util.TimerTask;
3331
import java.util.concurrent.ConcurrentHashMap;
3432
import java.util.concurrent.ConcurrentMap;
33+
import java.util.concurrent.Future;
34+
import java.util.concurrent.ScheduledExecutorService;
35+
import java.util.concurrent.ScheduledThreadPoolExecutor;
36+
import java.util.concurrent.TimeUnit;
3537
import java.util.concurrent.atomic.AtomicLong;
3638
import java.util.function.Supplier;
3739

@@ -44,20 +46,35 @@
4446
public class ConnectionHealthCheckerImpl implements ConnectionHealthChecker {
4547

4648
private final Logger logger;
47-
private final Timer monitorThread;
49+
private static final ScheduledExecutorService monitorThread = createHealthCheckExecutor();
4850
private final HealthCheckerMonitorThreadEngine monitorThreadEngine;
4951

50-
private final SetOnceFlag shutdown = new SetOnceFlag();
51-
private final SetOnceFlag started = new SetOnceFlag();
52+
private Future<?> task;
5253

5354
public ConnectionHealthCheckerImpl(HealthCheckerConfig healthCheckerConfig, TCConnectionManager connManager, Supplier<Boolean> reachable) {
5455
Assert.assertNotNull(healthCheckerConfig);
5556
Assert.eval(healthCheckerConfig.isHealthCheckerEnabled());
5657
logger = LoggerFactory.getLogger(ConnectionHealthCheckerImpl.class.getName() + ": "
5758
+ healthCheckerConfig.getHealthCheckerName());
58-
monitorThread = new Timer(healthCheckerConfig.getHealthCheckerName() + " - HealthCheck-Timer", true);
5959
monitorThreadEngine = getHealthMonitorThreadEngine(healthCheckerConfig, connManager, reachable, logger);
6060
}
61+
62+
private static ScheduledExecutorService createHealthCheckExecutor() {
63+
ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, (r) -> {
64+
ThreadGroup grp = Thread.currentThread().getThreadGroup();
65+
String id = "";
66+
while (grp instanceof TCThreadGroup) {
67+
id = "-" + grp.getName();
68+
grp = grp.getParent();
69+
}
70+
Thread t = new Thread(grp, r, "HealthCheck" + id);
71+
t.setDaemon(true);
72+
return t;
73+
});
74+
exec.setKeepAliveTime(5, TimeUnit.SECONDS);
75+
exec.allowCoreThreadTimeOut(true);
76+
return exec;
77+
}
6178

6279
private HealthCheckerMonitorThreadEngine getHealthMonitorThreadEngine(HealthCheckerConfig config,
6380
TCConnectionManager connectionManager,
@@ -69,17 +86,18 @@ private HealthCheckerMonitorThreadEngine getHealthMonitorThreadEngine(HealthChec
6986
} else {
7087
stop();
7188
connectionManager.asynchCloseAllConnections();
89+
connectionManager.closeAllListeners();
7290
connectionManager.shutdown();
7391
return false;
7492
}
7593
}, logger);
7694
}
7795

7896
@Override
79-
public void start() {
80-
if (started.attemptSet() && !shutdown.isSet()) {
97+
public synchronized void start() {
98+
if (task == null) {
8199
try {
82-
monitorThread.scheduleAtFixedRate(monitorThreadEngine, 0L, monitorThreadEngine.pingInterval);
100+
task = monitorThread.scheduleAtFixedRate(monitorThreadEngine, 0L, monitorThreadEngine.pingInterval, TimeUnit.MILLISECONDS);
83101
} catch (IllegalStateException state) {
84102
logger.warn("HealthChecker cannot start");
85103
return;
@@ -91,22 +109,15 @@ public void start() {
91109
}
92110

93111
@Override
94-
public void stop() {
95-
if (shutdown.attemptSet()) {
96-
monitorThreadEngine.stop();
97-
monitorThread.cancel();
98-
// don't bother to join the monitorThread here. shutdown should take care of all the
99-
// threads in the thread group
112+
public synchronized void stop() {
113+
if (task != null && !task.isCancelled()) {
114+
task.cancel(true);
100115
logger.debug("HealthChecker STOP requested");
101116
} else {
102117
logger.warn("HealthChecker STOP already requested");
103118
}
104119
}
105120

106-
public boolean isRunning() {
107-
return started.isSet() && !shutdown.isSet();
108-
}
109-
110121
@Override
111122
public void notifyTransportClosed(MessageTransport transport) {
112123
// HealthChecker Ping Thread can anyway determine this in the next probe interval thru mtb.isConnected and remove it
@@ -129,6 +140,7 @@ public void notifyTransportConnectAttempt(MessageTransport transport) {
129140

130141
@Override
131142
public void notifyTransportConnected(MessageTransport transport) {
143+
start();
132144
monitorThreadEngine.addConnection(transport);
133145
}
134146

@@ -153,7 +165,7 @@ public void notifyTransportReconnectionRejected(MessageTransport transport) {
153165
// NOP
154166
}
155167

156-
static class HealthCheckerMonitorThreadEngine extends TimerTask {
168+
static class HealthCheckerMonitorThreadEngine implements Runnable {
157169
private final ConcurrentMap<ConnectionID, MessageTransportBase> connectionMap =
158170
new ConcurrentHashMap<>();
159171
private final long pingIdleTime;
@@ -204,10 +216,6 @@ protected ConnectionHealthCheckerContext getHealthCheckerContext(MessageTranspor
204216
return new ConnectionHealthCheckerContextImpl(transport, conf, connManager);
205217
}
206218

207-
public void stop() {
208-
this.cancel();
209-
}
210-
211219
@Override
212220
public void run() {
213221
// same interval for all connections

galvan-support/src/test/java/org/terracotta/testing/rules/ClientLeakIT.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.terracotta.testing.rules;
2020

2121

22+
import java.net.InetSocketAddress;
2223
import java.util.Properties;
2324
import static org.junit.Assert.assertFalse;
2425
import static org.junit.Assert.assertNotNull;
@@ -31,6 +32,8 @@
3132
import org.terracotta.connection.ConnectionException;
3233
import org.terracotta.connection.ConnectionFactory;
3334
import org.terracotta.connection.ConnectionPropertyNames;
35+
import org.terracotta.connection.Diagnostics;
36+
import org.terracotta.connection.DiagnosticsFactory;
3437

3538

3639
/**
@@ -147,8 +150,54 @@ public void testSEDADiesWithNoRef() throws Exception {
147150
System.gc();
148151
Thread.sleep(1_000);
149152
}
153+
154+
String[] hp = CLUSTER.getClusterHostPorts();
155+
String[] server = hp[0].split(":");
156+
Diagnostics diag = DiagnosticsFactory.connect(InetSocketAddress.createUnresolved(server[0], Integer.parseInt(server[1])), props);
157+
158+
assertNotNull(diag);
159+
assertNotNull(lookForThreadWithName(connectionName));
160+
161+
diag = null;
162+
163+
while (lookForThreadWithName(connectionName) != null) {
164+
LOGGER.info(cid);
165+
System.gc();
166+
Thread.sleep(1_000);
167+
}
150168
}
169+
170+
@Test
171+
public void testHealthCheckThreadDies() throws Exception {
172+
CLUSTER.getClusterControl().startAllServers();
173+
CLUSTER.getClusterControl().waitForActive();
174+
175+
String connectionName = "LEAKTESTCLIENT";
176+
Properties props = new Properties();
177+
props.setProperty(ConnectionPropertyNames.CONNECTION_NAME, connectionName);
178+
Connection leak = ConnectionFactory.connect(CLUSTER.getConnectionURI(), props);
179+
String cid = leak.toString();
151180

181+
assertNotNull(leak);
182+
assertNotNull(lookForThreadWithName(connectionName));
183+
184+
leak = null;
185+
186+
while (lookForThreadWithName(connectionName) != null) {
187+
LOGGER.info(cid);
188+
System.gc();
189+
Thread.sleep(1_000);
190+
}
191+
192+
CLUSTER.getClusterControl().terminateAllServers();
193+
194+
while (lookForThreadWithName("HealthCheck") != null) {
195+
LOGGER.info(cid);
196+
System.gc();
197+
Thread.sleep(1_000);
198+
}
199+
}
200+
152201
private static Thread lookForThreadWithName(String name) {
153202
Thread[] list = new Thread[Thread.activeCount()];
154203
Thread.enumerate(list);

tc-client/src/main/java/com/terracotta/diagnostic/DiagnosticClientBuilder.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,40 +19,22 @@
1919
package com.terracotta.diagnostic;
2020

2121

22-
import com.tc.async.api.StageManager;
2322
import com.tc.net.basic.BasicConnectionManager;
2423
import com.tc.net.core.BufferManagerFactory;
2524
import com.tc.net.core.TCConnectionManager;
26-
import com.tc.net.protocol.NetworkStackHarnessFactory;
2725
import com.tc.net.protocol.tcm.ClientMessageChannel;
28-
import com.tc.net.protocol.tcm.CommunicationsManager;
29-
import com.tc.net.protocol.tcm.MessageMonitor;
30-
import com.tc.net.protocol.tcm.TCMessageRouter;
31-
import com.tc.net.protocol.tcm.TCMessageType;
32-
import com.tc.net.protocol.transport.ConnectionPolicy;
33-
import com.tc.net.protocol.transport.DisabledHealthCheckerConfigImpl;
34-
import com.tc.net.protocol.transport.HealthCheckerConfig;
35-
import com.tc.net.protocol.transport.ReconnectionRejectedHandler;
3626
import com.tc.object.ClientEntityManager;
3727
import com.tc.object.StandardClientBuilder;
3828
import com.tc.net.core.ProductID;
39-
import java.util.Map;
4029

4130
import java.util.Properties;
42-
import com.tc.net.protocol.tcm.TCAction;
4331

4432

4533
public class DiagnosticClientBuilder extends StandardClientBuilder {
4634

4735
public DiagnosticClientBuilder(Properties connectionProperties, BufferManagerFactory buffers) {
4836
super(connectionProperties, buffers);
4937
}
50-
51-
@Override
52-
public CommunicationsManager createCommunicationsManager(MessageMonitor monitor, TCMessageRouter messageRouter, NetworkStackHarnessFactory stackHarnessFactory, ConnectionPolicy connectionPolicy,
53-
TCConnectionManager connections, HealthCheckerConfig aConfig, Map<TCMessageType, Class<? extends TCAction>> messageTypeClassMapping, ReconnectionRejectedHandler reconnectionRejectedHandler) {
54-
return super.createCommunicationsManager(monitor, messageRouter, stackHarnessFactory, connectionPolicy, connections, new DisabledHealthCheckerConfigImpl(), messageTypeClassMapping, reconnectionRejectedHandler);
55-
}
5638

5739
@Override
5840
public ClientEntityManager createClientEntityManager(ClientMessageChannel channel) {

0 commit comments

Comments
 (0)