Skip to content

Commit 1ebca78

Browse files
authored
KAFKA-19539: Kafka Streams should also purge internal topics based on user commit requests (#20234)
Repartition topic records should be purged up to the currently committed offset once `repartition.purge.interval.ms` duration has passed. Reviewers: Matthias J. Sax <[email protected]>
1 parent 71c5a42 commit 1ebca78

File tree

2 files changed

+39
-6
lines changed

2 files changed

+39
-6
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1837,12 +1837,6 @@ int maybeCommit() {
18371837
.collect(Collectors.toSet())
18381838
);
18391839

1840-
if ((now - lastPurgeMs) > purgeTimeMs) {
1841-
// try to purge the committed records for repartition topics if possible
1842-
taskManager.maybePurgeCommittedRecords();
1843-
lastPurgeMs = now;
1844-
}
1845-
18461840
if (committed == -1) {
18471841
log.debug("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
18481842
} else {
@@ -1853,6 +1847,12 @@ int maybeCommit() {
18531847
committed = taskManager.maybeCommitActiveTasksPerUserRequested();
18541848
}
18551849

1850+
if ((now - lastPurgeMs) > purgeTimeMs) {
1851+
// try to purge the committed records for repartition topics if possible
1852+
taskManager.maybePurgeCommittedRecords();
1853+
lastPurgeMs = now;
1854+
}
1855+
18561856
return committed;
18571857
}
18581858

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,39 @@ public void shouldAlsoPurgeWhenNothingGetsCommitted(final boolean stateUpdaterEn
624624
verify(taskManager).maybePurgeCommittedRecords();
625625
}
626626

627+
@ParameterizedTest
628+
@MethodSource("data")
629+
public void shouldAlsoPurgeBeforeTheCommitInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
630+
final long purgeInterval = 1000L;
631+
final long commitInterval = Long.MAX_VALUE;
632+
final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
633+
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
634+
props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
635+
props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.toString(purgeInterval));
636+
637+
final StreamsConfig config = new StreamsConfig(props);
638+
@SuppressWarnings("unchecked")
639+
final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
640+
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
641+
when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
642+
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
643+
final TaskManager taskManager = mock(TaskManager.class);
644+
645+
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
646+
topologyMetadata.buildAndRewriteTopology();
647+
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata);
648+
649+
thread.setNow(mockTime.milliseconds());
650+
thread.maybeCommit();
651+
652+
mockTime.sleep(purgeInterval + 1);
653+
654+
thread.setNow(mockTime.milliseconds());
655+
thread.maybeCommit();
656+
657+
verify(taskManager, times(2)).maybePurgeCommittedRecords();
658+
}
659+
627660
@ParameterizedTest
628661
@MethodSource("data")
629662
public void shouldNotProcessWhenPartitionRevoked(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {

0 commit comments

Comments
 (0)