Skip to content

Commit deb5891

Browse files
authored
KAFKA-19529: State updater sensor names should be unique (#20262) (#20273)
All state updater threads use the same metrics instance, but do not use unique names for their sensors. This can have the following symptoms: 1) Data inserted into one sensor by one thread can affect the metrics of all state updater threads. 2) If one state updater thread is shutdown, the metrics associated to all state updater threads are removed. 3) If one state updater thread is started, while another one is removed, it can happen that a metric is registered with the `Metrics` instance, but not associated to any `Sensor` (because it is concurrently removed), which means that the metric will not be removed upon shutdown. If a thread with the same name later tries to register the same metric, we may run into a `java.lang.IllegalArgumentException: A metric named ... already exists`, as described in the ticket. This change fixes the bug giving unique names to the sensors. A test is added that there is no interference of the removal of sensors and metrics during shutdown. Reviewers: Matthias J. Sax <[email protected]>
1 parent 2a45b4f commit deb5891

File tree

3 files changed

+83
-29
lines changed

3 files changed

+83
-29
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.kafka.streams.processor.TaskId;
3737
import org.apache.kafka.streams.processor.internals.Task.State;
3838
import org.apache.kafka.streams.processor.internals.TaskAndAction.Action;
39+
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
3940

4041
import org.slf4j.Logger;
4142

