Skip to content

Commit 1fe56be

Browse files
committed
tmp diag code for testing
1 parent 13a3821 commit 1fe56be

File tree

2 files changed

+360
-7
lines changed

2 files changed

+360
-7
lines changed

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java

Lines changed: 198 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
import org.junit.Rule;
4444
import org.junit.jupiter.api.parallel.ResourceLock;
4545
import org.junit.rules.TemporaryFolder;
46+
import org.junit.rules.TestRule;
47+
import org.junit.rules.TestWatcher;
48+
import org.junit.runner.Description;
4649
import org.slf4j.Logger;
4750
import org.slf4j.LoggerFactory;
4851

@@ -51,6 +54,7 @@
5154
import java.util.ArrayList;
5255
import java.util.Collection;
5356
import java.util.List;
57+
import java.util.Map;
5458
import java.util.Properties;
5559
import java.util.concurrent.atomic.AtomicReference;
5660

@@ -95,30 +99,125 @@ public abstract class KafkaTestBase extends TestLogger {
9599

96100
@Rule public final RetryRule retryRule = new RetryRule();
97101

102+
@Rule
103+
public TestRule testMethodLogger =
104+
new TestWatcher() {
105+
private long startTime;
106+
107+
@Override
108+
protected void starting(Description description) {
109+
startTime = System.currentTimeMillis();
110+
LOG.info(
111+
"=== DIAGNOSTIC: TEST STARTING: {}.{} ===",
112+
description.getClassName(),
113+
description.getMethodName());
114+
LOG.info("=== DIAGNOSTIC: Test start time: {} ===", startTime);
115+
LOG.info(
116+
"=== DIAGNOSTIC: Active threads at test start: {} ===",
117+
Thread.activeCount());
118+
}
119+
120+
@Override
121+
protected void succeeded(Description description) {
122+
long duration = System.currentTimeMillis() - startTime;
123+
LOG.info(
124+
"=== DIAGNOSTIC: TEST SUCCEEDED: {}.{} in {} ms ===",
125+
description.getClassName(),
126+
description.getMethodName(),
127+
duration);
128+
LOG.info(
129+
"=== DIAGNOSTIC: Active threads at test success: {} ===",
130+
Thread.activeCount());
131+
}
132+
133+
@Override
134+
protected void failed(Throwable e, Description description) {
135+
long duration = System.currentTimeMillis() - startTime;
136+
LOG.info(
137+
"=== DIAGNOSTIC: TEST FAILED: {}.{} after {} ms - {} ===",
138+
description.getClassName(),
139+
description.getMethodName(),
140+
duration,
141+
e.getMessage());
142+
LOG.info(
143+
"=== DIAGNOSTIC: Active threads at test failure: {} ===",
144+
Thread.activeCount());
145+
LOG.error("=== DIAGNOSTIC: Full stack trace for failed test ===", e);
146+
}
147+
148+
@Override
149+
protected void finished(Description description) {
150+
long duration = System.currentTimeMillis() - startTime;
151+
LOG.info(
152+
"=== DIAGNOSTIC: TEST FINISHED: {}.{} total duration {} ms ===",
153+
description.getClassName(),
154+
description.getMethodName(),
155+
duration);
156+
}
157+
};
158+
98159
// ------------------------------------------------------------------------
99160
// Setup and teardown of the mini clusters
100161
// ------------------------------------------------------------------------
101162

102163
@BeforeClass
103164
public static void prepare() throws Exception {
165+
LOG.info("=== DIAGNOSTIC: KafkaTestBase.prepare() starting ===");
166+
LOG.info(
167+
"=== DIAGNOSTIC: JVM version: {}, Flink version from classpath ===",
168+
System.getProperty("java.version"));
169+
LOG.info(
170+
"=== DIAGNOSTIC: Available memory: {} MB ===",
171+
Runtime.getRuntime().maxMemory() / 1024 / 1024);
172+
LOG.info(
173+
"=== DIAGNOSTIC: CI environment: {} ===",
174+
System.getenv("GITHUB_ACTIONS") != null ? "GitHub Actions" : "Local");
175+
LOG.info("=== DIAGNOSTIC: Thread count before setup: {} ===", Thread.activeCount());
176+
LOG.info("=== DIAGNOSTIC: Docker environment check ===");
104177
LOG.info("-------------------------------------------------------------------------");
105178
LOG.info(" Starting KafkaTestBase ");
106179
LOG.info("-------------------------------------------------------------------------");
107180

108-
startClusters(false, numKafkaClusters);
181+
try {
182+
startClusters(false, numKafkaClusters);
183+
LOG.info("=== DIAGNOSTIC: KafkaTestBase.prepare() completed successfully ===");
184+
} catch (Exception e) {
185+
LOG.error(
186+
"=== DIAGNOSTIC: KafkaTestBase.prepare() FAILED with exception: {} ===",
187+
e.getMessage(),
188+
e);
189+
throw e;
190+
}
109191
}
110192

111193
@AfterClass
112194
public static void shutDownServices() throws Exception {
195+
LOG.info("=== DIAGNOSTIC: KafkaTestBase.shutDownServices() starting ===");
196+
LOG.info("=== DIAGNOSTIC: Thread count before shutdown: {} ===", Thread.activeCount());
113197

114198
LOG.info("-------------------------------------------------------------------------");
115199
LOG.info(" Shut down KafkaTestBase ");
116200
LOG.info("-------------------------------------------------------------------------");
117201

118-
TestStreamEnvironment.unsetAsContext();
202+
try {
203+
TestStreamEnvironment.unsetAsContext();
204+
LOG.info("=== DIAGNOSTIC: TestStreamEnvironment unset successfully ===");
205+
} catch (Exception e) {
206+
LOG.error(
207+
"=== DIAGNOSTIC: Failed to unset TestStreamEnvironment: {} ===",
208+
e.getMessage(),
209+
e);
210+
}
119211

120-
shutdownClusters();
212+
try {
213+
shutdownClusters();
214+
LOG.info("=== DIAGNOSTIC: Clusters shutdown successfully ===");
215+
} catch (Exception e) {
216+
LOG.error("=== DIAGNOSTIC: Failed to shutdown clusters: {} ===", e.getMessage(), e);
217+
throw e;
218+
}
121219

220+
LOG.info("=== DIAGNOSTIC: Thread count after shutdown: {} ===", Thread.activeCount());
122221
LOG.info("-------------------------------------------------------------------------");
123222
LOG.info(" KafkaTestBase finished");
124223
LOG.info("-------------------------------------------------------------------------");
@@ -144,59 +243,122 @@ public static void startClusters(boolean secureMode, int numKafkaClusters) throw
144243

145244
public static void startClusters(
146245
KafkaTestEnvironment.Config environmentConfig, int numKafkaClusters) throws Exception {
246+
LOG.info(
247+
"=== DIAGNOSTIC: startClusters() called with {} kafka clusters ===",
248+
numKafkaClusters);
147249
for (int i = 0; i < numKafkaClusters; i++) {
250+
LOG.info("=== DIAGNOSTIC: Starting kafka cluster {}/{} ===", i + 1, numKafkaClusters);
251+
long clusterStartTime = System.currentTimeMillis();
148252
startClusters(environmentConfig);
253+
long clusterDuration = System.currentTimeMillis() - clusterStartTime;
254+
LOG.info(
255+
"=== DIAGNOSTIC: Kafka cluster {}/{} started in {} ms ===",
256+
i + 1,
257+
numKafkaClusters,
258+
clusterDuration);
259+
149260
KafkaClusterTestEnvMetadata kafkaClusterTestEnvMetadata =
150261
new KafkaClusterTestEnvMetadata(
151262
i, kafkaServer, standardProps, brokerConnectionStrings, secureProps);
152263
kafkaClusters.add(kafkaClusterTestEnvMetadata);
153264
LOG.info("Created Kafka cluster with configuration: {}", kafkaClusterTestEnvMetadata);
154265
}
266+
LOG.info(
267+
"=== DIAGNOSTIC: All {} kafka clusters started successfully ===", numKafkaClusters);
155268
}
156269

157270
public static void startClusters(KafkaTestEnvironment.Config environmentConfig)
158271
throws Exception {
272+
LOG.info("=== DIAGNOSTIC: startClusters() single cluster method called ===");
273+
274+
LOG.info("=== DIAGNOSTIC: Constructing KafkaTestEnvironment ===");
159275
kafkaServer = constructKafkaTestEnvironment();
276+
LOG.info("=== DIAGNOSTIC: KafkaTestEnvironment constructed successfully ===");
160277

161278
LOG.info("Starting KafkaTestBase.prepare() for Kafka {}", kafkaServer.getVersion());
162279

280+
LOG.info("=== DIAGNOSTIC: Calling kafkaServer.prepare() ===");
281+
long prepareStartTime = System.currentTimeMillis();
163282
kafkaServer.prepare(environmentConfig);
283+
long prepareDuration = System.currentTimeMillis() - prepareStartTime;
284+
LOG.info("=== DIAGNOSTIC: kafkaServer.prepare() completed in {} ms ===", prepareDuration);
164285

165286
standardProps = kafkaServer.getStandardProperties();
287+
LOG.info("=== DIAGNOSTIC: Standard properties retrieved ===");
166288

167289
brokerConnectionStrings = kafkaServer.getBrokerConnectionString();
290+
LOG.info("=== DIAGNOSTIC: Broker connection string: {} ===", brokerConnectionStrings);
168291

169292
if (environmentConfig.isSecureMode()) {
293+
LOG.info("=== DIAGNOSTIC: Setting up secure mode ===");
170294
if (!kafkaServer.isSecureRunSupported()) {
171295
throw new IllegalStateException(
172296
"Attempting to test in secure mode but secure mode not supported by the KafkaTestEnvironment.");
173297
}
174298
secureProps = kafkaServer.getSecureProperties();
299+
LOG.info("=== DIAGNOSTIC: Secure properties configured ===");
175300
}
301+
LOG.info("=== DIAGNOSTIC: Single cluster startup completed ===");
176302
}
177303

178304
public static KafkaTestEnvironment constructKafkaTestEnvironment() throws Exception {
305+
LOG.info("=== DIAGNOSTIC: Loading KafkaTestEnvironmentImpl class ===");
179306
Class<?> clazz =
180307
Class.forName(
181308
"org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
182-
return (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
309+
LOG.info("=== DIAGNOSTIC: Instantiating KafkaTestEnvironmentImpl ===");
310+
KafkaTestEnvironment environment =
311+
(KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
312+
LOG.info(
313+
"=== DIAGNOSTIC: KafkaTestEnvironment instantiated successfully: {} ===",
314+
environment.getClass().getName());
315+
return environment;
183316
}
184317

185318
public static void shutdownClusters() throws Exception {
319+
LOG.info("=== DIAGNOSTIC: shutdownClusters() called ===");
320+
186321
if (secureProps != null) {
322+
LOG.info("=== DIAGNOSTIC: Clearing secure props ===");
187323
secureProps.clear();
188324
}
189325

190326
if (kafkaServer != null) {
327+
LOG.info("=== DIAGNOSTIC: Shutting down kafka server ===");
328+
long shutdownStartTime = System.currentTimeMillis();
191329
kafkaServer.shutdown();
330+
long shutdownDuration = System.currentTimeMillis() - shutdownStartTime;
331+
LOG.info(
332+
"=== DIAGNOSTIC: Kafka server shutdown completed in {} ms ===",
333+
shutdownDuration);
334+
} else {
335+
LOG.info("=== DIAGNOSTIC: Kafka server is null, skipping shutdown ===");
192336
}
193337

194338
if (kafkaClusters != null && !kafkaClusters.isEmpty()) {
339+
LOG.info("=== DIAGNOSTIC: Shutting down {} kafka clusters ===", kafkaClusters.size());
340+
int clusterIndex = 0;
195341
for (KafkaClusterTestEnvMetadata value : kafkaClusters) {
342+
clusterIndex++;
343+
LOG.info(
344+
"=== DIAGNOSTIC: Shutting down cluster {}/{} ===",
345+
clusterIndex,
346+
kafkaClusters.size());
347+
long clusterShutdownStart = System.currentTimeMillis();
196348
value.getKafkaTestEnvironment().shutdown();
349+
long clusterShutdownDuration = System.currentTimeMillis() - clusterShutdownStart;
350+
LOG.info(
351+
"=== DIAGNOSTIC: Cluster {}/{} shutdown completed in {} ms ===",
352+
clusterIndex,
353+
kafkaClusters.size(),
354+
clusterShutdownDuration);
197355
}
198356
kafkaClusters.clear();
357+
LOG.info("=== DIAGNOSTIC: All kafka clusters shutdown successfully ===");
358+
} else {
359+
LOG.info("=== DIAGNOSTIC: No kafka clusters to shutdown ===");
199360
}
361+
LOG.info("=== DIAGNOSTIC: shutdownClusters() completed ===");
200362
}
201363

202364
// ------------------------------------------------------------------------
@@ -278,6 +440,38 @@ public static void setNumKafkaClusters(int size) {
278440
numKafkaClusters = size;
279441
}
280442

443+
/**
444+
* Helper method to print diagnostic information about all active threads. This can be useful
445+
* for debugging hanging tests.
446+
*/
447+
public static void printThreadDiagnostics(String context) {
448+
LOG.info("=== DIAGNOSTIC: Thread diagnostics for context: {} ===", context);
449+
LOG.info("=== DIAGNOSTIC: Total active threads: {} ===", Thread.activeCount());
450+
451+
Map<Thread, StackTraceElement[]> allThreads = Thread.getAllStackTraces();
452+
LOG.info("=== DIAGNOSTIC: All thread stack traces ({} threads): ===", allThreads.size());
453+
454+
for (Map.Entry<Thread, StackTraceElement[]> entry : allThreads.entrySet()) {
455+
Thread thread = entry.getKey();
456+
StackTraceElement[] stackTrace = entry.getValue();
457+
458+
LOG.info(
459+
"=== Thread: {} ({}), State: {}, Daemon: {} ===",
460+
thread.getName(),
461+
thread.getId(),
462+
thread.getState(),
463+
thread.isDaemon());
464+
465+
if (stackTrace.length > 0) {
466+
LOG.info("=== Stack trace (top 5 frames): ===");
467+
for (int i = 0; i < Math.min(5, stackTrace.length); i++) {
468+
LOG.info(" {}", stackTrace[i]);
469+
}
470+
}
471+
}
472+
LOG.info("=== END Thread diagnostics for context: {} ===", context);
473+
}
474+
281475
/** Metadata generated by this test utility. */
282476
public static class KafkaClusterTestEnvMetadata {
283477

0 commit comments

Comments
 (0)