Skip to content

Commit 3ef19b9

Browse files
author
Myron Scott
committed
add connection reachability for live connections
1 parent 25a2d06 commit 3ef19b9

File tree

4 files changed

+101
-55
lines changed

4 files changed

+101
-55
lines changed

common/src/main/java/com/tc/net/protocol/tcm/CommunicationsManagerImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@
6060
import com.tc.util.concurrent.SetOnceFlag;
6161

6262
import java.io.IOException;
63+
import java.lang.ref.PhantomReference;
64+
import java.lang.ref.ReferenceQueue;
6365
import java.net.InetSocketAddress;
6466
import java.util.Collections;
6567
import java.util.HashSet;
@@ -176,7 +178,10 @@ public CommunicationsManagerImpl(MessageMonitor monitor, TCMessageRouter message
176178

177179
Assert.eval(healthCheckerConfig != null);
178180
if (healthCheckerConfig.isHealthCheckerEnabled()) {
179-
connectionHealthChecker = new ConnectionHealthCheckerImpl(healthCheckerConfig, connectionManager);
181+
// reference this manager. If it not reachable, all connections and threads associated need to be cleaned up.
182+
ReferenceQueue<Object> gc = new ReferenceQueue<>();
183+
PhantomReference<Object> ref = new PhantomReference<>(this, gc);
184+
connectionHealthChecker = new ConnectionHealthCheckerImpl(healthCheckerConfig, connectionManager, ()->gc.poll() != ref);
180185
} else {
181186
connectionHealthChecker = new ConnectionHealthCheckerEchoImpl();
182187
}

common/src/main/java/com/tc/net/protocol/transport/ConnectionHealthCheckerImpl.java

Lines changed: 53 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.ConcurrentHashMap;
3434
import java.util.concurrent.ConcurrentMap;
3535
import java.util.concurrent.atomic.AtomicLong;
36+
import java.util.function.Supplier;
3637

3738
/**
3839
* The Engine which does the peer health checking work. Based on the config passed, it probes the peer once in specified
@@ -48,20 +49,30 @@ public class ConnectionHealthCheckerImpl implements ConnectionHealthChecker {
4849

4950
private final SetOnceFlag shutdown = new SetOnceFlag();
5051
private final SetOnceFlag started = new SetOnceFlag();
51-
52-
public ConnectionHealthCheckerImpl(HealthCheckerConfig healthCheckerConfig, TCConnectionManager connManager) {
52+
53+
public ConnectionHealthCheckerImpl(HealthCheckerConfig healthCheckerConfig, TCConnectionManager connManager, Supplier<Boolean> reachable) {
5354
Assert.assertNotNull(healthCheckerConfig);
5455
Assert.eval(healthCheckerConfig.isHealthCheckerEnabled());
5556
logger = LoggerFactory.getLogger(ConnectionHealthCheckerImpl.class.getName() + ": "
5657
+ healthCheckerConfig.getHealthCheckerName());
5758
monitorThread = new Timer(healthCheckerConfig.getHealthCheckerName() + " - HealthCheck-Timer", true);
58-
monitorThreadEngine = getHealthMonitorThreadEngine(healthCheckerConfig, connManager, logger);
59+
monitorThreadEngine = getHealthMonitorThreadEngine(healthCheckerConfig, connManager, reachable, logger);
5960
}
6061

61-
protected HealthCheckerMonitorThreadEngine getHealthMonitorThreadEngine(HealthCheckerConfig config,
62+
private HealthCheckerMonitorThreadEngine getHealthMonitorThreadEngine(HealthCheckerConfig config,
6263
TCConnectionManager connectionManager,
63-
Logger loger) {
64-
return new HealthCheckerMonitorThreadEngine(config, connectionManager, loger);
64+
Supplier<Boolean> reachable,
65+
Logger logger) {
66+
return new HealthCheckerMonitorThreadEngine(config, connectionManager, ()-> {
67+
if (reachable.get()) {
68+
return true;
69+
} else {
70+
stop();
71+
connectionManager.asynchCloseAllConnections();
72+
connectionManager.shutdown();
73+
return false;
74+
}
75+
}, logger);
6576
}
6677

6778
@Override
@@ -149,19 +160,20 @@ static class HealthCheckerMonitorThreadEngine extends TimerTask {
149160
private final long pingInterval;
150161
private final int pingProbes;
151162
private final long checkTimeInterval;
152-
private final SetOnceFlag stop = new SetOnceFlag();
153163
private final HealthCheckerConfig config;
154164
private final Logger logger;
155165
private final TCConnectionManager connectionManager;
166+
private final Supplier<Boolean> reachable;
156167
private final AtomicLong lastCheckTime = new AtomicLong(System.currentTimeMillis());
157168

158169
public HealthCheckerMonitorThreadEngine(HealthCheckerConfig healthCheckerConfig,
159-
TCConnectionManager connectionManager, Logger logger) {
170+
TCConnectionManager connectionManager, Supplier<Boolean> reachable, Logger logger) {
160171
this.pingIdleTime = healthCheckerConfig.getPingIdleTimeMillis();
161172
this.pingInterval = healthCheckerConfig.getPingIntervalMillis();
162173
this.pingProbes = healthCheckerConfig.getPingProbes();
163174
this.checkTimeInterval = healthCheckerConfig.getCheckTimeInterval();
164175
this.connectionManager = connectionManager;
176+
this.reachable = reachable;
165177
this.config = healthCheckerConfig;
166178

167179
Assert.assertNotNull(logger);
@@ -193,7 +205,6 @@ protected ConnectionHealthCheckerContext getHealthCheckerContext(MessageTranspor
193205
}
194206

195207
public void stop() {
196-
stop.attemptSet();
197208
this.cancel();
198209
}
199210

@@ -202,42 +213,44 @@ public void run() {
202213
// same interval for all connections
203214
final boolean canCheckTime = canCheckTime();
204215

205-
Iterator<MessageTransportBase> connectionIterator = connectionMap.values().iterator();
206-
while (connectionIterator.hasNext()) {
207-
MessageTransportBase mtb = connectionIterator.next();
208-
209-
TCConnection conn = mtb.getConnection();
210-
if (conn == null || !mtb.isConnected()) {
211-
logger.info("[" + (conn == null ? null : conn.getRemoteAddress().toString())
212-
+ "] is not connected. Health Monitoring for this node is now disabled.");
213-
connectionIterator.remove();
214-
continue;
215-
}
216-
217-
if (mtb.getReceiveLayer() == null) {
218-
logger.info("[" + (conn == null ? null : conn.getRemoteAddress().toString())
219-
+ "] is no longer referenced. Closing the connection");
220-
mtb.disconnect();
221-
connectionIterator.remove();
222-
continue;
223-
}
216+
if (reachable.get()) {
217+
Iterator<MessageTransportBase> connectionIterator = connectionMap.values().iterator();
218+
while (connectionIterator.hasNext()) {
219+
MessageTransportBase mtb = connectionIterator.next();
224220

225-
ConnectionHealthCheckerContext connContext = mtb.getHealthCheckerContext();
226-
if ((conn.getIdleReceiveTime() >= this.pingIdleTime)) {
221+
TCConnection conn = mtb.getConnection();
222+
if (conn == null || !mtb.isConnected()) {
223+
logger.info("[" + (conn == null ? null : conn.getRemoteAddress().toString())
224+
+ "] is not connected. Health Monitoring for this node is now disabled.");
225+
connectionIterator.remove();
226+
continue;
227+
}
227228

228-
if (!connContext.probeIfAlive()) {
229-
// Connection is dead. Disconnect the transport.
230-
logger.error("Declared connection dead " + mtb.getConnectionID() + " idle time "
231-
+ conn.getIdleReceiveTime() + "ms");
229+
if (mtb.getReceiveLayer() == null) {
230+
logger.info("[" + (conn == null ? null : conn.getRemoteAddress().toString())
231+
+ "] is no longer referenced. Closing the connection");
232232
mtb.disconnect();
233233
connectionIterator.remove();
234+
continue;
235+
}
236+
237+
ConnectionHealthCheckerContext connContext = mtb.getHealthCheckerContext();
238+
if ((conn.getIdleReceiveTime() >= this.pingIdleTime)) {
239+
240+
if (!connContext.probeIfAlive()) {
241+
// Connection is dead. Disconnect the transport.
242+
logger.error("Declared connection dead " + mtb.getConnectionID() + " idle time "
243+
+ conn.getIdleReceiveTime() + "ms");
244+
mtb.disconnect();
245+
connectionIterator.remove();
246+
}
247+
} else {
248+
connContext.refresh();
249+
}
250+
// is there any significant time difference between hosts ?
251+
if (canCheckTime) {
252+
connContext.checkTime();
234253
}
235-
} else {
236-
connContext.refresh();
237-
}
238-
// is there any significant time difference between hosts ?
239-
if (canCheckTime) {
240-
connContext.checkTime();
241254
}
242255
}
243256

common/src/test/java/com/tc/net/protocol/transport/HealthCheckerMonitorThreadEngineTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void testAllowCheckTimeIfEnabledInConfig() {
4646
// ignore interval
4747
props.setProperty("checkTime.interval", "-1");
4848
HealthCheckerConfigImpl config = new HealthCheckerConfigImpl(props, "test-config");
49-
final HealthCheckerMonitorThreadEngine engine = new HealthCheckerMonitorThreadEngine(config, null, logger);
49+
final HealthCheckerMonitorThreadEngine engine = new HealthCheckerMonitorThreadEngine(config, null, ()->Boolean.TRUE, logger);
5050

5151
assertTrue(engine.canCheckTime());
5252
}
@@ -57,7 +57,7 @@ public void testDisallowCheckTimeIfDisabledInConfig() {
5757
// disable time checking
5858
props.setProperty("checkTime.enabled", "false");
5959
HealthCheckerConfigImpl config = new HealthCheckerConfigImpl(props, "test-config");
60-
final HealthCheckerMonitorThreadEngine engine = new HealthCheckerMonitorThreadEngine(config, null, logger);
60+
final HealthCheckerMonitorThreadEngine engine = new HealthCheckerMonitorThreadEngine(config, null, ()->Boolean.TRUE, logger);
6161

6262
assertFalse(engine.canCheckTime());
6363
}
@@ -68,7 +68,7 @@ public void testDisallowCheckTimeIfIntervalNotExceeded() {
6868
// set short interval
6969
props.setProperty("checkTime.interval", "900000");
7070
HealthCheckerConfigImpl config = new HealthCheckerConfigImpl(props, "test-config");
71-
final HealthCheckerMonitorThreadEngine engine = new HealthCheckerMonitorThreadEngine(config, null, logger);
71+
final HealthCheckerMonitorThreadEngine engine = new HealthCheckerMonitorThreadEngine(config, null, ()->Boolean.TRUE, logger);
7272

7373
assertFalse(engine.canCheckTime());
7474
}

galvan-support/src/test/java/org/terracotta/testing/rules/ClientLeakIT.java

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,18 @@
1919
package org.terracotta.testing.rules;
2020

2121

22+
import java.util.Properties;
2223
import static org.junit.Assert.assertFalse;
24+
import static org.junit.Assert.assertNotNull;
2325
import static org.junit.Assert.assertNull;
2426
import org.junit.ClassRule;
2527
import org.junit.Test;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2630
import org.terracotta.connection.Connection;
2731
import org.terracotta.connection.ConnectionException;
32+
import org.terracotta.connection.ConnectionFactory;
33+
import org.terracotta.connection.ConnectionPropertyNames;
2834

2935

3036
/**
@@ -35,6 +41,8 @@
3541
// XXX: Currently ignored since this test depends on restartability of the server, which now requires a persistence service
3642
// to be plugged in (and there isn't one available, in open source).
3743
public class ClientLeakIT {
44+
private static final Logger LOGGER = LoggerFactory.getLogger(ClientLeakIT.class);
45+
3846
@ClassRule
3947
public static final Cluster CLUSTER = BasicExternalClusterBuilder.newCluster(1).build(); //logConfigExtensionResourceName("custom-logback-ext.xml").build();
4048

@@ -117,26 +125,46 @@ public void testConnectionMakerDiesWithNoRef() throws Exception {
117125
assertFalse(maker.isAlive());
118126
}
119127
}
120-
121-
private static Thread lookForConnectionMaker() {
128+
129+
@Test
130+
public void testSEDADiesWithNoRef() throws Exception {
131+
CLUSTER.getClusterControl().startAllServers();
132+
CLUSTER.getClusterControl().waitForActive();
133+
134+
String connectionName = "LEAKTESTCLIENT";
135+
Properties props = new Properties();
136+
props.setProperty(ConnectionPropertyNames.CONNECTION_NAME, connectionName);
137+
Connection leak = ConnectionFactory.connect(CLUSTER.getConnectionURI(), props);
138+
String cid = leak.toString();
139+
140+
assertNotNull(leak);
141+
assertNotNull(lookForThreadWithName(connectionName));
142+
143+
leak = null;
144+
145+
while (lookForThreadWithName(connectionName) != null) {
146+
LOGGER.info(cid);
147+
System.gc();
148+
Thread.sleep(1_000);
149+
}
150+
}
151+
152+
private static Thread lookForThreadWithName(String name) {
122153
Thread[] list = new Thread[Thread.activeCount()];
123154
Thread.enumerate(list);
124155
for (Thread t : list) {
125-
if (t != null && t.getName().startsWith("Connection Maker")) {
156+
if (t != null && t.getName().startsWith(name)) {
126157
return t;
127158
}
128159
}
129160
return null;
161+
}
162+
163+
private static Thread lookForConnectionMaker() {
164+
return lookForThreadWithName("Connection Maker");
130165
}
131166

132167
private static Thread lookForConnectionEstablisher() {
133-
Thread[] list = new Thread[Thread.activeCount()];
134-
Thread.enumerate(list);
135-
for (Thread t : list) {
136-
if (t != null && t.getName().startsWith("ConnectionEstablisher")) {
137-
return t;
138-
}
139-
}
140-
return null;
168+
return lookForThreadWithName("ConnectionEstablisher");
141169
}
142170
}

0 commit comments

Comments
 (0)