Skip to content

Commit 83aaa6a

Browse files
committed
Add concurrency support to ShareKafkaMessageListenerContainer
Share consumers (KIP-932) enable record-level load balancing where multiple consumers can cooperatively process from the same partitions. Unlike traditional consumer groups with exclusive partition ownership, share groups distribute work at the broker level via the share group coordinator. This commit adds native concurrency support to the existing `ShareKafkaMessageListenerContainer` rather than creating a separate `ConcurrentShareKafkaMessageListenerContainer`. This design choice avoids the parent/child container complexity that exists in the regular consumer model, since share consumers fundamentally operate differently: - Work distribution happens at the broker level, not at the Spring layer - Multiple threads simply provide more capacity for the broker to distribute records across - No partition ownership model to coordinate between child containers This approach provides: - Simpler architecture with a single container managing multiple threads - No parent/child context propagation concerns - Better alignment with share consumer semantics (record-level vs partition-level distribution) - Increased throughput for high-volume workloads - Better resource utilization across consumer threads Users can configure concurrency at three levels: 1. Per-listener via `@KafkaListener(concurrency = N)` 2. Factory-level default via `factory.setConcurrency(N)` 3. Programmatically via `container.setConcurrency(N)` The feature works seamlessly with both implicit (auto-acknowledge) and explicit (manual acknowledge/release/reject) acknowledgment modes, with each consumer thread independently managing its own acknowledgments. Signed-off-by: Soby Chacko <[email protected]>
1 parent 1972769 commit 83aaa6a

File tree

5 files changed

+595
-24
lines changed

5 files changed

+595
-24
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 141 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
107107
[[share-kafka-message-listener-container]]
108108
=== ShareKafkaMessageListenerContainer
109109

110-
The `ShareKafkaMessageListenerContainer` provides a simple, single-threaded container for share consumers:
110+
The `ShareKafkaMessageListenerContainer` provides a container for share consumers with support for concurrent processing:
111111

112112
[source,java]
113113
----
@@ -151,6 +151,145 @@ Share consumers do not support:
151151
* Manual offset management
152152
====
153153

154+
[[share-container-concurrency]]
155+
=== Concurrency
156+
157+
The `ShareKafkaMessageListenerContainer` supports concurrent processing by creating multiple consumer threads within a single container.
158+
Each thread runs its own `ShareConsumer` instance that participates in the same share group.
159+
160+
Unlike traditional consumer groups where concurrency involves partition distribution, share consumers leverage Kafka's record-level distribution at the broker.
161+
This means multiple consumer threads in the same container work together as part of the share group, with the Kafka broker distributing records across all consumer instances.
162+
163+
==== Configuring Concurrency Programmatically
164+
165+
[source,java]
166+
----
167+
@Bean
168+
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
169+
ShareConsumerFactory<String, String> shareConsumerFactory) {
170+
171+
ContainerProperties containerProps = new ContainerProperties("my-topic");
172+
containerProps.setGroupId("my-share-group");
173+
174+
ShareKafkaMessageListenerContainer<String, String> container =
175+
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
176+
177+
// Set concurrency to create 5 consumer threads
178+
container.setConcurrency(5);
179+
180+
container.setupMessageListener(new MessageListener<String, String>() {
181+
@Override
182+
public void onMessage(ConsumerRecord<String, String> record) {
183+
System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
184+
}
185+
});
186+
187+
return container;
188+
}
189+
----
190+
191+
==== Configuring Concurrency via Factory
192+
193+
You can set default concurrency at the factory level, which applies to all containers created by that factory:
194+
195+
[source,java]
196+
----
197+
@Bean
198+
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
199+
ShareConsumerFactory<String, String> shareConsumerFactory) {
200+
201+
ShareKafkaListenerContainerFactory<String, String> factory =
202+
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
203+
204+
// Set default concurrency for all containers created by this factory
205+
factory.setConcurrency(3);
206+
207+
return factory;
208+
}
209+
----
210+
211+
==== Per-Listener Concurrency
212+
213+
The concurrency setting can be overridden per listener using the `concurrency` attribute:
214+
215+
[source,java]
216+
----
217+
@Component
218+
public class ConcurrentShareListener {
219+
220+
@KafkaListener(
221+
topics = "high-throughput-topic",
222+
containerFactory = "shareKafkaListenerContainerFactory",
223+
groupId = "my-share-group",
224+
concurrency = "10" // Override factory default
225+
)
226+
public void listen(ConsumerRecord<String, String> record) {
227+
// This listener will use 10 consumer threads
228+
System.out.println("Processing: " + record.value());
229+
}
230+
}
231+
----
232+
233+
==== Concurrency Considerations
234+
235+
* **Thread Safety**: Each consumer thread has its own `ShareConsumer` instance and manages its own acknowledgments independently
236+
* **Client IDs**: Each consumer thread receives a unique client ID with a numeric suffix (e.g., `myContainer-0`, `myContainer-1`, etc.)
237+
* **Metrics**: Metrics from all consumer threads are aggregated and accessible via `container.metrics()`
238+
* **Lifecycle**: All consumer threads start and stop together as a unit
239+
* **Work Distribution**: The Kafka broker handles record distribution across all consumer instances in the share group
240+
* **Explicit Acknowledgment**: Each thread independently manages acknowledgments for its records; unacknowledged records in one thread don't block other threads
241+
242+
==== Concurrency with Explicit Acknowledgment
243+
244+
Concurrency works seamlessly with explicit acknowledgment mode.
245+
Each consumer thread independently tracks and acknowledges its own records:
246+
247+
[source,java]
248+
----
249+
@KafkaListener(
250+
topics = "order-queue",
251+
containerFactory = "explicitShareKafkaListenerContainerFactory",
252+
groupId = "order-processors",
253+
concurrency = "5"
254+
)
255+
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
256+
try {
257+
// Process the order
258+
processOrderLogic(record.value());
259+
acknowledgment.acknowledge(); // ACCEPT
260+
}
261+
catch (RetryableException e) {
262+
acknowledgment.release(); // Will be redelivered
263+
}
264+
catch (Exception e) {
265+
acknowledgment.reject(); // Permanent failure
266+
}
267+
}
268+
----
269+
270+
[NOTE]
271+
====
272+
**Work Distribution Behavior:**
273+
274+
With share consumers, record distribution is controlled by Kafka's share group coordinator at the broker level, not by Spring for Apache Kafka.
275+
The broker may assign all records to a single consumer thread at any given time, especially when:
276+
277+
* The topic has a single partition
278+
* There's low message volume
279+
* The broker's distribution algorithm favors certain consumers
280+
281+
This is normal behavior. The key benefit of concurrency is having multiple consumer threads *available* to the share group coordinator for distribution.
282+
As message volume increases or over time, you should see distribution across multiple threads.
283+
284+
This differs from traditional `ConcurrentMessageListenerContainer` where Spring explicitly distributes partitions across threads.
285+
286+
When using concurrency with share consumers:
287+
288+
* Each thread polls and processes records independently
289+
* Acknowledgment constraints apply per-thread (one thread's unacknowledged records don't block other threads)
290+
* Concurrency setting must be greater than 0 and cannot be changed while the container is running
291+
====
292+
154293
[[share-annotation-driven-listeners]]
155294
== Annotation-Driven Listeners
156295

