Skip to content

Commit a1b5591

Browse files
authored
KAFKA-19529: State updater sensor names should be unique (apache#20262) (apache#20272)
All state updater threads use the same metrics instance, but do not use unique names for their sensors. This can have the following symptoms: 1) Data inserted into one sensor by one thread can affect the metrics of all state updater threads. 2) If one state updater thread is shut down, the metrics associated with all state updater threads are removed. 3) If one state updater thread is started while another one is being removed, a metric can be registered with the `Metrics` instance but not associated with any `Sensor` (because the sensor is concurrently removed), which means that the metric will not be removed upon shutdown. If a thread with the same name later tries to register the same metric, we may run into a `java.lang.IllegalArgumentException: A metric named ... already exists`, as described in the ticket. This change fixes the bug by giving unique names to the sensors. A test is added to verify that the removal of sensors and metrics during the shutdown of one state updater thread does not interfere with the other threads. Reviewers: Matthias J. Sax <[email protected]>
1 parent d1a9255 commit a1b5591

File tree

3 files changed

+83
-29
lines changed

3 files changed

+83
-29
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.kafka.streams.processor.TaskId;
3737
import org.apache.kafka.streams.processor.internals.Task.State;
3838
import org.apache.kafka.streams.processor.internals.TaskAndAction.Action;
39+
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
3940

4041
import org.slf4j.Logger;
4142