@@ -89,7 +90,7 @@ private class StateUpdaterThread extends Thread {
8990
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
9091

9192
public StateUpdaterThread(final String name,
92-
final Metrics metrics,
93+
final StreamsMetricsImpl metrics,
9394
final ChangelogReader changelogReader) {
9495
super(name);
9596
this.changelogReader = changelogReader;
@@ -745,7 +746,7 @@ private void recordMetrics(final long now, final long totalLatency, final long t
745746
private final Time time;
746747
private final Logger log;
747748
private final String name;
748-
private final Metrics metrics;
749+
private final StreamsMetricsImpl metrics;
749750
private final Consumer<byte[], byte[]> restoreConsumer;
750751
private final ChangelogReader changelogReader;
751752
private final TopologyMetadata topologyMetadata;
@@ -766,7 +767,7 @@ private void recordMetrics(final long now, final long totalLatency, final long t
766767
private StateUpdaterThread stateUpdaterThread = null;
767768

768769
public DefaultStateUpdater(final String name,
769-
final Metrics metrics,
770+
final StreamsMetricsImpl metrics,
770771
final StreamsConfig config,
771772
final Consumer<byte[], byte[]> restoreConsumer,
772773
final ChangelogReader changelogReader,
@@ -1059,74 +1060,75 @@ private class StateUpdaterMetrics {
10591060
private final Sensor standbyRestoreRatioSensor;
10601061
private final Sensor checkpointRatioSensor;
10611062

1062-
private final Deque<String> allSensorNames = new LinkedList<>();
1063+
private final Deque<Sensor> allSensors = new LinkedList<>();
10631064
private final Deque<MetricName> allMetricNames = new LinkedList<>();
10641065

1065-
private StateUpdaterMetrics(final Metrics metrics, final String threadId) {
1066+
private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threadId) {
10661067
final Map<String, String> threadLevelTags = new LinkedHashMap<>();
10671068
threadLevelTags.put(THREAD_ID_TAG, threadId);
1069+
final Metrics metricsRegistry = metrics.metricsRegistry();
10681070

1069-
MetricName metricName = metrics.metricName("active-restoring-tasks",
1071+
MetricName metricName = metricsRegistry.metricName("active-restoring-tasks",
10701072
STATE_LEVEL_GROUP,
10711073
"The number of active tasks currently undergoing restoration",
10721074
threadLevelTags);
1073-
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
1075+
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
10741076
stateUpdaterThread.numRestoringActiveTasks() : 0);
10751077
allMetricNames.push(metricName);
10761078

1077-
metricName = metrics.metricName("standby-updating-tasks",
1079+
metricName = metricsRegistry.metricName("standby-updating-tasks",
10781080
STATE_LEVEL_GROUP,
10791081
"The number of standby tasks currently undergoing state update",
10801082
threadLevelTags);
1081-
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
1083+
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
10821084
stateUpdaterThread.numUpdatingStandbyTasks() : 0);
10831085
allMetricNames.push(metricName);
10841086

1085-
metricName = metrics.metricName("active-paused-tasks",
1087+
metricName = metricsRegistry.metricName("active-paused-tasks",
10861088
STATE_LEVEL_GROUP,
10871089
"The number of active tasks paused restoring",
10881090
threadLevelTags);
1089-
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
1091+
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
10901092
stateUpdaterThread.numPausedActiveTasks() : 0);
10911093
allMetricNames.push(metricName);
10921094

1093-
metricName = metrics.metricName("standby-paused-tasks",
1095+
metricName = metricsRegistry.metricName("standby-paused-tasks",
10941096
STATE_LEVEL_GROUP,
10951097
"The number of standby tasks paused state update",
10961098
threadLevelTags);
1097-
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
1099+
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
10981100
stateUpdaterThread.numPausedStandbyTasks() : 0);
10991101
allMetricNames.push(metricName);
11001102

1101-
this.idleRatioSensor = metrics.sensor("idle-ratio", RecordingLevel.INFO);
1103+
this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO);
11021104
this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
1103-
allSensorNames.add("idle-ratio");
1105+
allSensors.add(this.idleRatioSensor);
11041106

1105-
this.activeRestoreRatioSensor = metrics.sensor("active-restore-ratio", RecordingLevel.INFO);
1107+
this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO);
11061108
this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
1107-
allSensorNames.add("active-restore-ratio");
1109+
allSensors.add(this.activeRestoreRatioSensor);
11081110

1109-
this.standbyRestoreRatioSensor = metrics.sensor("standby-update-ratio", RecordingLevel.INFO);
1111+
this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, "standby-update-ratio", RecordingLevel.INFO);
11101112
this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
1111-
allSensorNames.add("standby-update-ratio");
1113+
allSensors.add(this.standbyRestoreRatioSensor);
11121114

1113-
this.checkpointRatioSensor = metrics.sensor("checkpoint-ratio", RecordingLevel.INFO);
1115+
this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO);
11141116
this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg());
1115-
allSensorNames.add("checkpoint-ratio");
1117+
allSensors.add(this.checkpointRatioSensor);
11161118

1117-
this.restoreSensor = metrics.sensor("restore-records", RecordingLevel.INFO);
1119+
this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO);
11181120
this.restoreSensor.add(new MetricName("restore-records-rate", STATE_LEVEL_GROUP, RESTORE_RECORDS_RATE_DESCRIPTION, threadLevelTags), new Rate());
11191121
this.restoreSensor.add(new MetricName("restore-call-rate", STATE_LEVEL_GROUP, RESTORE_RATE_DESCRIPTION, threadLevelTags), new Rate(new WindowedCount()));
1120-
allSensorNames.add("restore-records");
1122+
allSensors.add(this.restoreSensor);
11211123
}
11221124