@@ -520,8 +659,7 @@ Share consumers differ from regular consumers in several key ways:
520659
=== Current Limitations
521660

522661
* **In preview**: This feature is in preview mode and may change in future versions
523-
* **Single-Threaded**: Share consumer containers currently run in single-threaded mode
524662
* **No Message Converters**: Message converters are not yet supported for share consumers
525663
* **No Batch Listeners**: Batch processing is not supported with share consumers
526-
* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls
664+
* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls within each consumer thread
527665

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.kafka.clients.consumer.ConsumerConfig;
2424
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
25+
import org.jspecify.annotations.Nullable;
2526

2627
import org.springframework.context.ApplicationContext;
2728
import org.springframework.context.ApplicationContextAware;
@@ -63,6 +64,8 @@ public class ShareKafkaListenerContainerFactory<K, V>
6364

6465
private int phase = 0;
6566

67+
private @Nullable Integer concurrency;
68+
6669
@SuppressWarnings("NullAway.Init")
6770
private ApplicationEventPublisher applicationEventPublisher;
6871

@@ -98,6 +101,22 @@ public void setPhase(int phase) {
98101
this.phase = phase;
99102
}
100103

104+
/**
105+
* Set the concurrency for containers created by this factory.
106+
* <p>
107+
* This specifies the number of consumer threads to create within each container.
108+
* Each thread creates its own {@link org.apache.kafka.clients.consumer.ShareConsumer}
109+
* instance and participates in the same share group. The Kafka broker distributes
110+
* records across all consumer instances, providing record-level load balancing.
111+
* <p>
112+
* This can be overridden per listener endpoint using the {@code concurrency}
113+
* attribute on {@code @KafkaListener}.
114+
* @param concurrency the number of consumer threads (must be greater than 0)
115+
*/
116+
public void setConcurrency(Integer concurrency) {
117+
this.concurrency = concurrency;
118+
}
119+
101120
@Override
102121
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
103122
this.applicationEventPublisher = applicationEventPublisher;
@@ -138,6 +157,15 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
138157
boolean explicitAck = determineExplicitAcknowledgment(properties);
139158
properties.setExplicitShareAcknowledgment(explicitAck);
140159

160+
// Set concurrency - endpoint setting takes precedence over factory setting
161+
Integer conc = endpoint.getConcurrency();
162+
if (conc != null) {
163+
instance.setConcurrency(conc);
164+
}
165+
else if (this.concurrency != null) {
166+
instance.setConcurrency(this.concurrency);
167+
}
168+
141169
instance.setAutoStartup(effectiveAutoStartup);
142170
instance.setPhase(this.phase);
143171
instance.setApplicationContext(this.applicationContext);

0 commit comments

Comments
 (0)