Skip to content

Commit 28e7803

Browse files
KAFKA-19744: Move restore time calculation to ChangelogMetadata (#20613)
- Move restore time calculation to ChangelogMetadata. - Introduced a new interface to propagate the calculated value to the stores to avoid modifications in the public interface. Reviewers: Matthias J. Sax <[email protected]>
1 parent 8468317 commit 28e7803

File tree

12 files changed

+196
-32
lines changed

12 files changed

+196
-32
lines changed

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@
112112
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
113113
import static org.apache.kafka.test.TestUtils.waitForCondition;
114114
import static org.hamcrest.MatcherAssert.assertThat;
115+
import static org.hamcrest.Matchers.allOf;
116+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
117+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
115118
import static org.hamcrest.core.IsEqual.equalTo;
116119
import static org.junit.jupiter.api.Assertions.assertTrue;
117120
import static org.junit.jupiter.api.Assertions.fail;
@@ -685,6 +688,52 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener(final boolean useN
685688
}
686689
}
687690

691+
@ParameterizedTest
692+
@ValueSource(booleans = {true, false})
693+
public void shouldRecordRestoreMetrics(final boolean useNewProtocol) throws Exception {
694+
final AtomicInteger numReceived = new AtomicInteger(0);
695+
final StreamsBuilder builder = new StreamsBuilder();
696+
697+
final Properties props = props();
698+
699+
if (useNewProtocol) {
700+
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
701+
}
702+
703+
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
704+
705+
createStateForRestoration(inputStream, 10000);
706+
707+
final CountDownLatch shutdownLatch = new CountDownLatch(1);
708+
builder.table(inputStream, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("store"))
709+
.toStream()
710+
.foreach((key, value) -> {
711+
if (numReceived.incrementAndGet() == numberOfKeys) {
712+
shutdownLatch.countDown();
713+
}
714+
});
715+
716+
kafkaStreams = new KafkaStreams(builder.build(), props);
717+
718+
final AtomicLong restored = new AtomicLong(0);
719+
final TrackingStateRestoreListener restoreListener = new TrackingStateRestoreListener(restored);
720+
kafkaStreams.setGlobalStateRestoreListener(restoreListener);
721+
kafkaStreams.start();
722+
723+
assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
724+
assertThat(numReceived.get(), equalTo(numberOfKeys));
725+
726+
final Map<String, Long> taskIdToMetricValue = kafkaStreams.metrics().entrySet().stream()
727+
.filter(e -> e.getKey().name().equals("restore-latency-max"))
728+
.collect(Collectors.toMap(e -> e.getKey().tags().get("task-id"), e -> ((Double) e.getValue().metricValue()).longValue()));
729+
730+
for (final Map.Entry<TopicPartition, Long> entry : restoreListener.changelogToRestoreTime().entrySet()) {
731+
final long lowerBound = entry.getValue() - TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
732+
final long upperBound = entry.getValue() + TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
733+
assertThat(taskIdToMetricValue.get("0_" + entry.getKey().partition()), allOf(greaterThanOrEqualTo(lowerBound), lessThanOrEqualTo(upperBound)));
734+
}
735+
}
736+
688737
private void validateReceivedMessages(final List<KeyValue<Integer, Integer>> expectedRecords,
689738
final String outputTopic) throws Exception {
690739
final Properties consumerProperties = new Properties();
@@ -971,4 +1020,4 @@ public void process(final Record<Integer, String> record) {
9711020
}
9721021
}
9731022
}
974-
}
1023+
}

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1337,6 +1337,8 @@ public static class TrackingStateRestoreListener implements StateRestoreListener
13371337
public final Map<TopicPartition, AtomicLong> changelogToStartOffset = new ConcurrentHashMap<>();
13381338
public final Map<TopicPartition, AtomicLong> changelogToEndOffset = new ConcurrentHashMap<>();
13391339
public final Map<TopicPartition, AtomicLong> changelogToTotalNumRestored = new ConcurrentHashMap<>();
1340+
private final Map<TopicPartition, AtomicLong> changelogToRestoreStartTime = new ConcurrentHashMap<>();
1341+
private final Map<TopicPartition, AtomicLong> changelogToRestoreEndTime = new ConcurrentHashMap<>();
13401342
private final AtomicLong restored;
13411343

