Skip to content

Commit eeafe0a

Browse files
committed
MINOR: fix incorrect offset reset logging (#20558)
We need to only pass in the reset strategy, as the `logMessage` parameter was removed. Reviewers: Chia-Ping Tsai <[email protected]>, Lucas Brutschy <[email protected]>
1 parent 93c4dc1 commit eeafe0a

File tree

1 file changed

+6
-5
lines changed

1 file changed

+6
-5
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1524,6 +1524,7 @@ private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
15241524
try {
15251525
records = mainConsumer.poll(pollTime);
15261526
} catch (final InvalidOffsetException e) {
1527+
log.info("Found no valid offset for {} partitions, resetting.", e.partitions().size());
15271528
resetOffsets(e.partitions(), e);
15281529
}
15291530

@@ -1598,22 +1599,22 @@ private void resetOffsets(final Set<TopicPartition> partitions, final Exception
15981599
addToResetList(
15991600
partition,
16001601
seekToBeginning,
1601-
"Setting topic '{}' to consume from earliest offset",
1602+
"Setting topic '{}' to consume from 'earliest' offset",
16021603
loggedTopics
16031604
);
16041605
} else if (resetPolicy == AutoOffsetResetStrategy.LATEST) {
16051606
addToResetList(
16061607
partition,
16071608
seekToEnd,
1608-
"Setting topic '{}' to consume from latest offset",
1609+
"Setting topic '{}' to consume from 'latest' offset",
16091610
loggedTopics
16101611
);
16111612
} else if (resetPolicy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
16121613
addToResetList(
16131614
partition,
16141615
seekByDuration,
16151616
resetPolicy.duration().get(),
1616-
"Setting topic '{}' to consume from by_duration:{}",
1617+
"Setting topic '{}' to consume from 'by_duration:{}'",
16171618
resetPolicy.duration().get().toString(),
16181619
loggedTopics
16191620
);
@@ -1729,12 +1730,12 @@ private void resetOffsets(final Set<TopicPartition> partitions, final Exception
17291730
private void addToResetList(
17301731
final TopicPartition partition,
17311732
final Set<TopicPartition> partitions,
1732-
final String resetPolicy,
1733+
final String logMessage,
17331734
final Set<String> loggedTopics
17341735
) {
17351736
final String topic = partition.topic();
17361737
if (loggedTopics.add(topic)) {
1737-
log.info("Setting topic '{}' to consume from {} offset", topic, resetPolicy);
1738+
log.info(logMessage, topic);
17381739
}
17391740
partitions.add(partition);
17401741
}

0 commit comments

Comments
 (0)