@@ -90,7 +91,7 @@ private class StateUpdaterThread extends Thread {
9091
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
9192

9293
public StateUpdaterThread(final String name,
93-
final Metrics metrics,
94+
final StreamsMetricsImpl metrics,
9495
final ChangelogReader changelogReader) {
9596
super(name);
9697
this.changelogReader = changelogReader;
@@ -747,7 +748,7 @@ private void recordMetrics(final long now, final long totalLatency, final long t
747748
private final Time time;
748749
private final Logger log;
749750
private final String name;
750-
private final Metrics metrics;
751+
private final StreamsMetricsImpl metrics;
751752
private final Consumer<byte[], byte[]> restoreConsumer;
752753
private final ChangelogReader changelogReader;
753754
private final TopologyMetadata topologyMetadata;
@@ -769,7 +770,7 @@ private void recordMetrics(final long now, final long totalLatency, final long t
769770
private CountDownLatch shutdownGate;
770771

771772
public DefaultStateUpdater(final String name,
772-
final Metrics metrics,
773+
final StreamsMetricsImpl metrics,
773774
final StreamsConfig config,
774775
final Consumer<byte[], byte[]> restoreConsumer,
775776
final ChangelogReader changelogReader,
@@ -1062,74 +1063,75 @@ private class StateUpdaterMetrics {
10621063
private final Sensor standbyRestoreRatioSensor;
10631064
private final Sensor checkpointRatioSensor;
10641065

1065-
private final Deque<String> allSensorNames = new LinkedList<>();
1066+
private final Deque<Sensor> allSensors = new LinkedList<>();
10661067
private final Deque<MetricName> allMetricNames = new LinkedList<>();
10671068

1068-
private StateUpdaterMetrics(final Metrics metrics, final String threadId) {
1069+
private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threadId) {
10691070
final Map<String, String> threadLevelTags = new LinkedHashMap<>();
10701071
threadLevelTags.put(THREAD_ID_TAG, threadId);
1072+
final Metrics metricsRegistry = metrics.metricsRegistry();
10711073

1072-
MetricName metricName = metrics.metricName("active-restoring-tasks",
1074+
MetricName metricName = metricsRegistry.metricName("active-restoring-tasks",
10731075
STATE_LEVEL_GROUP,
10741076
"The number of active tasks currently undergoing restoration",
10751077
threadLevelTags);
1076-
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
1078+
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
10771079
stateUpdaterThread.getNumRestoringActiveTasks() : 0);
10781080
allMetricNames.push(metricName);
10791081

1080-
metricName = metrics.metricName("standby-updating-tasks",
1082+
metricName = metricsRegistry.metricName("standby-updating-tasks",
10811083
STATE_LEVEL_GROUP,
10821084
"The number of standby tasks currently undergoing state update",
10831085
threadLevelTags);
1084-
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
1086+
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
10851087
stateUpdaterThread.getNumUpdatingStandbyTasks() : 0);
10861088
allMetricNames.push(metricName);
10871089

1088-
metricName = metrics.metricName("active-paused-tasks",
1090+
metricName = metricsRegistry.metricName("active-paused-tasks",
10891091
STATE_LEVEL_GROUP,
10901092
"The number of active tasks paused restoring",
10911093
threadLevelTags);
1092-
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
1094+
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
10931095
stateUpdaterThread.getNumPausedActiveTasks() : 0);
10941096
allMetricNames.push(metricName);
10951097

1096-
metricName = metrics.metricName("standby-paused-tasks",
1098+
metricName = metricsRegistry.metricName("standby-paused-tasks",
10971099
STATE_LEVEL_GROUP,
10981100
"The number of standby tasks paused state update",
10991101
threadLevelTags);
1100-
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
1102+
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
11011103
stateUpdaterThread.getNumPausedStandbyTasks() : 0);
11021104
allMetricNames.push(metricName);
11031105

1104-
this.idleRatioSensor = metrics.sensor("idle-ratio", RecordingLevel.INFO);
1106+
this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO);
11051107
this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
1106-
allSensorNames.add("idle-ratio");
1108+
allSensors.add(this.idleRatioSensor);
11071109

1108-
this.activeRestoreRatioSensor = metrics.sensor("active-restore-ratio", RecordingLevel.INFO);
1110+
this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO);
11091111
this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
1110-
allSensorNames.add("active-restore-ratio");
1112+
allSensors.add(this.activeRestoreRatioSensor);
11111113

1112-
this.standbyRestoreRatioSensor = metrics.sensor("standby-update-ratio", RecordingLevel.INFO);
1114+
this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, "standby-update-ratio", RecordingLevel.INFO);
11131115
this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
1114-
allSensorNames.add("standby-update-ratio");
1116+
allSensors.add(this.standbyRestoreRatioSensor);
11151117

1116-
this.checkpointRatioSensor = metrics.sensor("checkpoint-ratio", RecordingLevel.INFO);
1118+
this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO);
11171119
this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg());
1118-
allSensorNames.add("checkpoint-ratio");
1120+
allSensors.add(this.checkpointRatioSensor);
11191121

1120-
this.restoreSensor = metrics.sensor("restore-records", RecordingLevel.INFO);
1122+
this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO);
11211123
this.restoreSensor.add(new MetricName("restore-records-rate", STATE_LEVEL_GROUP, RESTORE_RECORDS_RATE_DESCRIPTION, threadLevelTags), new Rate());
11221124
this.restoreSensor.add(new MetricName("restore-call-rate", STATE_LEVEL_GROUP, RESTORE_RATE_DESCRIPTION, threadLevelTags), new Rate(new WindowedCount()));
1123-
allSensorNames.add("restore-records");
1125+
allSensors.add(this.restoreSensor);
11241126
}
11251127