11231125
void clear() {
1124-
while (!allSensorNames.isEmpty()) {
1125-
metrics.removeSensor(allSensorNames.pop());
1126+
while (!allSensors.isEmpty()) {
1127+
metrics.removeSensor(allSensors.pop());
11261128
}
11271129

11281130
while (!allMetricNames.isEmpty()) {
1129-
metrics.removeMetric(allMetricNames.pop());
1131+
metrics.metricsRegistry().removeMetric(allMetricNames.pop());
11301132
}
11311133
}
11321134
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateU
550550
final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx;
551551
final StateUpdater stateUpdater = new DefaultStateUpdater(
552552
name,
553-
streamsMetrics.metricsRegistry(),
553+
streamsMetrics,
554554
streamsConfig,
555555
restoreConsumer,
556556
changelogReader,

streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.streams.processor.TaskId;
2929
import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask;
3030
import org.apache.kafka.streams.processor.internals.Task.State;
31+
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
3132

3233
import org.hamcrest.Matcher;
3334
import org.junit.jupiter.api.AfterEach;
@@ -105,7 +106,7 @@ class DefaultStateUpdaterTest {
105106

106107
// need an auto-tick timer to work for draining with timeout
107108
private final Time time = new MockTime(1L);
108-
private final Metrics metrics = new Metrics(time);
109+
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time);
109110
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
110111
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
111112
private final TopologyMetadata topologyMetadata = unnamedTopology().build();
@@ -1672,8 +1673,59 @@ public void shouldRecordMetrics() throws Exception {
16721673
assertThat(metrics.metrics().size(), is(1));
16731674
}
16741675

1676+
@Test
1677+
public void shouldRemoveMetricsWithoutInterference() {
1678+
final DefaultStateUpdater stateUpdater2 =
1679+
new DefaultStateUpdater("test-state-updater2", metrics, config, null, changelogReader, topologyMetadata, time);
1680+
final List<MetricName> threadMetrics = getMetricNames("test-state-updater");
1681+
final List<MetricName> threadMetrics2 = getMetricNames("test-state-updater2");
1682+
1683+
stateUpdater.start();
1684+
stateUpdater2.start();
1685+
1686+
for (final MetricName metricName : threadMetrics) {
1687+
assertTrue(metrics.metrics().containsKey(metricName));
1688+
}
1689+
for (final MetricName metricName : threadMetrics2) {
1690+
assertTrue(metrics.metrics().containsKey(metricName));
1691+
}
1692+
1693+
stateUpdater2.shutdown(Duration.ofMinutes(1));
1694+
1695+
for (final MetricName metricName : threadMetrics) {
1696+
assertTrue(metrics.metrics().containsKey(metricName));
1697+
}
1698+
for (final MetricName metricName : threadMetrics2) {
1699+
assertFalse(metrics.metrics().containsKey(metricName));
1700+
}
1701+
1702+
stateUpdater.shutdown(Duration.ofMinutes(1));
1703+
1704+
for (final MetricName metricName : threadMetrics) {
1705+
assertFalse(metrics.metrics().containsKey(metricName));
1706+
}
1707+
for (final MetricName metricName : threadMetrics2) {
1708+
assertFalse(metrics.metrics().containsKey(metricName));
1709+
}
1710+
}
1711+
1712+
private static List<MetricName> getMetricNames(final String threadId) {
1713+
final Map<String, String> tagMap = Map.of("thread-id", threadId);
1714+
return List.of(
1715+
new MetricName("active-restoring-tasks", "stream-state-updater-metrics", "", tagMap),
1716+
new MetricName("standby-updating-tasks", "stream-state-updater-metrics", "", tagMap),
1717+
new MetricName("active-paused-tasks", "stream-state-updater-metrics", "", tagMap),
1718+
new MetricName("standby-paused-tasks", "stream-state-updater-metrics", "", tagMap),
1719+
new MetricName("idle-ratio", "stream-state-updater-metrics", "", tagMap),
1720+
new MetricName("standby-update-ratio", "stream-state-updater-metrics", "", tagMap),
1721+
new MetricName("checkpoint-ratio", "stream-state-updater-metrics", "", tagMap),
1722+
new MetricName("restore-records-rate", "stream-state-updater-metrics", "", tagMap),
1723+
new MetricName("restore-call-rate", "stream-state-updater-metrics", "", tagMap)
1724+
);
1725+
}
1726+
16751727
@SuppressWarnings("unchecked")
1676-
private static <T> void verifyMetric(final Metrics metrics,
1728+
private static <T> void verifyMetric(final StreamsMetricsImpl metrics,
16771729
final MetricName metricName,
16781730
final Matcher<T> matcher) {
16791731
assertThat(metrics.metrics().get(metricName).metricName().description(), is(metricName.description()));

0 commit comments

Comments
 (0)