Commit 73e9942 ("TEST", 1 parent: e8ab82a)
9 files changed: +490 −10 lines


spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/filtering.adoc

Lines changed: 37 additions & 0 deletions
@@ -87,3 +87,40 @@ public void listen(List<Thing> things) {
 ----
 However, in this case, `IgnoreEmptyBatchRecordFilterStrategy` always returns an empty list and returns `false` from `ignoreEmptyBatch()`.
 Thus, `KafkaListener#listen(...)` is always invoked.
+
+[[record-filtered-acknowledgment-mode]]
+== RECORD_FILTERED Acknowledgment Mode
+
+Starting with version 3.1, the `RECORD_FILTERED` acknowledgment mode is available for use with filtered message listeners.
+Unlike the standard filtering approach, where offsets of filtered records are still committed, `RECORD_FILTERED` mode ensures that offsets of filtered records are not committed, allowing those records to be reprocessed if needed.
+
+Key characteristics:
+
+* Auto-commit is automatically disabled when using `RECORD_FILTERED` mode
+* Filtered records are tracked internally and their offsets are excluded from commits
+* Requires a record (not batch) listener; combining it with a batch listener is rejected at startup
+* Compatible with filtering adapters and record filter strategies
+* Supports transactions
+
+Example configuration:
+
+[source, java]
+----
+@Bean
+public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+    ConcurrentKafkaListenerContainerFactory<String, String> factory =
+            new ConcurrentKafkaListenerContainerFactory<>();
+    factory.setConsumerFactory(consumerFactory());
+    factory.getContainerProperties().setAckMode(AckMode.RECORD_FILTERED);
+    factory.setRecordFilterStrategy(record -> record.value().contains("ignore"));
+    return factory;
+}
+
+@KafkaListener(topics = "my-topic")
+public void listen(String message) {
+    // Only non-filtered messages reach this method;
+    // filtered messages do not have their offsets committed
+    process(message);
+}
+----
+
+When using `RECORD_FILTERED` mode, the framework automatically marks filtered records so that their offsets are excluded from commits, ensuring that they remain available for reprocessing in scenarios such as a consumer restart or a rebalance.
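The commit-withholding rule can be sketched with plain maps. This is a hypothetical stand-in (String partition keys, Long offsets, not the framework's classes); it assumes, as the implementation below does, that a commit offset is the offset of the next record to consume, so a commit is withheld unless it is strictly beyond `filteredOffset + 1`:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the RECORD_FILTERED commit rule: a partition's commit is
// dropped unless its offset is strictly beyond filteredOffset + 1.
public class FilteredCommitRule {

    static Map<String, Long> prune(Map<String, Long> commits, Map<String, Long> filteredOffsets) {
        Map<String, Long> pruned = new HashMap<>();
        commits.forEach((partition, commitOffset) -> {
            Long filtered = filteredOffsets.get(partition);
            if (filtered == null || commitOffset > filtered + 1) {
                pruned.put(partition, commitOffset);
            }
        });
        return pruned;
    }

    public static void main(String[] args) {
        Map<String, Long> filtered = Map.of("topic-0", 5L);
        // Committing offset 6 would commit exactly past filtered record 5: withheld
        System.out.println(prune(Map.of("topic-0", 6L), filtered)); // {}
        // A later commit (offset 7) passes through
        System.out.println(prune(Map.of("topic-0", 7L), filtered)); // {topic-0=7}
    }
}
```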

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc

Lines changed: 1 addition & 0 deletions
@@ -215,6 +215,7 @@ The `MessageListener` is called for each record.
 The following list describes the action taken by the container for each `AckMode` (when transactions are not being used):

 * `RECORD`: Commit the offset when the listener returns after processing the record.
+* `RECORD_FILTERED`: Like `RECORD`, but when a `RecordFilterStrategy` filters a record, that record's offset is not committed. Auto-commit is disabled. Filtered records are tracked and their offsets are excluded from commits so that they can be reprocessed if needed. Requires a `RecordFilterStrategy`.
 * `BATCH`: Commit the offset when all the records returned by the `poll()` have been processed.
 * `TIME`: Commit the offset when all the records returned by the `poll()` have been processed, as long as the `ackTime` since the last commit has been exceeded.
 * `COUNT`: Commit the offset when all the records returned by the `poll()` have been processed, as long as `ackCount` records have been received since the last commit.
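The commit cadences in this list can be illustrated with a toy counter (not framework code, and a deliberate simplification): `RECORD` commits after every record, `BATCH` after every non-empty poll, and `COUNT` at the end of a poll once at least `ackCount` records have arrived since the last commit:

```java
import java.util.List;

// Toy cadence model for three AckModes; each poll entry is the number of
// records that poll() returned.
public class AckModeCadence {

    static int recordModeCommits(List<Integer> polls) {
        return polls.stream().mapToInt(Integer::intValue).sum(); // one commit per record
    }

    static int batchModeCommits(List<Integer> polls) {
        return (int) polls.stream().filter(p -> p > 0).count(); // one commit per non-empty poll
    }

    static int countModeCommits(List<Integer> polls, int ackCount) {
        int sinceLastCommit = 0;
        int commits = 0;
        for (int polled : polls) {
            sinceLastCommit += polled;
            if (sinceLastCommit >= ackCount) { // threshold checked after the poll is processed
                commits++;
                sinceLastCommit = 0;
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Integer> polls = List.of(2, 2, 2); // three polls of two records each
        System.out.println(recordModeCommits(polls)); // 6
        System.out.println(batchModeCommits(polls)); // 3
        System.out.println(countModeCommits(polls, 5)); // 1
    }
}
```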

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 21 additions & 0 deletions
@@ -1,5 +1,26 @@
 = What's new?

+[[whats-new-in-3-1-since-3-0]]
+== What's New in 3.1 Since 3.0
+:page-section-summary-toc: 1
+
+This section covers the changes made from version 3.0 to version 3.1.
+
+[[x31-record-filtered-ack-mode]]
+=== RECORD_FILTERED Acknowledgment Mode
+
+A new acknowledgment mode, `RECORD_FILTERED`, has been added that works in conjunction with record filtering.
+Unlike standard filtering, where filtered records are still committed, this mode excludes filtered record offsets from commits, ensuring that they can be reprocessed if needed.
+
+Key features:
+
+* Automatically disables auto-commit
+* Tracks filtered records and excludes their offsets from commits
+* Requires a record (not batch) listener
+* Supports transactions
+* Compatible with existing filtering adapters and record filter strategies
+
+See xref:kafka/receiving-messages/filtering.adoc#record-filtered-acknowledgment-mode[RECORD_FILTERED Acknowledgment Mode] for more details.
+
 [[whats-new-in-4-0-since-3-3]]
 == What's New in 4.0 Since 3.3
 :page-section-summary-toc: 1

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 8 additions & 0 deletions
@@ -114,6 +114,14 @@ public enum AckMode {
 	 */
 	MANUAL_IMMEDIATE,

+	/**
+	 * Like RECORD, but when a RecordFilterStrategy filters a record,
+	 * that record's offset is not committed. Auto-commit is disabled.
+	 * Filtered records are tracked and their offsets are excluded from commits
+	 * to ensure they can be reprocessed if needed.
+	 */
+	RECORD_FILTERED,
+
 }

 /**

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 73 additions & 10 deletions
@@ -113,6 +113,7 @@
 import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
 import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
 import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.kafka.support.FilterAwareAcknowledgment;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.support.KafkaUtils;
 import org.springframework.kafka.support.LogIfLevelEnabled;
@@ -639,6 +640,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

 		private final Map<TopicPartition, Long> savedPositions = new HashMap<>();

+		private final Map<TopicPartition, Long> filteredOffsets = new ConcurrentHashMap<>();
+
 		private final GenericMessageListener<?> genericListener;

 		private final @Nullable ConsumerSeekAware consumerSeekAwareListener;
@@ -677,6 +680,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

 		private final boolean isRecordAck;

+		private final boolean isRecordFilteredAck;
+
 		private final BlockingQueue<ConsumerRecord<K, V>> acks = new LinkedBlockingQueue<>();

 		private final BlockingQueue<TopicPartitionOffset> seeks = new LinkedBlockingQueue<>();
@@ -871,6 +876,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 			this.isManualImmediateAck = AckMode.MANUAL_IMMEDIATE.equals(this.ackMode);
 			this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
 			this.isRecordAck = this.ackMode.equals(AckMode.RECORD);
+			this.isRecordFilteredAck = this.ackMode.equals(AckMode.RECORD_FILTERED);
 			boolean isOutOfCommit = this.isAnyManualAck && this.asyncReplies;
 			this.offsetsInThisBatch = isOutOfCommit ? new ConcurrentHashMap<>() : null;
 			this.deferredOffsets = isOutOfCommit ? new ConcurrentHashMap<>() : null;
@@ -930,8 +936,8 @@ else if (listener instanceof MessageListener) {
 			this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
 					|| listenerType.equals(ListenerType.CONSUMER_AWARE);
 			this.commonErrorHandler = determineCommonErrorHandler();
-			Assert.state(!this.isBatchListener || !this.isRecordAck,
-					"Cannot use AckMode.RECORD with a batch listener");
+			Assert.state(!this.isBatchListener || (!this.isRecordAck && !this.isRecordFilteredAck),
+					"Cannot use AckMode.RECORD or AckMode.RECORD_FILTERED with a batch listener");
 			if (this.containerProperties.getScheduler() != null) {
 				this.taskScheduler = this.containerProperties.getScheduler();
 				this.taskSchedulerExplicitlySet = true;
@@ -1203,6 +1209,10 @@ else if (autoCommitOverride != null) {
 			else {
 				isAutoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
 			}
+			if (this.isRecordFilteredAck && isAutoCommit) {
+				consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+				isAutoCommit = false;
+			}
 			Assert.state(!this.isAnyManualAck || !isAutoCommit,
 					() -> "Consumer cannot be configured for auto commit for ackMode " + this.ackMode);
 			return isAutoCommit;
@@ -1514,7 +1524,7 @@ protected void handleAsyncFailure() {
 		}

 		private void doProcessCommits() {
-			if (!this.autoCommit && !this.isRecordAck) {
+			if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
 				try {
 					processCommits();
 				}
@@ -2278,7 +2288,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
 				}
 				getAfterRollbackProcessor().clearThreadState();
 			}
-			if (!this.autoCommit && !this.isRecordAck) {
+			if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
 				processCommits();
 			}
 		}
@@ -2736,7 +2746,7 @@ private void listenerInfo(final ConsumerRecord<K, V> cRecord) {
 		}

 		private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecord<K, V> cRecord) {
-			if (!this.autoCommit && !this.isRecordAck) {
+			if (!this.autoCommit && !this.isRecordAck && !this.isRecordFilteredAck) {
 				processCommits();
 			}
 			List<ConsumerRecord<?, ?>> list = new ArrayList<>();
@@ -3092,7 +3102,7 @@ public void ackCurrent(final ConsumerRecord<K, V> cRecord) {
 		}

 		public void ackCurrent(final ConsumerRecord<K, V> cRecord, boolean commitRecovered) {
-			if (this.isRecordAck && this.producer == null) {
+			if ((this.isRecordAck || this.isRecordFilteredAck) && this.producer == null) {
 				Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = buildSingleCommits(cRecord);
 				this.commitLogger.log(() -> COMMITTING + offsetsToCommit);
 				commitOffsets(offsetsToCommit);
@@ -3121,6 +3131,7 @@ private void doSendOffsets(@Nullable Producer<?, ?> prod, Map<TopicPartition, Of
 			if (this.fixTxOffsets) {
 				this.lastCommits.putAll(commits);
 			}
+			cleanupFilteredOffsetsAfterCommit(commits);
 		}

 		private void processCommits() {
@@ -3365,6 +3376,7 @@ private void commitIfNecessary() {
 				this.commitLogger.log(() -> COMMITTING + commits);
 				try {
 					commitOffsets(commits);
+					cleanupFilteredOffsetsAfterCommit(commits);
 				}
 				catch (@SuppressWarnings(UNUSED) WakeupException e) {
 					// ignore - not polling
@@ -3432,9 +3444,10 @@ private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> commits, int re
 		}

 		Map<TopicPartition, OffsetAndMetadata> buildSingleCommits(ConsumerRecord<K, V> cRecord) {
-			return Collections.singletonMap(
+			Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
 					new TopicPartition(cRecord.topic(), cRecord.partition()),
 					createOffsetAndMetadata(cRecord.offset() + 1));
+			return pruneByFilteredOffsets(commits);
 		}

 		private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
@@ -3443,7 +3456,7 @@ private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
 				commits.put(topicPartition, createOffsetAndMetadata(offset + 1));
 			});
 			this.offsets.clear();
-			return commits;
+			return pruneByFilteredOffsets(commits);
 		}

 		private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(ConsumerRecords<K, V> records) {
@@ -3545,7 +3558,43 @@ private OffsetAndMetadata createOffsetAndMetadata(long offset) {
 			return this.offsetAndMetadataProvider.provide(this.listenerMetadata, offset);
 		}

-		private final class ConsumerAcknowledgment implements Acknowledgment {
+		private Map<TopicPartition, OffsetAndMetadata> pruneByFilteredOffsets(
+				Map<TopicPartition, OffsetAndMetadata> commits) {
+			if (!this.isRecordFilteredAck || this.filteredOffsets.isEmpty()) {
+				return commits;
+			}
+			Map<TopicPartition, OffsetAndMetadata> prunedCommits = new HashMap<>(commits.size());
+			commits.forEach((tp, oam) -> {
+				Long filteredOffset = this.filteredOffsets.get(tp);
+				if (filteredOffset == null) {
+					prunedCommits.put(tp, oam);
+				}
+				else {
+					long commitOffset = oam.offset();
+					// Only commit if the commit offset is beyond the filtered offset;
+					// filteredOffset is inclusive, so we need commitOffset > filteredOffset + 1
+					if (commitOffset > filteredOffset + 1) {
+						prunedCommits.put(tp, oam);
+					}
+					// else: skip this partition for now; the filtered record is not yet processed
+				}
+			});
+			return prunedCommits;
+		}
+
+		private void cleanupFilteredOffsetsAfterCommit(Map<TopicPartition, OffsetAndMetadata> commits) {
+			if (!this.isRecordFilteredAck || this.filteredOffsets.isEmpty()) {
+				return;
+			}
+			// Remove filtered offsets that are now below the committed offsets
+			commits.forEach((tp, oam) -> {
+				Long filteredOffset = this.filteredOffsets.get(tp);
+				if (filteredOffset != null && filteredOffset < oam.offset() - 1) {
+					this.filteredOffsets.remove(tp);
+				}
+			});
+		}
+
+		private final class ConsumerAcknowledgment implements FilterAwareAcknowledgment {

 			private final ConsumerRecord<K, V> cRecord;

@@ -3578,14 +3627,20 @@ public boolean isOutOfOrderCommit() {
 				return ListenerConsumer.this.asyncReplies;
 			}

+			@Override
+			public void markFiltered(ConsumerRecord<?, ?> record) {
+				TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+				ListenerConsumer.this.filteredOffsets.merge(tp, record.offset(), Math::max);
+			}
+
 			@Override
 			public String toString() {
 				return "Acknowledgment for " + KafkaUtils.format(this.cRecord);
 			}

 		}

-		private final class ConsumerBatchAcknowledgment implements Acknowledgment {
+		private final class ConsumerBatchAcknowledgment implements FilterAwareAcknowledgment {

 			private final ConsumerRecords<K, V> records;

@@ -3681,6 +3736,12 @@ public boolean isOutOfOrderCommit() {
 				return ListenerConsumer.this.asyncReplies;
 			}

+			@Override
+			public void markFiltered(ConsumerRecord<?, ?> record) {
+				TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+				ListenerConsumer.this.filteredOffsets.merge(tp, record.offset(), Math::max);
+			}
+
 			@Override
 			public String toString() {
 				return "Acknowledgment for " + this.records;
@@ -3734,6 +3795,8 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 			}
 			ListenerConsumer.this.pausedForNack.removeAll(partitions);
 			partitions.forEach(ListenerConsumer.this.lastCommits::remove);
+			// Clean up filtered offsets for revoked partitions
+			partitions.forEach(ListenerConsumer.this.filteredOffsets::remove);
 			synchronized (ListenerConsumer.this) {
 				Map<TopicPartition, List<Long>> pendingOffsets = ListenerConsumer.this.offsetsInThisBatch;
 				if (pendingOffsets != null) {
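The three pieces added to the container (per-partition filtered-offset tracking via `merge(…, Math::max)`, commit pruning, and post-commit cleanup) can be walked through with a stdlib-only stand-in. This is a simplified sketch, not the framework's code: String keys replace `TopicPartition`, plain `Long` offsets replace `OffsetAndMetadata`, and the `isRecordFilteredAck` guard is omitted:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the filteredOffsets bookkeeping in ListenerConsumer.
public class FilteredOffsetTracker {

    private final Map<String, Long> filteredOffsets = new ConcurrentHashMap<>();

    // Mirrors markFiltered(): remember the highest filtered offset per partition.
    void markFiltered(String partition, long offset) {
        this.filteredOffsets.merge(partition, offset, Math::max);
    }

    // Mirrors pruneByFilteredOffsets(): withhold a commit that would commit
    // exactly past (or before) the tracked filtered record.
    Map<String, Long> prune(Map<String, Long> commits) {
        Map<String, Long> pruned = new HashMap<>();
        commits.forEach((tp, commitOffset) -> {
            Long filtered = this.filteredOffsets.get(tp);
            if (filtered == null || commitOffset > filtered + 1) {
                pruned.put(tp, commitOffset);
            }
        });
        return pruned;
    }

    // Mirrors cleanupFilteredOffsetsAfterCommit(): drop tracking once a commit
    // has moved past the filtered record.
    void cleanupAfterCommit(Map<String, Long> committed) {
        committed.forEach((tp, commitOffset) -> {
            Long filtered = this.filteredOffsets.get(tp);
            if (filtered != null && filtered < commitOffset - 1) {
                this.filteredOffsets.remove(tp);
            }
        });
    }

    public static void main(String[] args) {
        FilteredOffsetTracker tracker = new FilteredOffsetTracker();
        tracker.markFiltered("t-0", 3);
        tracker.markFiltered("t-0", 5); // merge keeps the maximum, 5

        // A commit at offset 6 would cover filtered record 5: withheld
        System.out.println(tracker.prune(Map.of("t-0", 6L))); // {}

        // A commit at offset 8 passes, then tracking for t-0 is cleaned up
        Map<String, Long> commits = tracker.prune(Map.of("t-0", 8L));
        System.out.println(commits); // {t-0=8}
        tracker.cleanupAfterCommit(commits);
        System.out.println(tracker.prune(Map.of("t-0", 6L))); // {t-0=6}
    }
}
```

Note the design consequence visible in the sketch: once any later record on the partition is processed and committed (offset 8 here), the filtered record's offset is committed along with it; the mode only withholds the commit that would cover the filtered record alone.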

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringBatchMessageListenerAdapter.java

Lines changed: 16 additions & 0 deletions
@@ -26,6 +26,7 @@
 import org.springframework.kafka.listener.BatchMessageListener;
 import org.springframework.kafka.listener.ListenerType;
 import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.kafka.support.FilterAwareAcknowledgment;
 import org.springframework.util.Assert;

 /**
@@ -84,16 +85,31 @@ public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgme
 		final List<ConsumerRecord<K, V>> consumerRecords = recordFilterStrategy.filterBatch(records);
 		Assert.state(consumerRecords != null, "filter returned null from filterBatch");

+		// Mark any records removed by the filter
+		if (acknowledgment instanceof FilterAwareAcknowledgment faa && consumerRecords.size() < records.size()) {
+			records.stream()
+					.filter(record -> !consumerRecords.contains(record))
+					.forEach(faa::markFiltered);
+		}
+
 		if (recordFilterStrategy.ignoreEmptyBatch() &&
 			consumerRecords.isEmpty() &&
 			acknowledgment != null) {
+			// All records were filtered but ignoreEmptyBatch is true
+			if (acknowledgment instanceof FilterAwareAcknowledgment faa) {
+				records.forEach(faa::markFiltered);
+			}
 			acknowledgment.acknowledge();
 		}
 		else if (!consumerRecords.isEmpty() || this.consumerAware
 				|| (!this.ackDiscarded && this.delegateType.equals(ListenerType.ACKNOWLEDGING))) {
 			invokeDelegate(consumerRecords, acknowledgment, consumer);
 		}
 		else {
+			// All records were filtered and ignoreEmptyBatch is false
+			if (acknowledgment instanceof FilterAwareAcknowledgment faa) {
+				records.forEach(faa::markFiltered);
+			}
 			if (this.ackDiscarded && acknowledgment != null) {
 				acknowledgment.acknowledge();
 			}
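The batch adapter computes the filtered set as "original records minus the list `filterBatch` returned" via `contains()`. A stdlib-only sketch of that set difference (strings standing in for `ConsumerRecord`s):

```java
import java.util.ArrayList;
import java.util.List;

// Set difference as performed by the batch adapter's stream/contains form.
public class BatchFilterDiff {

    static List<String> filteredOut(List<String> original, List<String> kept) {
        List<String> filtered = new ArrayList<>();
        for (String record : original) {
            if (!kept.contains(record)) { // O(n * m), like the adapter's contains() check
                filtered.add(record);
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        List<String> records = List.of("keep-1", "ignore-me", "keep-2");
        List<String> kept = List.of("keep-1", "keep-2"); // what filterBatch returned
        System.out.println(filteredOut(records, kept)); // [ignore-me]
    }
}
```

Since `filterBatch` returns the surviving `ConsumerRecord` instances from the same list, identity-based `contains()` suffices here; a strategy that copies records would break this difference, so that is an implicit contract of the approach.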

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java

Lines changed: 4 additions & 0 deletions
@@ -23,6 +23,7 @@
 import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
 import org.springframework.kafka.listener.MessageListener;
 import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.kafka.support.FilterAwareAcknowledgment;

 /**
  * A {@link MessageListener} adapter that implements filter logic
@@ -77,6 +78,9 @@ public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgm
 			}
 		}
 		else {
+			if (acknowledgment instanceof FilterAwareAcknowledgment faa) {
+				faa.markFiltered(consumerRecord);
+			}
 			ackFilteredIfNecessary(acknowledgment);
 		}
 	}
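The commit imports `FilterAwareAcknowledgment` but its definition is in one of the changed files not shown above. A plausible shape, consistent with the call sites (this is an assumption, not the actual source), is an extension of `Acknowledgment` with a `markFiltered` callback; the adapters guard the call with an `instanceof` pattern so plain `Acknowledgment` implementations are unaffected. A toy version with stand-in types:

```java
// Hypothetical sketch of the FilterAwareAcknowledgment contract and the
// adapters' instanceof guard; types are stand-ins, not spring-kafka's.
public class FilterAwareSketch {

    interface Acknowledgment {
        void acknowledge();
    }

    interface FilterAwareAcknowledgment extends Acknowledgment {
        void markFiltered(String record); // the real call sites pass a ConsumerRecord<?, ?>
    }

    // Mirrors the adapter's guard: only filter-aware acks learn about filtered records.
    static String dispatch(Acknowledgment ack, String record) {
        if (ack instanceof FilterAwareAcknowledgment faa) {
            faa.markFiltered(record);
            return "marked";
        }
        return "skipped";
    }

    public static void main(String[] args) {
        Acknowledgment plain = () -> { };
        FilterAwareAcknowledgment aware = new FilterAwareAcknowledgment() {
            public void acknowledge() { }
            public void markFiltered(String record) { }
        };
        System.out.println(dispatch(plain, "r1")); // skipped
        System.out.println(dispatch(aware, "r1")); // marked
    }
}
```

The `instanceof`-guarded dispatch keeps the feature opt-in: containers whose acknowledgments do not implement the interface see no behavior change.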