11261128
void clear() {
1127-
while (!allSensorNames.isEmpty()) {
1128-
metrics.removeSensor(allSensorNames.pop());
1129+
while (!allSensors.isEmpty()) {
1130+
metrics.removeSensor(allSensors.pop());
11291131
}
11301132

11311133
while (!allMetricNames.isEmpty()) {
1132-
metrics.removeMetric(allMetricNames.pop());
1134+
metrics.metricsRegistry().removeMetric(allMetricNames.pop());
11331135
}
11341136
}
11351137
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateU
537537
final String name = clientId + "-StateUpdater-" + threadIdx;
538538
final StateUpdater stateUpdater = new DefaultStateUpdater(
539539
name,
540-
streamsMetrics.metricsRegistry(),
540+
streamsMetrics,
541541
streamsConfig,
542542
restoreConsumer,
543543
changelogReader,

streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.streams.processor.TaskId;
2929
import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask;
3030
import org.apache.kafka.streams.processor.internals.Task.State;
31+
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
3132

3233
import org.hamcrest.Matcher;
3334
import org.junit.jupiter.api.AfterEach;
@@ -106,7 +107,7 @@ class DefaultStateUpdaterTest {
106107

107108
// need an auto-tick timer to work for draining with timeout
108109
private final Time time = new MockTime(1L);
109-
private final Metrics metrics = new Metrics(time);
110+
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time);
110111
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
111112
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
112113
private final TopologyMetadata topologyMetadata = unnamedTopology().build();
@@ -1672,8 +1673,59 @@ public void shouldRecordMetrics() throws Exception {
16721673
assertThat(metrics.metrics().size(), is(1));
16731674
}
16741675

1676+
@Test
1677+
public void shouldRemoveMetricsWithoutInterference() {
1678+
final DefaultStateUpdater stateUpdater2 =
1679+
new DefaultStateUpdater("test-state-updater2", metrics, config, null, changelogReader, topologyMetadata, time);
1680+
final List<MetricName> threadMetrics = getMetricNames("test-state-updater");
1681+
final List<MetricName> threadMetrics2 = getMetricNames("test-state-updater2");
1682+
1683+
stateUpdater.start();
1684+
stateUpdater2.start();
1685+
1686+
for (final MetricName metricName : threadMetrics) {
1687+
assertTrue(metrics.metrics().containsKey(metricName));
1688+
}
1689+
for (final MetricName metricName : threadMetrics2) {
1690+
assertTrue(metrics.metrics().containsKey(metricName));
1691+
}
1692+
1693+
stateUpdater2.shutdown(Duration.ofMinutes(1));
1694+
1695+
for (final MetricName metricName : threadMetrics) {
1696+
assertTrue(metrics.metrics().containsKey(metricName));
1697+
}
1698+
for (final MetricName metricName : threadMetrics2) {
1699+
assertFalse(metrics.metrics().containsKey(metricName));
1700+
}
1701+
1702+
stateUpdater.shutdown(Duration.ofMinutes(1));
1703+
1704+
for (final MetricName metricName : threadMetrics) {
1705+
assertFalse(metrics.metrics().containsKey(metricName));
1706+
}
1707+
for (final MetricName metricName : threadMetrics2) {
1708+
assertFalse(metrics.metrics().containsKey(metricName));
1709+
}
1710+
}
1711+
1712+
private static List<MetricName> getMetricNames(final String threadId) {
1713+
final Map<String, String> tagMap = mkMap(mkEntry("thread-id", threadId));
1714+
return Arrays.asList(
1715+
new MetricName("active-restoring-tasks", "stream-state-updater-metrics", "", tagMap),
1716+
new MetricName("standby-updating-tasks", "stream-state-updater-metrics", "", tagMap),
1717+
new MetricName("active-paused-tasks", "stream-state-updater-metrics", "", tagMap),
1718+
new MetricName("standby-paused-tasks", "stream-state-updater-metrics", "", tagMap),
1719+
new MetricName("idle-ratio", "stream-state-updater-metrics", "", tagMap),
1720+
new MetricName("standby-update-ratio", "stream-state-updater-metrics", "", tagMap),
1721+
new MetricName("checkpoint-ratio", "stream-state-updater-metrics", "", tagMap),
1722+
new MetricName("restore-records-rate", "stream-state-updater-metrics", "", tagMap),
1723+
new MetricName("restore-call-rate", "stream-state-updater-metrics", "", tagMap)
1724+
);
1725+
}
1726+
16751727
@SuppressWarnings("unchecked")
1676-
private static <T> void verifyMetric(final Metrics metrics,
1728+
private static <T> void verifyMetric(final StreamsMetricsImpl metrics,
16771729
final MetricName metricName,
16781730
final Matcher<T> matcher) {
16791731
assertThat(metrics.metrics().get(metricName).metricName().description(), is(metricName.description()));

0 commit comments

Comments
 (0)