Skip to content

Commit de64ea2

Browse files
authored
Merge pull request #1284 from myronkscott/reference_cleanup
Reference tracking reduction
2 parents 928bdd3 + aa69062 commit de64ea2

File tree

15 files changed

+165
-246
lines changed

15 files changed

+165
-246
lines changed

common/src/main/java/com/tc/async/api/SEDA.java

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ public class SEDA {
3333

3434
public SEDA(TCThreadGroup threadGroup) {
3535
this.threadGroup = threadGroup;
36-
this.stageManager = new StageManagerImpl(threadGroup, new QueueFactory(), new StageListener() {
37-
@Override
38-
public void stageStalled(String name, long delay, int queueDepth) {
39-
stageWarning(new StallWarning(name, delay, queueDepth));
40-
}
41-
});
36+
this.stageManager = new StageManagerImpl(threadGroup, new QueueFactory());
37+
}
38+
39+
public SEDA(TCThreadGroup threadGroup, StageListener listener) {
40+
this.threadGroup = threadGroup;
41+
this.stageManager = new StageManagerImpl(threadGroup, new QueueFactory(), listener);
4242
}
4343

4444
public StageManager getStageManager() {
@@ -48,25 +48,4 @@ public StageManager getStageManager() {
4848
protected TCThreadGroup getThreadGroup() {
4949
return this.threadGroup;
5050
}
51-
52-
public void stageWarning(Object description) {
53-
54-
}
55-
56-
private static class StallWarning {
57-
private final String name;
58-
private final long delay;
59-
private final int depth;
60-
61-
public StallWarning(String name, long delay, int depth) {
62-
this.name = name;
63-
this.delay = delay;
64-
this.depth = depth;
65-
}
66-
67-
@Override
68-
public String toString() {
69-
return "StallWarning{" + "name=" + name + ", delay=" + delay + ", depth=" + depth + '}';
70-
}
71-
}
7251
}

common/src/main/java/com/tc/net/protocol/tcm/CommunicationsManagerImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@
6060
import com.tc.util.concurrent.SetOnceFlag;
6161

6262
import java.io.IOException;
63+
import java.lang.ref.PhantomReference;
64+
import java.lang.ref.ReferenceQueue;
6365
import java.net.InetSocketAddress;
6466
import java.util.Collections;
6567
import java.util.HashSet;
@@ -176,7 +178,10 @@ public CommunicationsManagerImpl(MessageMonitor monitor, TCMessageRouter message
176178

177179
Assert.eval(healthCheckerConfig != null);
178180
if (healthCheckerConfig.isHealthCheckerEnabled()) {
179-
connectionHealthChecker = new ConnectionHealthCheckerImpl(healthCheckerConfig, connectionManager);
181+
// reference this manager. If it not reachable, all connections and threads associated need to be cleaned up.
182+
ReferenceQueue<Object> gc = new ReferenceQueue<>();
183+
PhantomReference<Object> ref = new PhantomReference<>(this, gc);
184+
connectionHealthChecker = new ConnectionHealthCheckerImpl(healthCheckerConfig, connectionManager, ()->gc.poll() != ref);
180185
} else {
181186
connectionHealthChecker = new ConnectionHealthCheckerEchoImpl();
182187
}

common/src/main/java/com/tc/net/protocol/transport/ConnectionHealthCheckerImpl.java

Lines changed: 53 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.ConcurrentHashMap;
3434
import java.util.concurrent.ConcurrentMap;
3535
import java.util.concurrent.atomic.AtomicLong;
36+
import java.util.function.Supplier;
3637

3738
/**
3839
* The Engine which does the peer health checking work. Based on the config passed, it probes the peer once in specified
@@ -48,20 +49,30 @@ public class ConnectionHealthCheckerImpl implements ConnectionHealthChecker {
4849

4950
private final SetOnceFlag shutdown = new SetOnceFlag();
5051
private final SetOnceFlag started = new SetOnceFlag();
51-
52-
public ConnectionHealthCheckerImpl(HealthCheckerConfig healthCheckerConfig, TCConnectionManager connManager) {
52+
53+
public ConnectionHealthCheckerImpl(HealthCheckerConfig healthCheckerConfig, TCConnectionManager connManager, Supplier<Boolean> reachable) {
5354
Assert.assertNotNull(healthCheckerConfig);
5455
Assert.eval(healthCheckerConfig.isHealthCheckerEnabled());
5556
logger = LoggerFactory.getLogger(ConnectionHealthCheckerImpl.class.getName() + ": "
5657
+ healthCheckerConfig.getHealthCheckerName());
5758
monitorThread = new Timer(healthCheckerConfig.getHealthCheckerName() + " - HealthCheck-Timer", true);
58-
monitorThreadEngine = getHealthMonitorThreadEngine(healthCheckerConfig, connManager, logger);
59+
monitorThreadEngine = getHealthMonitorThreadEngine(healthCheckerConfig, connManager, reachable, logger);
5960
}
6061

61-
protected HealthCheckerMonitorThreadEngine getHealthMonitorThreadEngine(HealthCheckerConfig config,
62+
private HealthCheckerMonitorThreadEngine getHealthMonitorThreadEngine(HealthCheckerConfig config,
6263
TCConnectionManager connectionManager,
63-
Logger loger) {
64-
return new HealthCheckerMonitorThreadEngine(config, connectionManager, loger);
64+
Supplier<Boolean> reachable,
65+
Logger logger) {
66+
return new HealthCheckerMonitorThreadEngine(config, connectionManager, ()-> {
67+
if (reachable.get()) {
68+
return true;
69+
} else {
70+
stop();
71+
connectionManager.asynchCloseAllConnections();
72+
connectionManager.shutdown();
73+
return false;
74+
}
75+
}, logger);
6576
}
6677

6778
@Override
@@ -149,19 +160,20 @@ static class HealthCheckerMonitorThreadEngine extends TimerTask {
149160
private final long pingInterval;
150161
private final int pingProbes;
151162
private final long checkTimeInterval;
152-
private final SetOnceFlag stop = new SetOnceFlag();
153163
private final HealthCheckerConfig config;
154164
private final Logger logger;
155165
private final TCConnectionManager connectionManager;
166+
private final Supplier<Boolean> reachable;
156167
private final AtomicLong lastCheckTime = new AtomicLong(System.currentTimeMillis());
157168

158169
public HealthCheckerMonitorThreadEngine(HealthCheckerConfig healthCheckerConfig,
159-
TCConnectionManager connectionManager, Logger logger) {
170+
TCConnectionManager connectionManager, Supplier<Boolean> reachable, Logger logger) {
160171
this.pingIdleTime = healthCheckerConfig.getPingIdleTimeMillis();
161172
this.pingInterval = healthCheckerConfig.getPingIntervalMillis();
162173
this.pingProbes = healthCheckerConfig.getPingProbes();
163174
this.checkTimeInterval = healthCheckerConfig.getCheckTimeInterval();
164175
this.connectionManager = connectionManager;
176+
this.reachable = reachable;
165177
this.config = healthCheckerConfig;
166178

167179
Assert.assertNotNull(logger);
@@ -193,7 +205,6 @@ protected ConnectionHealthCheckerContext getHealthCheckerContext(MessageTranspor
193205
}
194206

195207
public void stop() {
196-
stop.attemptSet();
197208
this.cancel();
198209
}
199210

@@ -202,42 +213,44 @@ public void run() {
202213
// same interval for all connections
203214
final boolean canCheckTime = canCheckTime();
204215

205-
Iterator<MessageTransportBase> connectionIterator = connectionMap.values().iterator();
206-
while (connectionIterator.hasNext()) {
207-
MessageTransportBase mtb = connectionIterator.next();
208-
209-
TCConnection conn = mtb.getConnection();
210-
if (conn == null || !mtb.isConnected()) {
211-
logger.info("[" + (conn == null ? null : conn.getRemoteAddress().toString())
212-
+ "] is not connected. Health Monitoring for this node is now disabled.");
213-
connectionIterator.remove();
214-
continue;
215-
}
216-
217-
if (mtb.getReceiveLayer() == null) {
218-
logger.info("[" + (conn == null ? null : conn.getRemoteAddress().toString())
219-
+ "] is no longer referenced. Closing the connection");
220-
mtb.disconnect();
221-
connectionIterator.remove();
222-
continue;
223-
}
216+
if (reachable.get()) {
217+
Iterator<MessageTransportBase> connectionIterator = connectionMap.values().iterator();
218+
while (connectionIterator.hasNext()) {
219+
MessageTransportBase mtb = connectionIterator.next();
224220

225-
ConnectionHealthCheckerContext connContext = mtb.getHealthCheckerContext();
226-
if ((conn.getIdleReceiveTime() >= this.pingIdleTime)) {
221+
TCConnection conn = mtb.getConnection();
222+
if (conn == null || !mtb.isConnected()) {
223+
logger.info("[" + (conn == null ? null : conn.getRemoteAddress().toString())
224+
+ "] is not connected. Health Monitoring for this node is now disabled.");
225+
connectionIterator.remove();
226+
continue;
227+
}
227228

228-
if (!connContext.probeIfAlive()) {
229-
// Connection is dead. Disconnect the transport.
230-
logger.error("Declared connection dead " + mtb.getConnectionID() + " idle time "
231-
+ conn.getIdleReceiveTime() + "ms");
229+
if (mtb.getReceiveLayer() == null) {
230+
logger.info("[" + (conn == null ? null : conn.getRemoteAddress().toString())
231+
+ "] is no longer referenced. Closing the connection");
232232
mtb.disconnect();
233233
connectionIterator.remove();
234+
continue;
235+
}
236+
237+
ConnectionHealthCheckerContext connContext = mtb.getHealthCheckerContext();
238+
if ((conn.getIdleReceiveTime() >= this.pingIdleTime)) {
239+
240+
if (!connContext.probeIfAlive()) {
241+
// Connection is dead. Disconnect the transport.
242+
logger.error("Declared connection dead " + mtb.getConnectionID() + " idle time "
243+
+ conn.getIdleReceiveTime() + "ms");
244+
mtb.disconnect();
245+
connectionIterator.remove();
246+
}
247+
} else {
248+
connContext.refresh();
249+
}
250+
// is there any significant time difference between hosts ?
251+
if (canCheckTime) {
252+
connContext.checkTime();
234253
}
235-
} else {
236-
connContext.refresh();
237-
}
238-
// is there any significant time difference between hosts ?
239-
if (canCheckTime) {
240-
connContext.checkTime();
241254
}
242255
}
243256

common/src/main/java/com/tc/net/protocol/transport/ConnectionWatcher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,21 @@ public class ConnectionWatcher implements MessageTransportListener {
3939
* Listens to events from a MessageTransport, acts on them, and passes events through to target
4040
*/
4141
public ConnectionWatcher(ClientMessageChannel target, ClientConnectionEstablisher cce) {
42+
// this the channel is no longer reachable, make sure all the the connection threads are cleaned up
4243
this.targetHolder = new WeakReference<>(target, stopQueue);
4344
this.cce = cce;
4445
this.connection = target.getConnectionID();
4546
}
4647

4748
private boolean checkForStop() {
4849
Reference<? extends ClientMessageChannel> target = stopQueue.poll();
49-
if (target != null) {
50+
while (target != null) {
5051
if (target == targetHolder) {
5152
stopped.set();
5253
LOGGER.warn("unreferenced connection left open {} {}", targetHolder.get(), connection);
5354
cce.shutdown();
5455
}
56+
target = stopQueue.poll();
5557
}
5658
return stopped.isSet();
5759
}

common/src/test/java/com/tc/net/protocol/transport/HealthCheckerMonitorThreadEngineTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void testAllowCheckTimeIfEnabledInConfig() {
4646
// ignore interval
4747
props.setProperty("checkTime.interval", "-1");
4848
HealthCheckerConfigImpl config = new HealthCheckerConfigImpl(props, "test-config");
49-
final HealthCheckerMonitorThreadEngine engine = new HealthCheckerMonitorThreadEngine(config, null, logger);
49+
final HealthCheckerMonitorThreadEngine engine = new HealthCheckerMonitorThreadEngine(config, null, ()->Boolean.TRUE, logger);
5050

5151
assertTrue(engine.canCheckTime());
5252
}
@@ -57,7 +57,7 @@ public void testDisallowCheckTimeIfDisabledInConfig() {
5757
// disable time checking
5858
props.setProperty("checkTime.enabled", "false");
5959
HealthCheckerConfigImpl config = new HealthCheckerConfigImpl(props, "test-config");
60-
final HealthCheckerMonitorThreadEngine engine = new HealthCheckerMonitorThreadEngine(config, null, logger);
60+
final HealthCheckerMonitorThreadEngine engine = new HealthCheckerMonitorThreadEngine(config, null, ()->Boolean.TRUE, logger);
6161

6262
assertFalse(engine.canCheckTime());
6363
}
@@ -68,7 +68,7 @@ public void testDisallowCheckTimeIfIntervalNotExceeded() {
6868
// set short interval
6969
props.setProperty("checkTime.interval", "900000");
7070
HealthCheckerConfigImpl config = new HealthCheckerConfigImpl(props, "test-config");
71-
final HealthCheckerMonitorThreadEngine engine = new HealthCheckerMonitorThreadEngine(config, null, logger);
71+
final HealthCheckerMonitorThreadEngine engine = new HealthCheckerMonitorThreadEngine(config, null, ()->Boolean.TRUE, logger);
7272

7373
assertFalse(engine.canCheckTime());
7474
}

galvan-support/src/test/java/org/terracotta/testing/rules/ClientLeakIT.java

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,18 @@
1919
package org.terracotta.testing.rules;
2020

2121

22+
import java.util.Properties;
2223
import static org.junit.Assert.assertFalse;
24+
import static org.junit.Assert.assertNotNull;
2325
import static org.junit.Assert.assertNull;
2426
import org.junit.ClassRule;
2527
import org.junit.Test;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2630
import org.terracotta.connection.Connection;
2731
import org.terracotta.connection.ConnectionException;
32+
import org.terracotta.connection.ConnectionFactory;
33+
import org.terracotta.connection.ConnectionPropertyNames;
2834

2935

3036
/**
@@ -35,6 +41,8 @@
3541
// XXX: Currently ignored since this test depends on restartability of the server, which now requires a persistence service
3642
// to be plugged in (and there isn't one available, in open source).
3743
public class ClientLeakIT {
44+
private static final Logger LOGGER = LoggerFactory.getLogger(ClientLeakIT.class);
45+
3846
@ClassRule
3947
public static final Cluster CLUSTER = BasicExternalClusterBuilder.newCluster(1).build(); //logConfigExtensionResourceName("custom-logback-ext.xml").build();
4048

@@ -117,26 +125,46 @@ public void testConnectionMakerDiesWithNoRef() throws Exception {
117125
assertFalse(maker.isAlive());
118126
}
119127
}
120-
121-
private static Thread lookForConnectionMaker() {
128+
129+
@Test
130+
public void testSEDADiesWithNoRef() throws Exception {
131+
CLUSTER.getClusterControl().startAllServers();
132+
CLUSTER.getClusterControl().waitForActive();
133+
134+
String connectionName = "LEAKTESTCLIENT";
135+
Properties props = new Properties();
136+
props.setProperty(ConnectionPropertyNames.CONNECTION_NAME, connectionName);
137+
Connection leak = ConnectionFactory.connect(CLUSTER.getConnectionURI(), props);
138+
String cid = leak.toString();
139+
140+
assertNotNull(leak);
141+
assertNotNull(lookForThreadWithName(connectionName));
142+
143+
leak = null;
144+
145+
while (lookForThreadWithName(connectionName) != null) {
146+
LOGGER.info(cid);
147+
System.gc();
148+
Thread.sleep(1_000);
149+
}
150+
}
151+
152+
private static Thread lookForThreadWithName(String name) {
122153
Thread[] list = new Thread[Thread.activeCount()];
123154
Thread.enumerate(list);
124155
for (Thread t : list) {
125-
if (t != null && t.getName().startsWith("Connection Maker")) {
156+
if (t != null && t.getName().startsWith(name)) {
126157
return t;
127158
}
128159
}
129160
return null;
161+
}
162+
163+
private static Thread lookForConnectionMaker() {
164+
return lookForThreadWithName("Connection Maker");
130165
}
131166

132167
private static Thread lookForConnectionEstablisher() {
133-
Thread[] list = new Thread[Thread.activeCount()];
134-
Thread.enumerate(list);
135-
for (Thread t : list) {
136-
if (t != null && t.getName().startsWith("ConnectionEstablisher")) {
137-
return t;
138-
}
139-
}
140-
return null;
168+
return lookForThreadWithName("ConnectionEstablisher");
141169
}
142170
}

tc-client/src/main/java/com/tc/object/ClientBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ ClientHandshakeManager createClientHandshakeManager(Logger logger,
6161
String clientRevision,
6262
ClientEntityManager entity);
6363

64-
ClientEntityManager createClientEntityManager(ClientMessageChannel channel, StageManager stages);
64+
ClientEntityManager createClientEntityManager(ClientMessageChannel channel);
6565

6666
void setClientConnectionErrorListener(ClientConnectionErrorListener listener);
6767

tc-client/src/main/java/com/tc/object/ClientConfigurationContext.java

Lines changed: 0 additions & 30 deletions
This file was deleted.

0 commit comments

Comments
 (0)