Skip to content

Commit 71efb89

Browse files
authored
MINOR: fix incorrect offset reset logging (#20558)
We need to only pass in the reset strategy, as the `logMessage` parameter was removed. Reviewers: Chia-Ping Tsai <[email protected]>, Lucas Brutschy <[email protected]>
1 parent f16d1f3 commit 71efb89

File tree

1 file changed

+6
-5
lines changed

1 file changed

+6
-5
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,6 +1532,7 @@ private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
15321532
try {
15331533
records = mainConsumer.poll(pollTime);
15341534
} catch (final InvalidOffsetException e) {
1535+
log.info("Found no valid offset for {} partitions, resetting.", e.partitions().size());
15351536
resetOffsets(e.partitions(), e);
15361537
}
15371538

@@ -1647,22 +1648,22 @@ private void resetOffsets(final Set<TopicPartition> partitions, final Exception
16471648
addToResetList(
16481649
partition,
16491650
seekToBeginning,
1650-
"Setting topic '{}' to consume from earliest offset",
1651+
"Setting topic '{}' to consume from 'earliest' offset",
16511652
loggedTopics
16521653
);
16531654
} else if (resetPolicy == AutoOffsetResetStrategy.LATEST) {
16541655
addToResetList(
16551656
partition,
16561657
seekToEnd,
1657-
"Setting topic '{}' to consume from latest offset",
1658+
"Setting topic '{}' to consume from 'latest' offset",
16581659
loggedTopics
16591660
);
16601661
} else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
16611662
addToResetList(
16621663
partition,
16631664
seekByDuration,
16641665
resetPolicy.duration().get(),
1665-
"Setting topic '{}' to consume from by_duration:{}",
1666+
"Setting topic '{}' to consume from 'by_duration:{}'",
16661667
resetPolicy.duration().get().toString(),
16671668
loggedTopics
16681669
);
@@ -1778,12 +1779,12 @@ private void resetOffsets(final Set<TopicPartition> partitions, final Exception
17781779
private void addToResetList(
17791780
final TopicPartition partition,
17801781
final Set<TopicPartition> partitions,
1781-
final String resetPolicy,
1782+
final String logMessage,
17821783
final Set<String> loggedTopics
17831784
) {
17841785
final String topic = partition.topic();
17851786
if (loggedTopics.add(topic)) {
1786-
log.info("Setting topic '{}' to consume from {} offset", topic, resetPolicy);
1787+
log.info(logMessage, topic);
17871788
}
17881789
partitions.add(partition);
17891790
}

0 commit comments

Comments
 (0)