Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
[[share-kafka-message-listener-container]]
=== ShareKafkaMessageListenerContainer

The `ShareKafkaMessageListenerContainer` provides a simple, single-threaded container for share consumers:
The `ShareKafkaMessageListenerContainer` provides a container for share consumers with support for concurrent processing:

[source,java]
----
Expand Down Expand Up @@ -151,6 +151,145 @@ Share consumers do not support:
* Manual offset management
====

[[share-container-concurrency]]
=== Concurrency

The `ShareKafkaMessageListenerContainer` supports concurrent processing by creating multiple consumer threads within a single container.
Each thread runs its own `ShareConsumer` instance that participates in the same share group.

Unlike traditional consumer groups where concurrency involves partition distribution, share consumers leverage Kafka's record-level distribution at the broker.
This means multiple consumer threads in the same container work together as part of the share group, with the Kafka broker distributing records across all consumer instances.

==== Configuring Concurrency Programmatically

[source,java]
----
@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
ShareConsumerFactory<String, String> shareConsumerFactory) {

ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");

ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

// Set concurrency to create 5 consumer threads
container.setConcurrency(5);

container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
}
});

return container;
}
----

==== Configuring Concurrency via Factory

You can set default concurrency at the factory level, which applies to all containers created by that factory:

[source,java]
----
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {

ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

// Set default concurrency for all containers created by this factory
factory.setConcurrency(3);

return factory;
}
----

==== Per-Listener Concurrency

The concurrency setting can be overridden per listener using the `concurrency` attribute:

[source,java]
----
@Component
public class ConcurrentShareListener {

@KafkaListener(
topics = "high-throughput-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group",
concurrency = "10" // Override factory default
)
public void listen(ConsumerRecord<String, String> record) {
// This listener will use 10 consumer threads
System.out.println("Processing: " + record.value());
}
}
----

==== Concurrency Considerations

* **Thread Safety**: Each consumer thread has its own `ShareConsumer` instance and manages its own acknowledgments independently
* **Client IDs**: Each consumer thread receives a unique client ID with a numeric suffix (e.g., `myContainer-0`, `myContainer-1`, etc.)
* **Metrics**: Metrics from all consumer threads are aggregated and accessible via `container.metrics()`
* **Lifecycle**: All consumer threads start and stop together as a unit
* **Work Distribution**: The Kafka broker handles record distribution across all consumer instances in the share group
* **Explicit Acknowledgment**: Each thread independently manages acknowledgments for its records; unacknowledged records in one thread don't block other threads

==== Concurrency with Explicit Acknowledgment

Concurrency works seamlessly with explicit acknowledgment mode.
Each consumer thread independently tracks and acknowledges its own records:

[source,java]
----
@KafkaListener(
topics = "order-queue",
containerFactory = "explicitShareKafkaListenerContainerFactory",
groupId = "order-processors",
concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
try {
// Process the order
processOrderLogic(record.value());
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
----

[NOTE]
====
**Work Distribution Behavior:**

With share consumers, record distribution is controlled by Kafka's share group coordinator at the broker level, not by Spring for Apache Kafka.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that it works same way as in many other message broker implementations for queues: the next record is pushed down to the next available consumer?
Or is that done in batches?
Where do we control that batch size then? min.bytes? max.record, max.wait?

Thanks

The broker may assign all records to a single consumer thread at any given time, especially when:

* The topic has a single partition
* There's low message volume
* The broker's distribution algorithm favors certain consumers

This is normal behavior. The key benefit of concurrency is having multiple consumer threads *available* to the share group coordinator for distribution.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One sentence per line.

As message volume increases or over time, you should see distribution across multiple threads.

This differs from traditional `ConcurrentMessageListenerContainer` where Spring explicitly distributes partitions across threads.

When using concurrency with share consumers:

* Each thread polls and processes records independently
* Acknowledgment constraints apply per-thread (one thread's unacknowledged records don't block other threads)
* Concurrency setting must be greater than 0 and cannot be changed while the container is running
====

[[share-annotation-driven-listeners]]
== Annotation-Driven Listeners

Expand Down Expand Up @@ -520,8 +659,7 @@ Share consumers differ from regular consumers in several key ways:
=== Current Limitations

* **In preview**: This feature is in preview mode and may change in future versions
* **Single-Threaded**: Share consumer containers currently run in single-threaded mode
* **No Message Converters**: Message converters are not yet supported for share consumers
* **No Batch Listeners**: Batch processing is not supported with share consumers
* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls
* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls within each consumer thread

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
import org.jspecify.annotations.Nullable;

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
Expand Down Expand Up @@ -63,6 +64,8 @@ public class ShareKafkaListenerContainerFactory<K, V>

private int phase = 0;

private @Nullable Integer concurrency;

@SuppressWarnings("NullAway.Init")
private ApplicationEventPublisher applicationEventPublisher;

Expand Down Expand Up @@ -98,6 +101,22 @@ public void setPhase(int phase) {
this.phase = phase;
}

/**
* Set the concurrency for containers created by this factory.
* <p>
* This specifies the number of consumer threads to create within each container.
* Each thread creates its own {@link org.apache.kafka.clients.consumer.ShareConsumer}
* instance and participates in the same share group. The Kafka broker distributes
* records across all consumer instances, providing record-level load balancing.
* <p>
* This can be overridden per listener endpoint using the {@code concurrency}
* attribute on {@code @KafkaListener}.
* @param concurrency the number of consumer threads (must be greater than 0)
*/
public void setConcurrency(Integer concurrency) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be a primitive.

this.concurrency = concurrency;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -138,6 +157,15 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
boolean explicitAck = determineExplicitAcknowledgment(properties);
properties.setExplicitShareAcknowledgment(explicitAck);

// Set concurrency - endpoint setting takes precedence over factory setting
Integer conc = endpoint.getConcurrency();
if (conc != null) {
instance.setConcurrency(conc);
}
else if (this.concurrency != null) {
instance.setConcurrency(this.concurrency);
}

instance.setAutoStartup(effectiveAutoStartup);
instance.setPhase(this.phase);
instance.setApplicationContext(this.applicationContext);
Expand Down
Loading