@@ -87,3 +87,40 @@ public void listen(List<Thing> things) {
----
However, in this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns an empty list and returns `false` from `ignoreEmptyBatch()`.
Thus, `KafkaListener#listen(...)` will always be invoked.
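
By contrast, a strategy that opts in to skipping empty batches can override `ignoreEmptyBatch()` to return `true`; a fully filtered batch is then acknowledged without invoking the listener.
The following is a minimal sketch (the class name is illustrative, not part of the framework):

[source, java]
----
public class SkipEmptyBatchFilterStrategy implements RecordFilterStrategy<String, String> {

    @Override
    public boolean filter(ConsumerRecord<String, String> consumerRecord) {
        return true; // discard every record
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return true; // acknowledge fully filtered batches instead of invoking the listener
    }

}
----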

[[record-filtered-acknowledgment-mode]]
== RECORD_FILTERED Acknowledgment Mode

Starting with version 3.1, the `RECORD_FILTERED` acknowledgment mode is available for use with filtered message listeners.
Unlike the standard filtering approach where filtered records are still committed, `RECORD_FILTERED` mode ensures that offsets for filtered records are not committed, allowing them to be reprocessed if needed.

Key characteristics:

* Auto-commit is automatically disabled when using `RECORD_FILTERED` mode
* Filtered records are tracked internally and excluded from offset commits
* Applies to record listeners only; like `RECORD`, it cannot be used with batch listeners
* Compatible with filtering adapters and record filter strategies
* Supports transactions

Example configuration:

[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD_FILTERED);
factory.setRecordFilterStrategy(record -> record.value().contains("ignore"));
return factory;
}

@KafkaListener(topics = "my-topic")
public void listen(String message) {
// Only non-filtered messages will reach this method
// Filtered messages will not have their offsets committed
process(message);
}
----

When using `RECORD_FILTERED` mode, the framework automatically marks filtered records so that their offsets are excluded from commits, keeping them available for reprocessing in scenarios such as a consumer restart or a partition rebalance.
For example, if the record at offset 6 on a partition is filtered, the commit of offset 7 is withheld; once a later record on that partition has been processed, a commit of offset 8 or higher proceeds as usual.
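
Internally, this marking happens through a `FilterAwareAcknowledgment` callback that the filtering adapters invoke for each discarded record.
Its definition is not part of the hunks shown here, but from its usages it is presumably along these lines (a sketch, not the committed interface):

[source, java]
----
public interface FilterAwareAcknowledgment extends Acknowledgment {

    /**
     * Mark the record as filtered so that its offset is excluded
     * from subsequent offset commits.
     * @param record the filtered record.
     */
    void markFiltered(ConsumerRecord<?, ?> record);

}
----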
@@ -215,6 +215,7 @@ The `MessageListener` is called for each record.
The following list describes the action taken by the container for each `AckMode` (when transactions are not being used):

* `RECORD`: Commit the offset when the listener returns after processing the record.
* `RECORD_FILTERED`: Like `RECORD`, but when a `RecordFilterStrategy` filters a record, that record's offset is excluded from commits so that the record can be reprocessed if needed; auto-commit is disabled. Requires a `RecordFilterStrategy`.
* `BATCH`: Commit the offset when all the records returned by the `poll()` have been processed.
* `TIME`: Commit the offset when all the records returned by the `poll()` have been processed, as long as the `ackTime` since the last commit has been exceeded.
* `COUNT`: Commit the offset when all the records returned by the `poll()` have been processed, as long as `ackCount` records have been received since the last commit.
@@ -1,5 +1,26 @@
= What's new?

[[whats-new-in-3-1-since-3-0]]
== What's New in 3.1 Since 3.0
:page-section-summary-toc: 1

This section covers the changes made from version 3.0 to version 3.1.

[[x31-record-filtered-ack-mode]]
=== RECORD_FILTERED Acknowledgment Mode

A new acknowledgment mode `RECORD_FILTERED` has been added that works in conjunction with record filtering.
Unlike standard filtering where filtered records are still committed, this mode excludes filtered record offsets from commits, ensuring they can be reprocessed if needed.

Key features:

* Automatically disables auto-commit
* Tracks filtered records and excludes their offsets from commits
* Applies to record listeners only; like `RECORD`, it cannot be used with batch listeners
* Supports transactions
* Compatible with existing filtering adapters and record filter strategies

See xref:kafka/receiving-messages/filtering.adoc#record-filtered-acknowledgment-mode[RECORD_FILTERED Acknowledgment Mode] for more details.

[[whats-new-in-4-0-since-3-3]]
== What's New in 4.0 Since 3.3
:page-section-summary-toc: 1
@@ -114,6 +114,14 @@ public enum AckMode {
*/
MANUAL_IMMEDIATE,

/**
* Like RECORD, but when a RecordFilterStrategy filters a record,
* that record's offset is not committed. Auto-commit is disabled.
* Filtered records are tracked and their offsets are excluded from commits
* to ensure they can be reprocessed if needed.
*/
RECORD_FILTERED,

}

/**
@@ -113,6 +113,7 @@
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.FilterAwareAcknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.LogIfLevelEnabled;
@@ -639,6 +640,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final Map<TopicPartition, Long> savedPositions = new HashMap<>();

private final Map<TopicPartition, Long> filteredOffsets = new ConcurrentHashMap<>();

private final GenericMessageListener<?> genericListener;

private final @Nullable ConsumerSeekAware consumerSeekAwareListener;
@@ -677,6 +680,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final boolean isRecordAck;

private final boolean isRecordFilteredAck;

private final BlockingQueue<ConsumerRecord<K, V>> acks = new LinkedBlockingQueue<>();

private final BlockingQueue<TopicPartitionOffset> seeks = new LinkedBlockingQueue<>();
@@ -871,6 +876,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
this.isManualImmediateAck = AckMode.MANUAL_IMMEDIATE.equals(this.ackMode);
this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
this.isRecordAck = this.ackMode.equals(AckMode.RECORD);
this.isRecordFilteredAck = this.ackMode.equals(AckMode.RECORD_FILTERED);
boolean isOutOfCommit = this.isAnyManualAck && this.asyncReplies;
this.offsetsInThisBatch = isOutOfCommit ? new ConcurrentHashMap<>() : null;
this.deferredOffsets = isOutOfCommit ? new ConcurrentHashMap<>() : null;
@@ -930,8 +936,8 @@ else if (listener instanceof MessageListener) {
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
|| listenerType.equals(ListenerType.CONSUMER_AWARE);
this.commonErrorHandler = determineCommonErrorHandler();
Assert.state(!this.isBatchListener || !this.isRecordAck,
"Cannot use AckMode.RECORD with a batch listener");
Assert.state(!this.isBatchListener || (!this.isRecordAck && !this.isRecordFilteredAck),
"Cannot use AckMode.RECORD or AckMode.RECORD_FILTERED with a batch listener");
if (this.containerProperties.getScheduler() != null) {
this.taskScheduler = this.containerProperties.getScheduler();
this.taskSchedulerExplicitlySet = true;
@@ -1203,6 +1209,10 @@ else if (autoCommitOverride != null) {
else {
isAutoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
}
if (this.isRecordFilteredAck && isAutoCommit) {
consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
isAutoCommit = false;
}
Assert.state(!this.isAnyManualAck || !isAutoCommit,
() -> "Consumer cannot be configured for auto commit for ackMode " + this.ackMode);
return isAutoCommit;
@@ -1514,7 +1524,7 @@ protected void handleAsyncFailure() {
}

private void doProcessCommits() {
if (!this.autoCommit && !this.isRecordAck) {
if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
try {
processCommits();
}
@@ -2278,7 +2288,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
}
getAfterRollbackProcessor().clearThreadState();
}
if (!this.autoCommit && !this.isRecordAck) {
if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
processCommits();
}
}
@@ -2736,7 +2746,7 @@ private void listenerInfo(final ConsumerRecord<K, V> cRecord) {
}

private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecord<K, V> cRecord) {
if (!this.autoCommit && !this.isRecordAck) {
if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
processCommits();
}
List<ConsumerRecord<?, ?>> list = new ArrayList<>();
@@ -3092,7 +3102,7 @@ public void ackCurrent(final ConsumerRecord<K, V> cRecord) {
}

public void ackCurrent(final ConsumerRecord<K, V> cRecord, boolean commitRecovered) {
if (this.isRecordAck && this.producer == null) {
if ((this.isRecordAck || this.isRecordFilteredAck) && this.producer == null) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = buildSingleCommits(cRecord);
this.commitLogger.log(() -> COMMITTING + offsetsToCommit);
commitOffsets(offsetsToCommit);
@@ -3121,6 +3131,7 @@ private void doSendOffsets(@Nullable Producer<?, ?> prod, Map<TopicPartition, Of
if (this.fixTxOffsets) {
this.lastCommits.putAll(commits);
}
cleanupFilteredOffsetsAfterCommit(commits);
}

private void processCommits() {
@@ -3365,6 +3376,7 @@ private void commitIfNecessary() {
this.commitLogger.log(() -> COMMITTING + commits);
try {
commitOffsets(commits);
cleanupFilteredOffsetsAfterCommit(commits);
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
// ignore - not polling
@@ -3432,9 +3444,10 @@ private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> commits, int re
}

Map<TopicPartition, OffsetAndMetadata> buildSingleCommits(ConsumerRecord<K, V> cRecord) {
return Collections.singletonMap(
Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
new TopicPartition(cRecord.topic(), cRecord.partition()),
createOffsetAndMetadata(cRecord.offset() + 1));
return pruneByFilteredOffsets(commits);
}

private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
@@ -3443,7 +3456,7 @@ private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
commits.put(topicPartition, createOffsetAndMetadata(offset + 1));
});
this.offsets.clear();
return commits;
return pruneByFilteredOffsets(commits);
}

private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(ConsumerRecords<K, V> records) {
@@ -3545,7 +3558,43 @@ private OffsetAndMetadata createOffsetAndMetadata(long offset) {
return this.offsetAndMetadataProvider.provide(this.listenerMetadata, offset);
}

private final class ConsumerAcknowledgment implements Acknowledgment {
private Map<TopicPartition, OffsetAndMetadata> pruneByFilteredOffsets(
Map<TopicPartition, OffsetAndMetadata> commits) {
if (!this.isRecordFilteredAck || this.filteredOffsets.isEmpty()) {
return commits;
}
Map<TopicPartition, OffsetAndMetadata> prunedCommits = new HashMap<>(commits.size());
commits.forEach((tp, oam) -> {
Long filteredOffset = this.filteredOffsets.get(tp);
if (filteredOffset == null) {
prunedCommits.put(tp, oam);
}
else {
long commitOffset = oam.offset();
// Only commit if the commit offset is beyond the filtered offset
// filteredOffset is inclusive, so we need commitOffset > filteredOffset + 1
if (commitOffset > filteredOffset + 1) {
prunedCommits.put(tp, oam);
}
// else: skip this partition for now, filtered record is not yet processed
}
});
return prunedCommits;
}

private void cleanupFilteredOffsetsAfterCommit(Map<TopicPartition, OffsetAndMetadata> commits) {
if (!this.isRecordFilteredAck || this.filteredOffsets.isEmpty()) {
return;
}
// Remove filtered offsets that are now below the committed offsets
commits.forEach((tp, oam) -> {
Long filteredOffset = this.filteredOffsets.get(tp);
if (filteredOffset != null && filteredOffset < oam.offset() - 1) {
this.filteredOffsets.remove(tp);
}
});
}

private final class ConsumerAcknowledgment implements FilterAwareAcknowledgment {

private final ConsumerRecord<K, V> cRecord;

@@ -3578,14 +3627,20 @@ public boolean isOutOfOrderCommit() {
return ListenerConsumer.this.asyncReplies;
}

@Override
public void markFiltered(ConsumerRecord<?, ?> record) {
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
ListenerConsumer.this.filteredOffsets.merge(tp, record.offset(), Math::max);
}

@Override
public String toString() {
return "Acknowledgment for " + KafkaUtils.format(this.cRecord);
}

}

private final class ConsumerBatchAcknowledgment implements Acknowledgment {
private final class ConsumerBatchAcknowledgment implements FilterAwareAcknowledgment {

private final ConsumerRecords<K, V> records;

@@ -3681,6 +3736,12 @@ public boolean isOutOfOrderCommit() {
return ListenerConsumer.this.asyncReplies;
}

@Override
public void markFiltered(ConsumerRecord<?, ?> record) {
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
ListenerConsumer.this.filteredOffsets.merge(tp, record.offset(), Math::max);
}

@Override
public String toString() {
return "Acknowledgment for " + this.records;
@@ -3734,6 +3795,8 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
ListenerConsumer.this.pausedForNack.removeAll(partitions);
partitions.forEach(ListenerConsumer.this.lastCommits::remove);
// Clean up filtered offsets for revoked partitions
partitions.forEach(ListenerConsumer.this.filteredOffsets::remove);
synchronized (ListenerConsumer.this) {
Map<TopicPartition, List<Long>> pendingOffsets = ListenerConsumer.this.offsetsInThisBatch;
if (pendingOffsets != null) {
@@ -26,6 +26,7 @@
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ListenerType;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.FilterAwareAcknowledgment;
import org.springframework.util.Assert;

/**
@@ -84,16 +85,31 @@ public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgme
final List<ConsumerRecord<K, V>> consumerRecords = recordFilterStrategy.filterBatch(records);
Assert.state(consumerRecords != null, "filter returned null from filterBatch");

// Mark filtered records
if (acknowledgment instanceof FilterAwareAcknowledgment faa && consumerRecords.size() < records.size()) {
records.stream()
.filter(record -> !consumerRecords.contains(record))
.forEach(faa::markFiltered);
}

if (recordFilterStrategy.ignoreEmptyBatch() &&
consumerRecords.isEmpty() &&
acknowledgment != null) {
// All records were filtered but ignoreEmptyBatch is true
if (acknowledgment instanceof FilterAwareAcknowledgment faa) {
records.forEach(faa::markFiltered);
}
acknowledgment.acknowledge();
}
else if (!consumerRecords.isEmpty() || this.consumerAware
|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
invokeDelegate(consumerRecords, acknowledgment, consumer);
}
else {
// All records were filtered and ignoreEmptyBatch is false
if (acknowledgment instanceof FilterAwareAcknowledgment faa) {
records.forEach(faa::markFiltered);
}
if (this.ackDiscarded && acknowledgment != null) {
acknowledgment.acknowledge();
}
@@ -23,6 +23,7 @@
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.FilterAwareAcknowledgment;

/**
* A {@link MessageListener} adapter that implements filter logic
@@ -77,6 +78,9 @@ public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgm
}
}
else {
if (acknowledgment instanceof FilterAwareAcknowledgment faa) {
faa.markFiltered(consumerRecord);
}
ackFilteredIfNecessary(acknowledgment);
}
}