Skip to content

Commit 2bae9c3

Browse files
authored
KAFKA-18952 Fix flaky test in MonitorableSinkIntegrationTest (#21057)
The test `testMonitorableSinkConnectorAndTask` is flaky due to a race condition between the task thread and the test thread. The `awaitRecords()` method uses a `CountDownLatch` that counts down in `TestableSinkTask.put()` for each record inside the loop, while `MonitorableSinkTask.count` is updated after `super.put()` returns. When the latch reaches zero, `awaitRecords()` returns immediately, but the `count += records.size() `may not have executed yet. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent aef1fac commit 2bae9c3

File tree

1 file changed

+15
-6
lines changed

1 file changed

+15
-6
lines changed

connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkIntegrationTest.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class MonitorableSinkIntegrationTest {
5858
private static final int NUM_TASKS = 1;
5959
private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
6060
private static final long CONSUME_MAX_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
61+
private static final long METRICS_CONVERGENCE_DURATION_MS = TimeUnit.SECONDS.toMillis(5);
6162

6263
private EmbeddedConnectStandalone connect;
6364
private ConnectorHandle connectorHandle;
@@ -119,11 +120,19 @@ public void testMonitorableSinkConnectorAndTask() throws Exception {
119120

120121
// check task metric
121122
metrics = connect.connectMetrics().metrics().metrics();
122-
MetricName taskMetric = MonitorableSinkConnector.MonitorableSinkTask.metricsName;
123-
assertTrue(metrics.containsKey(taskMetric));
124-
assertEquals(CONNECTOR_NAME, taskMetric.tags().get("connector"));
125-
assertEquals("0", taskMetric.tags().get("task"));
126-
assertEquals((double) NUM_RECORDS_PRODUCED, metrics.get(taskMetric).metricValue());
123+
MetricName taskMetricName = MonitorableSinkConnector.MonitorableSinkTask.metricsName;
124+
assertTrue(metrics.containsKey(taskMetricName));
125+
assertEquals(CONNECTOR_NAME, taskMetricName.tags().get("connector"));
126+
assertEquals("0", taskMetricName.tags().get("task"));
127+
128+
KafkaMetric taskMetric = metrics.get(taskMetricName);
129+
// The metric value may not be updated immediately after awaitRecords() returns,
130+
// because MonitorableSinkTask.count is incremented after TestableSinkTask.put()
131+
// which triggers the latch countdown. Use waitForCondition to handle this race condition.
132+
waitForCondition(
133+
() -> (double) NUM_RECORDS_PRODUCED == (double) taskMetric.metricValue(),
134+
METRICS_CONVERGENCE_DURATION_MS,
135+
"Task metric did not converge to expected value in time.");
127136

128137
connect.deleteConnector(CONNECTOR_NAME);
129138
connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
@@ -132,7 +141,7 @@ public void testMonitorableSinkConnectorAndTask() throws Exception {
132141
// verify connector and task metrics have been deleted
133142
metrics = connect.connectMetrics().metrics().metrics();
134143
assertFalse(metrics.containsKey(connectorMetric));
135-
assertFalse(metrics.containsKey(taskMetric));
144+
assertFalse(metrics.containsKey(taskMetricName));
136145
}
137146

138147
/**

0 commit comments

Comments
 (0)