13421344
public TrackingStateRestoreListener() {
@@ -1355,6 +1357,7 @@ public void onRestoreStart(final TopicPartition topicPartition,
13551357
changelogToStartOffset.put(topicPartition, new AtomicLong(startingOffset));
13561358
changelogToEndOffset.put(topicPartition, new AtomicLong(endingOffset));
13571359
changelogToTotalNumRestored.put(topicPartition, new AtomicLong(0L));
1360+
changelogToRestoreStartTime.put(topicPartition, new AtomicLong(System.nanoTime()));
13581361
}
13591362

13601363
@Override
@@ -1372,6 +1375,7 @@ public void onRestoreEnd(final TopicPartition topicPartition,
13721375
if (restored != null) {
13731376
restored.addAndGet(totalRestored);
13741377
}
1378+
changelogToRestoreEndTime.put(topicPartition, new AtomicLong(System.nanoTime()));
13751379
}
13761380

13771381
public long totalNumRestored() {
@@ -1381,6 +1385,11 @@ public long totalNumRestored() {
13811385
}
13821386
return totalNumRestored;
13831387
}
1388+
1389+
public Map<TopicPartition, Long> changelogToRestoreTime() {
1390+
return changelogToRestoreStartTime.entrySet().stream()
1391+
.collect(Collectors.toMap(Map.Entry::getKey, e -> changelogToRestoreEndTime.get(e.getKey()).get() - e.getValue().get()));
1392+
}
13841393
}
13851394

