Skip to content

Commit 4c01f51

Browse files
committed
Update InlineServer shutdown code and spin a process if thread leak is detected
1 parent 045dded commit 4c01f51

File tree

16 files changed

+198
-145
lines changed

16 files changed

+198
-145
lines changed

common/src/main/java/com/tc/lang/TCThreadGroup.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public TCThreadGroup(ThrowableHandler throwableHandler, String name) {
4646
}
4747

4848
public TCThreadGroup(ThrowableHandler throwableHandler, String name, boolean stoppable) {
49-
this(throwableHandler, name, stoppable, !stoppable);
49+
this(throwableHandler, name, stoppable, false);
5050
}
5151

5252
public TCThreadGroup(ThrowableHandler throwableHandler, String name, boolean stoppable, boolean ignorePool) {
@@ -76,6 +76,7 @@ public boolean isStoppable() {
7676
public void printLiveThreads(Consumer<String> reporter) {
7777
for (Thread t : threads()) {
7878
if (t != null && t != Thread.currentThread()) {
79+
reporter.accept(t.getThreadGroup().getName() + " - " + t.getName());
7980
reporter.accept(ThreadDumpUtil.getThreadDump(t));
8081
}
8182
}
@@ -121,7 +122,7 @@ private boolean lookForThreadExit(Thread t, Consumer<InterruptedException> inter
121122
}
122123
}
123124

124-
private synchronized List<Thread> threads() {
125+
private List<Thread> threads() {
125126
int ac = activeCount();
126127
Thread[] list = new Thread[ac];
127128
enumerate(list, true);

common/src/main/java/com/tc/net/protocol/tcm/CommunicationsManagerImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import com.tc.net.protocol.transport.WireProtocolMessageSink;
5555
import com.tc.net.core.ProductID;
5656
import com.tc.util.Assert;
57-
import com.tc.util.TCTimeoutException;
5857
import com.tc.util.concurrent.SetOnceFlag;
5958

6059
import java.io.IOException;

galvan-support/src/main/java/org/terracotta/testing/rules/BasicExternalCluster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ private void internalStart(CompletableFuture<Void> checker) throws Throwable {
247247
ServerInstance serverProcess = !inline ?
248248
new ServerProcess(serverName, server, serverWorkingDir, serverHeapSize, debugPort, systemProperties, parentOutput, builder.build())
249249
:
250-
new InlineServer(serverName, server, serverWorkingDir, systemProperties, parentOutput, builder.build());
250+
new InlineServer(serverName, server, serverWorkingDir, serverHeapSize, debugPort, systemProperties, parentOutput, builder.build());
251251

252252
stripeInstaller.installNewServer(serverProcess);
253253
}

galvan-support/src/test/java/org/terracotta/testing/rules/SimpleActivePassiveWithClassRuleIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.junit.ClassRule;
2222
import org.junit.Test;
23-
import org.terracotta.passthrough.IClusterControl;
2423

2524

2625
/**

galvan/src/main/java/org/terracotta/testing/master/IGalvanServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,6 @@ public interface IGalvanServer {
3939
void waitForTermination();
4040

4141
void waitForState(ServerMode mode) throws InterruptedException;
42+
43+
IGalvanServer newInstance();
4244
}

galvan/src/main/java/org/terracotta/testing/master/InlineServer.java

Lines changed: 112 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@
2828
import java.nio.file.Path;
2929
import static java.nio.file.StandardOpenOption.APPEND;
3030
import static java.nio.file.StandardOpenOption.CREATE;
31+
import java.util.ArrayList;
3132
import java.util.Arrays;
33+
import java.util.Collections;
3234
import java.util.List;
3335
import java.util.Objects;
3436
import java.util.Properties;
3537
import java.util.UUID;
38+
import java.util.stream.Collectors;
3639
import org.slf4j.Logger;
3740
import org.slf4j.LoggerFactory;
3841
import org.terracotta.testing.logging.VerboseOutputStream;
@@ -47,16 +50,26 @@ public class InlineServer extends ServerInstance {
4750
private final OutputStream parentOutput;
4851
private final String[] cmd;
4952

50-
private ServerThread server;
5153

52-
public InlineServer(String serverName, Path serverInstall, Path serverWorkingDir, Properties serverProperties, OutputStream out, String[] cmd) {
54+
private final int heap;
55+
private final int debug;
56+
private boolean leakDetected = false;
57+
private ServerThread serverThread;
58+
59+
/**
 * Creates an inline (in-JVM) server description.
 *
 * @param serverName name passed to the {@code ServerInstance} superclass
 * @param serverInstall server installation directory
 * @param serverWorkingDir working directory for this server
 * @param heap heap size kept for a replacement {@code ServerProcess} (see {@code newInstance()});
 *             units are not visible here - presumably the same as ServerProcess expects, TODO confirm
 * @param debug debug setting kept for a replacement {@code ServerProcess}; presumably a port, TODO confirm
 * @param serverProperties properties handed to the isolated server on start
 * @param out stream that receives the server's output
 * @param cmd command-line arguments for the server
 */
public InlineServer(String serverName, Path serverInstall, Path serverWorkingDir, int heap, int debug, Properties serverProperties, OutputStream out, String[] cmd) {
  super(serverName);

  // Where the server lives and what it is started with.
  this.serverWorkingDir = serverWorkingDir;
  this.serverInstall = serverInstall;
  this.cmd = cmd;
  this.serverProperties = serverProperties;
  this.parentOutput = out;

  // Only needed if this inline server is later replaced by a real process.
  // We need to specify a positive integer as the heap size.
  this.heap = heap;
  this.debug = debug;
}
70+
71+
/**
 * Convenience constructor that keeps the pre-existing six-argument signature.
 * Delegates to the full constructor with a heap value of 512 and a debug value of 0.
 */
public InlineServer(String serverName, Path serverInstall, Path serverWorkingDir, Properties serverProperties, OutputStream out, String[] cmd) {
  this(
      serverName,
      serverInstall,
      serverWorkingDir,
      512,
      0,
      serverProperties,
      out,
      cmd);
}
6174

6275
/**
@@ -76,10 +89,10 @@ public void start() throws IOException {
7689
try {
7790
// First thing we need to do is make sure that we aren't already running.
7891
Assert.assertFalse(this.isServerRunning());
79-
Assert.assertTrue(server == null || server.shutdown());
92+
Assert.assertTrue(serverThread == null || serverThread.shutdown());
8093
setCurrentState(ServerMode.STARTUP);
81-
server = new ServerThread();
82-
server.start();
94+
serverThread = new ServerThread();
95+
serverThread.start();
8396
} finally {
8497
exit(token);
8598
}
@@ -147,10 +160,10 @@ public void stop() throws InterruptedException {
147160
// Can't stop something not running.
148161
if (isServerRunning()) {
149162
// Log the intent.
150-
serverLogger.output("Crashing server process: " + server);
163+
serverLogger.output("Crashing server process: " + serverThread);
151164
// Mark this as expected.
152165
this.setCrashExpected(true);
153-
boolean result = server.shutdown();
166+
boolean result = serverThread.shutdown();
154167
serverLogger.output("Server Stop Command Result: " + result);
155168
}
156169
} finally {
@@ -229,24 +242,112 @@ public void run() {
229242
}
230243
}
231244

245+
private Object getServerObject() {
246+
return server;
247+
}
248+
232249
private synchronized boolean initializeServer(OutputStream out) throws Exception {
233-
server = startIsolatedServer(serverWorkingDir, serverInstall, out, cmd, serverProperties);
234-
running = server != null;
250+
server = startIsolatedServer(serverWorkingDir, serverInstall, out, cmd, serverProperties);
251+
running = server != null;
235252
return running;
236253
}
237254

238255
public synchronized boolean shutdown() {
239256
if (running) {
240257
running = false;
241-
String result = invokeOnServerMBean(server, "Server","halt",null);
242-
serverLogger.output("stopping. " + result);
258+
String result = invokeOnServerMBean(server, "Server","stopAndWait",null);
259+
serverLogger.output("stopping. restart:" + result);
260+
detectThreadLeaks();
243261
return !Boolean.parseBoolean(result);
244262
} else {
245263
return true;
246264
}
247265
}
248266
}
249267

268+
private void detectThreadLeaks() {
269+
ThreadGroup grp = (ThreadGroup)invokeOnObject(serverThread.getServerObject(), "getServerThreadGroup");
270+
List<Thread> threads = threads(grp);
271+
if (!threads.isEmpty()) {
272+
LOGGER.warn("Inline server threads are leaking:");
273+
LOGGER.warn("server name:{} path:{}", serverName, serverWorkingDir);
274+
for (Thread t : threads) {
275+
LOGGER.warn("thread group: {}", climbThreadGroupChain(t.getThreadGroup()));
276+
LOGGER.warn(getThreadDump(t));
277+
}
278+
grp.interrupt();
279+
leakDetected = false;
280+
boolean interrupted = false;
281+
try {
282+
for (Thread t : threads) {
283+
try {
284+
t.join(500L);
285+
leakDetected = t.isAlive() | leakDetected;
286+
} catch (InterruptedException ie) {
287+
interrupted = true;
288+
}
289+
}
290+
} finally {
291+
if (interrupted) {
292+
Thread.currentThread().interrupt();
293+
}
294+
}
295+
}
296+
}
297+
298+
private List<Thread> threads(ThreadGroup grp) {
299+
if (grp != null) {
300+
int ac = grp.activeCount();
301+
Thread[] list = new Thread[ac];
302+
grp.enumerate(list, true);
303+
return Arrays.stream(list).filter(t -> t != null && t.isAlive()).collect(Collectors.toList());
304+
} else {
305+
return Collections.emptyList();
306+
}
307+
}
308+
309+
private List<ThreadGroup> climbThreadGroupChain(ThreadGroup tg) {
310+
List<ThreadGroup> list = new ArrayList<>();
311+
while (tg != null) {
312+
list.add(tg);
313+
tg = tg.getParent();
314+
}
315+
Collections.reverse(list);
316+
return list;
317+
}
318+
319+
private static String getThreadDump(Thread t) {
320+
final StringBuilder sb = new StringBuilder(100 * 1024);
321+
sb.append("name=");
322+
sb.append(t.getName());
323+
sb.append(" id=");
324+
sb.append(t.getId());
325+
sb.append('\n');
326+
327+
final StackTraceElement[] stea = t.getStackTrace();
328+
for (StackTraceElement element : stea) {
329+
sb.append("\tat ");
330+
sb.append(element.toString());
331+
sb.append('\n');
332+
}
333+
sb.append('\n');
334+
335+
return sb.toString();
336+
}
337+
338+
@Override
339+
public IGalvanServer newInstance() {
340+
if (leakDetected) {
341+
LOGGER.warn("Leak detected switching to process mode");
342+
ServerProcess process = new ServerProcess(serverName, serverInstall, serverWorkingDir, heap,
343+
debug, serverProperties, parentOutput, cmd);
344+
process.installIntoStripe(stateInterlock, stateManager, this.serverLogger);
345+
return process;
346+
} else {
347+
return this;
348+
}
349+
}
350+
250351
private static Object startIsolatedServer(Path serverWorking, Path server, OutputStream out, String[] cmd, Properties serverProperties) {
251352
ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
252353
Path tc = server.resolve("tc.jar");

galvan/src/main/java/org/terracotta/testing/master/ServerInstance.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ public ServerInstance(String serverName) {
5656
this.serverName = serverName;
5757
}
5858

59-
public void installIntoStripe(StateInterlock stateInterlock, ITestStateManager stateManager, VerboseManager logging) {
59+
public void installIntoStripe(StateInterlock stateInterlock, ITestStateManager stateManager, ContextualLogger logging) {
6060
this.stateInterlock = stateInterlock;
6161
this.stateManager = stateManager;
62-
this.serverLogger = logging.createServerLogger();
62+
this.serverLogger = logging;
6363
this.stateInterlock.registerNewServer(this);
6464
}
6565

@@ -286,6 +286,11 @@ public synchronized void waitForState(ServerMode mode) throws InterruptedExcepti
286286
}
287287
}
288288

289+
@Override
290+
public IGalvanServer newInstance() {
291+
return this;
292+
}
293+
289294
@Override
290295
public String toString() {
291296
return "Server " + this.serverName + " (" + getCurrentState() + ")";

galvan/src/main/java/org/terracotta/testing/master/ServerProcessControl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public synchronized void startOneServer() throws GalvanFailureException {
101101
this.logger.output("<<< startOneServer");
102102
}
103103

104-
public synchronized void startServer() throws GalvanFailureException {
104+
private synchronized void startServer() throws GalvanFailureException {
105105
IGalvanServer server = null;
106106
int tries = 1;
107107
while (null == (server = this.stateInterlock.getOneTerminatedServer())) {

galvan/src/main/java/org/terracotta/testing/master/StateInterlock.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,13 @@ public IGalvanServer getOneTerminatedServer() throws GalvanFailureException {
157157
one = this.servers.stream().filter(s->s.getCurrentState() == ServerMode.TERMINATED).findAny().orElse(null);
158158
}
159159
this.logger.output("getOneTerminatedServer " + one);
160+
if (one != null) {
161+
IGalvanServer newOne = one.newInstance();
162+
if (newOne != one) {
163+
this.servers.remove(one);
164+
one = newOne;
165+
}
166+
}
160167
return one;
161168
}
162169

@@ -193,6 +200,7 @@ private boolean checkIfEmpty() {
193200
&& this.runningClients.isEmpty();
194201
}
195202
// ----- CLEANUP-----
203+
@Override
196204
public void forceShutdown() throws GalvanFailureException {
197205
this.logger.output("> forceShutdown");
198206
// Set the flag that we are shutting down. That way, any servers which were concurrently coming online can be stopped when they check in.

galvan/src/main/java/org/terracotta/testing/master/StripeInstaller.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public StripeInstaller(StateInterlock interlock, ITestStateManager stateManager,
4343
public void installNewServer(ServerInstance serverProcess) throws IOException {
4444
// Our implementation installs all servers before starting any (just an internal consistency check).
4545
Assert.assertFalse(this.isBuilt);
46-
serverProcess.installIntoStripe(interlock, stateManager, stripeVerboseManager.createComponentManager("[" + serverProcess.serverName + "]"));
46+
serverProcess.installIntoStripe(interlock, stateManager, stripeVerboseManager.createComponentManager("[" + serverProcess.serverName + "]").createServerLogger());
4747
serverProcesses.add(serverProcess);
4848
}
4949

0 commit comments

Comments
 (0)