Skip to content

Commit 57bccc4

Browse files
committed
Simplify share consumer acknowledgment configuration and consolidate message converters
- Replace ShareAcknowledgmentMode enum with boolean explicitShareAcknowledgment field - Consolidate duplicate toMessage/toShareMessage methods into single toMessage with Object parameters - Merge invoke/invokeHandler methods in MessagingMessageListenerAdapter to use Object parameters - Eliminate duplicate commonHeaders methods in MessageConverter interface - Add validation for explicit acknowledgment mode requiring proper listener interface - Simplify ShareKafkaMessageListenerContainerUnitTests to focus on configuration validation - Other formatting fixes in docs and javadocs Signed-off-by: Soby Chacko <[email protected]>
1 parent 11bf13e commit 57bccc4

14 files changed

+156
-621
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 16 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ Share consumers support two acknowledgment modes that control how records are ac
233233

234234
[[share-implicit-acknowledgment]]
235235
=== Implicit Acknowledgment (Default)
236+
236237
In implicit mode, records are automatically acknowledged based on processing outcome:
237238

238239
Successful processing: Records are acknowledged as `ACCEPT`
@@ -245,9 +246,8 @@ public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerCont
245246
ShareConsumerFactory<String, String> shareConsumerFactory) {
246247
ShareKafkaListenerContainerFactory<String, String> factory =
247248
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
248-
// Implicit mode is the default
249-
factory.getContainerProperties().setShareAcknowledgmentMode(
250-
ContainerProperties.ShareAcknowledgmentMode.IMPLICIT);
249+
// Implicit mode is the default (false means implicit acknowledgment)
250+
factory.getContainerProperties().setExplicitShareAcknowledgment(false);
251251
252252
return factory;
253253
}
@@ -287,8 +287,8 @@ public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaList
287287
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
288288
289289
// Configure acknowledgment mode at container factory level
290-
factory.getContainerProperties()
291-
.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT);
290+
// true means explicit acknowledgment is required
291+
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
292292
293293
return factory;
294294
}
@@ -298,9 +298,9 @@ public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaList
298298

299299
When both configuration methods are used, Spring Kafka follows this precedence order (highest to lowest):
300300

301-
1. **Container Properties**: `containerProperties.setShareAcknowledgmentMode()`
302-
2. **Consumer Config**: `ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG`
303-
3. **Default**: `ShareAcknowledgmentMode.IMPLICIT`
301+
1. **Container Properties**: `containerProperties.setExplicitShareAcknowledgment(true/false)`
302+
2. **Consumer Config**: `ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG` ("implicit" or "explicit")
303+
3. **Default**: `false` (implicit acknowledgment)
304304

305305
[[share-acknowledgment-types]]
306306
=== Acknowledgment Types
@@ -319,12 +319,9 @@ The `ShareAcknowledgment` interface provides methods for explicit acknowledgment
319319
[source,java]
320320
----
321321
public interface ShareAcknowledgment {
322-
void acknowledge(AcknowledgeType type);
323-
void acknowledge(); // Convenience method for ACCEPT
324-
void release(); // Convenience method for RELEASE
325-
void reject(); // Convenience method for REJECT
326-
boolean isAcknowledged();
327-
AcknowledgeType getAcknowledgmentType();
322+
void acknowledge();
323+
void release();
324+
void reject();
328325
}
329326
----
330327

@@ -349,7 +346,7 @@ public void listen(ConsumerRecord<String, String> record) {
349346
[[share-acknowledging-listener]]
350347
==== AcknowledgingShareConsumerAwareMessageListener
351348

352-
This interface provides access to the ShareConsumer instance with optional acknowledgment support.
349+
This interface provides access to the `ShareConsumer` instance with optional acknowledgment support.
353350
The acknowledgment parameter is nullable and depends on the container's acknowledgment mode:
354351

355352
===== Implicit Mode Example (acknowledgment is null)
@@ -412,9 +409,9 @@ public class ExplicitAckListener {
412409

413410
In explicit acknowledgment mode, the container enforces important constraints:
414411

415-
Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
416-
One-time Acknowledgment: Each record can only be acknowledged once.
417-
Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`.
412+
Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
413+
One-time Acknowledgment: Each record can only be acknowledged once.
414+
Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`.
418415

419416
[WARNING]
420417
In explicit mode, failing to acknowledge records will block further message processing.
@@ -434,7 +431,7 @@ public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerCont
434431
ShareKafkaListenerContainerFactory<String, String> factory =
435432
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
436433
437-
// Set acknowledgment timeout (default is 60 seconds)
434+
// Set acknowledgment timeout (default is 30 seconds)
438435
factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));
439436
440437
return factory;
@@ -446,7 +443,6 @@ When a record exceeds the timeout, you'll see a warning like:
446443
WARN: Record not acknowledged within timeout (30 seconds).
447444
In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(),
448445
or ack.reject() for every record.
449-
Unacknowledged record: topic='my-topic', partition=0, offset=123
450446
----
451447