13861395
public static class TrackingStandbyUpdateListener implements StandbyUpdateListener {

streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.kafka.streams.processor.TaskId;
4242
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
4343
import org.apache.kafka.streams.processor.internals.Task.TaskType;
44+
import org.apache.kafka.streams.state.internals.MeteredStateStore;
4445

4546
import org.slf4j.Logger;
4647

@@ -138,6 +139,8 @@ static class ChangelogMetadata {
138139
// either due to limit offset (standby) or committed end offset (active)
139140
private int bufferedLimitIndex;
140141

142+
private long restoreStartTimeNs;
143+
141144
private ChangelogMetadata(final StateStoreMetadata storeMetadata, final ProcessorStateManager stateManager) {
142145
this.changelogState = ChangelogState.REGISTERED;
143146
this.storeMetadata = storeMetadata;
@@ -188,6 +191,10 @@ List<ConsumerRecord<byte[], byte[]>> bufferedRecords() {
188191
int bufferedLimitIndex() {
189192
return bufferedLimitIndex;
190193
}
194+
195+
long calculateRestoreTime(final long restoreEndTimeNs) {
196+
return restoreEndTimeNs - restoreStartTimeNs;
197+
}
191198
}
192199

193200
private static final long DEFAULT_OFFSET_UPDATE_MS = Duration.ofMinutes(5L).toMillis();
@@ -695,6 +702,9 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
695702

696703
changelogMetadata.transitTo(ChangelogState.COMPLETED);
697704
pauseChangelogsFromRestoreConsumer(Collections.singleton(partition));
705+
if (storeMetadata.store() instanceof MeteredStateStore) {
706+
((MeteredStateStore) storeMetadata.store()).recordRestoreTime(changelogMetadata.calculateRestoreTime(time.nanoseconds()));
707+
}
698708

699709
try {
700710
stateRestoreListener.onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
@@ -1026,6 +1036,7 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
10261036
// no records to restore; in this case we just initialize the sensor to zero
10271037
final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
10281038
task.recordRestoration(time, recordsToRestore, true);
1039+
changelogMetadata.restoreStartTimeNs = time.nanoseconds();
10291040
} else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY) {
10301041
try {
10311042
standbyUpdateListener.onUpdateStart(partition, storeName, startOffset);

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
*/
7070
public class MeteredKeyValueStore<K, V>
7171
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>
72-
implements KeyValueStore<K, V> {
72+
implements KeyValueStore<K, V>, MeteredStateStore {
7373

7474
final Serde<K> keySerde;
7575
final Serde<V> valueSerde;
@@ -91,6 +91,7 @@ public class MeteredKeyValueStore<K, V>
9191
protected InternalProcessorContext<?, ?> internalContext;
9292
private StreamsMetricsImpl streamsMetrics;
9393
private TaskId taskId;
94+
private Sensor restoreSensor;
9495

9596
protected OpenIterators openIterators;
9697

@@ -128,11 +129,10 @@ public void init(final StateStoreContext stateStoreContext,
128129
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
129130

130131
registerMetrics();
131-
final Sensor restoreSensor =
132-
StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
133132

134-
// register and possibly restore the state from the logs
135-
maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor);
133+
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
134+
135+
super.init(stateStoreContext, root);
136136
}
137137

138138
private void registerMetrics() {
@@ -152,6 +152,11 @@ private void registerMetrics() {
152152
openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics);
153153
}
154154

155+
@Override
156+
public void recordRestoreTime(final long restoreTimeNs) {
157+
restoreSensor.record(restoreTimeNs);
158+
}
159+
155160
protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
156161
return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
157162
}

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757

5858
public class MeteredSessionStore<K, V>
5959
extends WrappedStateStore<SessionStore<Bytes, byte[]>, Windowed<K>, V>
60-
implements SessionStore<K, V> {
60+
implements SessionStore<K, V>, MeteredStateStore {
6161

6262
private final String metricsScope;
6363
private final Serde<K> keySerde;
@@ -73,6 +73,7 @@ public class MeteredSessionStore<K, V>
7373
private Sensor iteratorDurationSensor;
7474
private InternalProcessorContext<?, ?> internalContext;
7575
private TaskId taskId;
76+
private Sensor restoreSensor;
7677

7778
private final LongAdder numOpenIterators = new LongAdder();
7879
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@@ -108,11 +109,9 @@ public void init(final StateStoreContext stateStoreContext,
108109
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
109110

110111
registerMetrics();
111-
final Sensor restoreSensor =
112-
StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
112+
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
113113

114-
// register and possibly restore the state from the logs
115-
maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor);
114+
super.init(stateStoreContext, root);
116115
}
117116

118117
private void registerMetrics() {
@@ -132,6 +131,11 @@ private void registerMetrics() {
132131
);
133132
}
134133

134+
@Override
135+
public void recordRestoreTime(final long restoreTimeNs) {
136+
restoreSensor.record(restoreTimeNs);
137+
}
138+
135139
private void initStoreSerde(final StateStoreContext context) {
136140
final String storeName = name();
137141
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.state.internals;
18+
19+
public interface MeteredStateStore {
20+
21+
void recordRestoreTime(final long restoreTimeNs);
22+
}

streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060

6161
public class MeteredWindowStore<K, V>
6262
extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
63-
implements WindowStore<K, V> {
63+
implements WindowStore<K, V>, MeteredStateStore {
6464

6565
private final long windowSizeMs;
6666
private final String metricsScope;
@@ -76,6 +76,7 @@ public class MeteredWindowStore<K, V>
7676
private Sensor iteratorDurationSensor;
7777
private InternalProcessorContext<?, ?> internalContext;
7878
private TaskId taskId;
79+
private Sensor restoreSensor;
7980

8081
private final LongAdder numOpenIterators = new LongAdder();
8182
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@@ -124,8 +125,8 @@ public void init(final StateStoreContext stateStoreContext,
124125
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
125126

126127
registerMetrics();
127-
final Sensor restoreSensor =
128-
StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
128+
129+
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
129130

130131
// register and possibly restore the state from the logs
131132
maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor);
@@ -150,6 +151,11 @@ private void registerMetrics() {
150151
);
151152
}
152153

154+
@Override
155+
public void recordRestoreTime(final long restoreTimeNs) {
156+
restoreSensor.record(restoreTimeNs);
157+
}
158+
153159
private void initStoreSerde(final StateStoreContext context) {
154160
final String storeName = name();
155161
final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);

streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.kafka.streams.processor.StateStore;
4242
import org.apache.kafka.streams.processor.TaskId;
4343
import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
44+
import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
4445
import org.apache.kafka.test.MockStandbyUpdateListener;
4546
import org.apache.kafka.test.MockStateRestoreListener;
4647
import org.apache.kafka.test.StreamsTestUtils;
@@ -89,7 +90,9 @@
8990
import static org.mockito.ArgumentMatchers.anyBoolean;
9091
import static org.mockito.ArgumentMatchers.anyLong;
9192
import static org.mockito.Mockito.mock;
93+
import static org.mockito.Mockito.never;
9294
import static org.mockito.Mockito.times;
95+
import static org.mockito.Mockito.verify;
9396
import static org.mockito.Mockito.when;
9497

9598
@ExtendWith(MockitoExtension.class)
@@ -1364,6 +1367,58 @@ public void shouldNotThrowOnUnknownRevokedPartition() {
13641367
}
13651368
}
13661369

1370+
@Test
1371+
public void shouldCallRecordRestoreTimeAtTheEndOfRestore() {
1372+
setupActiveStateManager();
1373+
1374+
final MeteredKeyValueStore<?, ?> meteredStateStore = mock(MeteredKeyValueStore.class);
1375+
1376+
when(storeMetadata.changelogPartition()).thenReturn(tp);
1377+
when(storeMetadata.store()).thenReturn(meteredStateStore);
1378+
when(meteredStateStore.name()).thenReturn(storeName);
1379+
final TaskId taskId = new TaskId(0, 0);
1380+
1381+
when(storeMetadata.offset()).thenReturn(0L);
1382+
when(activeStateManager.taskId()).thenReturn(taskId);
1383+
1384+
setupConsumer(2, tp);
1385+
consumer.updateEndOffsets(Collections.singletonMap(tp, 2L));
1386+
adminClient.updateEndOffsets(Collections.singletonMap(tp, 2L));
1387+
1388+
changelogReader.register(tp, activeStateManager);
1389+
1390+
changelogReader.restore(Collections.singletonMap(taskId, mock(Task.class)));
1391+
1392+
assertEquals(1L, changelogReader.changelogMetadata(tp).totalRestored());
1393+
verify(meteredStateStore).recordRestoreTime(anyLong());
1394+
}
1395+
1396+
@Test
1397+
public void shouldNotCallRecordRestoreTimeIfRestoreDoesNotComplete() {
1398+
setupActiveStateManager();
1399+
1400+
final MeteredKeyValueStore<?, ?> meteredStateStore = mock(MeteredKeyValueStore.class);
1401+
1402+
when(storeMetadata.changelogPartition()).thenReturn(tp);
1403+
when(storeMetadata.store()).thenReturn(meteredStateStore);
1404+
when(meteredStateStore.name()).thenReturn(storeName);
1405+
final TaskId taskId = new TaskId(0, 0);
1406+
1407+
when(storeMetadata.offset()).thenReturn(0L);
1408+
when(activeStateManager.taskId()).thenReturn(taskId);
1409+
1410+
setupConsumer(2, tp);
1411+
consumer.updateEndOffsets(Collections.singletonMap(tp, 3L));
1412+
adminClient.updateEndOffsets(Collections.singletonMap(tp, 3L));
1413+
1414+
changelogReader.register(tp, activeStateManager);
1415+
1416+
changelogReader.restore(Collections.singletonMap(taskId, mock(Task.class)));
1417+
1418+
assertEquals(1L, changelogReader.changelogMetadata(tp).totalRestored());
1419+
verify(meteredStateStore, never()).recordRestoreTime(anyLong());
1420+
}
1421+
13671422
private void setupConsumer(final long messages, final TopicPartition topicPartition) {
13681423
assignPartition(messages, topicPartition);
13691424
addRecords(messages, topicPartition);

0 commit comments

Comments
 (0)