452448
This feature helps developers quickly identify when acknowledgment calls are missing from their code, preventing the common issue of "Spring Kafka does not consume new records any more" due to forgotten acknowledgments.
@@ -504,47 +500,6 @@ public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgm
504500
}
505501
----
506502

507-
[[share-acknowledgment-configuration]]
508-
=== Acknowledgment Mode Configuration
509-
510-
You can configure the acknowledgment mode at both the consumer factory and container levels:
511-
512-
[[share-factory-level-configuration]]
513-
==== Factory Level Configuration
514-
515-
[source,java]
516-
----
517-
@Bean
518-
public ShareConsumerFactory<String, String> explicitAckShareConsumerFactory() {
519-
Map<String, Object> props = new HashMap<>();
520-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
521-
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
522-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
523-
// Configure explicit acknowledgment at the factory level
524-
props.put("share.acknowledgement.mode", "explicit");
525-
return new DefaultShareConsumerFactory<>(props);
526-
}
527-
----
528-
529-
[[share-container-level-configuration]]
530-
==== Container Level Configuration
531-
532-
[source,java]
533-
----
534-
@Bean
535-
public ShareKafkaListenerContainerFactory<String, String> customShareKafkaListenerContainerFactory(
536-
ShareConsumerFactory<String, String> shareConsumerFactory) {
537-
ShareKafkaListenerContainerFactory<String, String> factory =
538-
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
539-
540-
// Configure acknowledgment mode at container level
541-
factory.getContainerProperties().setShareAcknowledgmentMode(
542-
ContainerProperties.ShareAcknowledgmentMode.EXPLICIT);
543-
544-
return factory;
545-
}
546-
----
547-
548503
[[share-differences-from-regular-consumers]]
549504
== Differences from Regular Consumers
550505

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ public void setBatchListener(boolean batchListener) {
293293
this.batchListener = batchListener;
294294
}
295295

296-
public void setShareConsumer(Boolean shareConsumer) {
296+
public void setShareConsumer(boolean shareConsumer) {
297297
this.shareConsumer = shareConsumer;
298298
}
299299

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 13 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
131131
validateShareConfiguration(endpoint);
132132

133133
// Determine acknowledgment mode following Spring Kafka's configuration precedence patterns
134-
ContainerProperties.ShareAcknowledgmentMode ackMode = determineAcknowledgmentMode(properties);
135-
properties.setShareAcknowledgmentMode(ackMode);
134+
boolean explicitAck = determineExplicitAcknowledgment(properties);
135+
properties.setExplicitShareAcknowledgment(explicitAck);
136136

137137
JavaUtils.INSTANCE
138138
.acceptIfNotNull(effectiveAutoStartup, instance::setAutoStartup)
@@ -144,49 +144,39 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
144144
}
145145

146146
/**
147-
* Determine the acknowledgment mode following Spring Kafka's configuration precedence patterns.
147+
* Determine whether explicit acknowledgment is required following Spring Kafka's configuration precedence patterns.
148148
* <p>
149149
* Configuration precedence (highest to lowest):
150150
* <ol>
151-
* <li>Container Properties: {@code containerProperties.getShareAcknowledgmentMode()} (if explicitly set)</li>
151+
* <li>Container Properties: {@code containerProperties.isExplicitShareAcknowledgment()} (if explicitly set)</li>
152152
* <li>Consumer Config: {@code ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG}</li>
153-
* <li>Default: {@code ShareAcknowledgmentMode.IMPLICIT}</li>
153+
* <li>Default: {@code false} (implicit acknowledgment)</li>
154154
* </ol>
155-
*
156155
* @param containerProperties the container properties to check
157-
* @return the resolved acknowledgment mode
156+
* @return true if explicit acknowledgment is required, false for implicit
158157
* @throws IllegalArgumentException if an invalid acknowledgment mode is configured
159158
*/
160-
private ContainerProperties.ShareAcknowledgmentMode determineAcknowledgmentMode(ContainerProperties containerProperties) {
161-
// 1. Check if explicitly set at container level (highest priority)
162-
// Note: We need to check if it was explicitly set vs using the default
163-
// For now, we assume if it's not the default, it was explicitly set
164-
ContainerProperties.ShareAcknowledgmentMode containerMode = containerProperties.getShareAcknowledgmentMode();
165-
if (containerMode != ContainerProperties.ShareAcknowledgmentMode.IMPLICIT) {
166-
// Container level setting takes precedence
167-
return containerMode;
168-
}
169-
170-
// 2. Check Kafka client configuration (middle priority)
159+
private boolean determineExplicitAcknowledgment(ContainerProperties containerProperties) {
160+
// Check Kafka client configuration
171161
Object clientAckMode = this.shareConsumerFactory.getConfigurationProperties()
172162
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
173163

174164
if (clientAckMode != null) {
175-
String mode = clientAckMode.toString();
165+
String mode = clientAckMode.toString().toLowerCase();
176166
if ("explicit".equals(mode)) {
177-
return ContainerProperties.ShareAcknowledgmentMode.EXPLICIT;
167+
return true;
178168
}
179169
else if ("implicit".equals(mode)) {
180-
return ContainerProperties.ShareAcknowledgmentMode.IMPLICIT;
170+
return false;
181171
}
182172
else {
183173
throw new IllegalArgumentException(
184174
"Invalid " + ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG + ": " + mode +
185175
". Must be 'implicit' or 'explicit'");
186176
}
187177
}
188-
// 3. Default (lowest priority)
189-
return ContainerProperties.ShareAcknowledgmentMode.IMPLICIT;
178+
// Default to implicit acknowledgment (false)
179+
return containerProperties.isExplicitShareAcknowledgment();
190180
}
191181

192182
private void validateShareConfiguration(KafkaListenerEndpoint endpoint) {
@@ -197,19 +187,6 @@ private void validateShareConfiguration(KafkaListenerEndpoint endpoint) {
197187
"Share groups operate at the record level.");
198188
}
199189

200-
// Validate acknowledgment mode consistency using official Kafka client configuration
201-
Object ackMode = this.shareConsumerFactory.getConfigurationProperties()
202-
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
203-
if (ackMode != null) {
204-
String ackModeStr = ackMode.toString().toLowerCase();
205-
boolean isValid = Arrays.stream(ContainerProperties.ShareAcknowledgmentMode.values())
206-
.anyMatch(mode -> mode.name().toLowerCase().equals(ackModeStr));
207-
if (!isValid) {
208-
throw new IllegalArgumentException(
209-
"Invalid " + ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG + ": " + ackMode +
210-
". Must be 'implicit' or 'explicit'");
211-
}
212-
}
213190
}
214191

215192
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingShareConsumerAwareMessageListener.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
*
4848
* @see ShareAcknowledgment
4949
* @see ShareConsumer
50-
* @see ContainerProperties.ShareAcknowledgmentMode
5150
*/
5251
@FunctionalInterface
5352
public interface AcknowledgingShareConsumerAwareMessageListener<K, V> extends GenericMessageListener<ConsumerRecord<K, V>> {

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 18 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -161,21 +161,6 @@ public enum EOSMode {
161161

162162
}
163163

164-
/**
165-
* Acknowledgment mode for share consumer containers.
166-
*/
167-
public enum ShareAcknowledgmentMode {
168-
/**
169-
* Records are automatically acknowledged as ACCEPT on next poll, commitSync, or commitAsync.
170-
*/
171-
IMPLICIT,
172-
173-
/**
174-
* Application must explicitly acknowledge all records before next poll.
175-
*/
176-
EXPLICIT
177-
}
178-
179164
/**
180165
* The default {@link #setShutdownTimeout(long) shutDownTimeout} (ms).
181166
*/
@@ -329,9 +314,9 @@ public enum ShareAcknowledgmentMode {
329314

330315
private boolean recordObservationsInBatch;
331316

332-
private ShareAcknowledgmentMode shareAcknowledgmentMode = ShareAcknowledgmentMode.IMPLICIT;
317+
private boolean explicitShareAcknowledgment = false;
333318

334-
private Duration shareAcknowledgmentTimeout = Duration.ofSeconds(60); // 1 minute default
319+
private Duration shareAcknowledgmentTimeout = Duration.ofSeconds(30); // Align with Kafka's share.record.lock.duration.ms default
335320

336321
/**
337322
* Create properties for a container that will subscribe to the specified topics.
@@ -1136,34 +1121,30 @@ public void setRecordObservationsInBatch(boolean recordObservationsInBatch) {
11361121
}
11371122

11381123
/**
1139-
* Set the acknowledgment mode for share consumer containers.
1124+
* Set whether explicit acknowledgment is required for share consumer containers.
11401125
* <p>
11411126
* This setting only applies to share consumer containers and is ignored
1142-
* by regular consumer containers. The acknowledgment mode determines
1143-
* how records are acknowledged:
1144-
* <ul>
1145-
* <li>{@link ShareAcknowledgmentMode#IMPLICIT} - Records are automatically
1146-
* acknowledged as ACCEPT when the next poll occurs or when commitSync/commitAsync
1147-
* is called</li>
1148-
* <li>{@link ShareAcknowledgmentMode#EXPLICIT} - Application must explicitly
1149-
* acknowledge each record using the provided {@link ShareAcknowledgment}</li>
1150-
* </ul>
1151-
*
1152-
* @param shareAcknowledgmentMode the acknowledgment mode
1127+
* by regular consumer containers.
1128+
* <p>
1129+
* When set to {@code false} (default), records are automatically acknowledged
1130+
* as ACCEPT when the next poll occurs or when commitSync/commitAsync is called.
1131+
* <p>
1132+
* When set to {@code true}, the application must explicitly acknowledge each
1133+
* record using the provided {@link ShareAcknowledgment}.
1134+
* @param explicitShareAcknowledgment true for explicit acknowledgment, false for implicit
11531135
* @since 4.0
11541136
* @see ShareAcknowledgment
11551137
*/
1156-
public void setShareAcknowledgmentMode(ShareAcknowledgmentMode shareAcknowledgmentMode) {
1157-
this.shareAcknowledgmentMode = shareAcknowledgmentMode;
1138+
public void setExplicitShareAcknowledgment(boolean explicitShareAcknowledgment) {
1139+
this.explicitShareAcknowledgment = explicitShareAcknowledgment;
11581140
}
11591141

11601142
/**
1161-
* Get the acknowledgment mode for share consumer containers.
1162-
*
1163-
* @return the acknowledgment mode
1143+
* Check whether explicit acknowledgment is required for share consumer containers.
1144+
* @return true if explicit acknowledgment is required, false for implicit acknowledgment
11641145
*/
1165-
public ShareAcknowledgmentMode getShareAcknowledgmentMode() {
1166-
return this.shareAcknowledgmentMode;
1146+
public boolean isExplicitShareAcknowledgment() {
1147+
return this.explicitShareAcknowledgment;
11671148
}
11681149

11691150
/**
@@ -1173,8 +1154,7 @@ public ShareAcknowledgmentMode getShareAcknowledgmentMode() {
11731154
* will be logged to help identify missing acknowledgment calls.
11741155
* This only applies when using explicit acknowledgment mode.
11751156
* <p>
1176-
* Default is 60 seconds.
1177-
*
1157+
* Default is 30 seconds.
11781158
* @param shareAcknowledgmentTimeout the timeout duration
11791159
* @since 4.0
11801160
*/
@@ -1184,7 +1164,6 @@ public void setShareAcknowledgmentTimeout(Duration shareAcknowledgmentTimeout) {
11841164

11851165
/**
11861166
* Get the timeout for share acknowledgments in explicit mode.
1187-
*
11881167
* @return the acknowledgment timeout
11891168
* @since 4.0
11901169
*/

spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,11 @@
6969
*
7070
* @param <K> the key type
7171
* @param <V> the value type
72+
*
7273
* @author Soby Chacko
7374
* @since 4.0
7475
* @see ShareConsumer
7576
* @see ShareAcknowledgment
76-
* @see ContainerProperties.ShareAcknowledgmentMode
7777
*/
7878
public class ShareKafkaMessageListenerContainer<K, V>
7979
extends AbstractShareKafkaMessageListenerContainer<K, V> {
@@ -148,6 +148,16 @@ protected void doStart() {
148148
}
149149
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
150150
Assert.state(listener != null, "'messageListener' cannot be null");
151+
152+
// Validate listener type for explicit acknowledgment mode
153+
if (containerProperties.isExplicitShareAcknowledgment()) {
154+
boolean isAcknowledgingListener = listener instanceof AcknowledgingShareConsumerAwareMessageListener;
155+
Assert.state(isAcknowledgingListener,
156+
"Explicit acknowledgment mode requires an AcknowledgingShareConsumerAwareMessageListener. " +
157+
"Current listener type: " + listener.getClass().getName() + ". " +
158+
"Either use implicit acknowledgment mode or provide a listener that can handle acknowledgments.");
159+
}
160+
151161
this.listenerConsumer = new ShareListenerConsumer(listener);
152162
setRunning(true);
153163
this.listenerConsumerFuture = CompletableFuture.runAsync(this.listenerConsumer, consumerExecutor);
@@ -230,8 +240,7 @@ private class ShareListenerConsumer implements Runnable {
230240
ContainerProperties containerProperties = getContainerProperties();
231241

232242
// Configure acknowledgment mode
233-
this.isExplicitMode = containerProperties.getShareAcknowledgmentMode() ==
234-
ContainerProperties.ShareAcknowledgmentMode.EXPLICIT;
243+
this.isExplicitMode = containerProperties.isExplicitShareAcknowledgment();
235244
this.ackTimeoutMs = containerProperties.getShareAcknowledgmentTimeout().toMillis();
236245

237246
// Configure consumer properties based on acknowledgment mode

0 commit comments

Comments
 (0)