Skip to content

Commit 3348103

Browse files
committed
Address PR feedback on concurrency implementation
- Use primitive int for concurrency in factory (consistent with phase field) - Remove unnecessary `getConcurrency()` getter (only used in trivial tests) - Use `HashMap` instead of `ConcurrentHashMap` in metrics() (already inside lock) - Use `CompletableFuture.allOf()` for cleaner shutdown coordination - Remove debug logging from tests (unnecessary noise in CI/CD) - Remove thread tracking from concurrency tests (over-complicates assertions) Clarify documentation based on KIP-932 specifications: - Add explicit note that concurrency is additive across application instances - Replace high-level distribution description with precise KIP-932 details - Document pull-based model, acquisition locks, and batch behavior - Explain `max.poll.records` as soft limit with complete batch preference - Set accurate expectations about broker-controlled record distribution Signed-off-by: Soby Chacko <[email protected]>
1 parent 83aaa6a commit 3348103

File tree

5 files changed

+46
-90
lines changed

5 files changed

+46
-90
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,23 @@ Each thread runs its own `ShareConsumer` instance that participates in the same
160160
Unlike traditional consumer groups where concurrency involves partition distribution, share consumers leverage Kafka's record-level distribution at the broker.
161161
This means multiple consumer threads in the same container work together as part of the share group, with the Kafka broker distributing records across all consumer instances.
162162

163+
[IMPORTANT]
164+
====
165+
**Concurrency is Additive Across Application Instances**
166+
167+
From the share group's perspective, each `ShareConsumer` instance is an independent member, regardless of where it runs.
168+
Setting `concurrency=3` in a single container creates 3 share group members.
169+
If you run multiple application instances with the same share group ID, all their consumer threads combine into one pool.
170+
171+
For example:
172+
* Application Instance 1: `concurrency=3` → 3 share group members
173+
* Application Instance 2: `concurrency=3` → 3 share group members
174+
* **Total**: 6 share group members available for the broker to distribute records to
175+
176+
This means setting `concurrency=5` in a single container is operationally equivalent to running 5 separate application instances with `concurrency=1` each (all using the same `group.id`).
177+
The Kafka broker treats all consumer instances equally and distributes records across the entire pool.
178+
====
179+
163180
==== Configuring Concurrency Programmatically
164181

165182
[source,java]
@@ -269,21 +286,31 @@ public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgm
269286

270287
[NOTE]
271288
====
272-
**Work Distribution Behavior:**
289+
**Record Acquisition and Distribution Behavior:**
290+
291+
Share consumers use a pull-based model where each consumer thread calls `poll()` to fetch records from the broker.
292+
When a consumer polls, the broker's share-partition leader:
293+
294+
* Selects records in "Available" state
295+
* Moves them to "Acquired" state with a time-limited acquisition lock (default 30 seconds, configurable via `group.share.record.lock.duration.ms`)
296+
* Prefers to return complete record batches for efficiency
297+
* Applies `max.poll.records` as a soft limit, meaning complete record batches will be acquired even if it exceeds this value
273298
274-
With share consumers, record distribution is controlled by Kafka's share group coordinator at the broker level, not by Spring for Apache Kafka.
275-
The broker may assign all records to a single consumer thread at any given time, especially when:
299+
While records are acquired by one consumer, they are not available to other consumers.
300+
When the acquisition lock expires, unacknowledged records automatically return to "Available" state and can be delivered to another consumer.
276301
277-
* The topic has a single partition
278-
* There's low message volume
279-
* The broker's distribution algorithm favors certain consumers
302+
The broker limits the number of records that can be acquired per partition using `group.share.partition.max.record.locks`.
303+
Once this limit is reached, subsequent polls temporarily return no records until locks expire.
280304
281-
This is normal behavior. The key benefit of concurrency is having multiple consumer threads *available* to the share group coordinator for distribution.
282-
As message volume increases or over time, you should see distribution across multiple threads.
305+
**Implications for Concurrency:**
283306
284-
This differs from traditional `ConcurrentMessageListenerContainer` where Spring explicitly distributes partitions across threads.
307+
* Each consumer thread independently polls and may acquire different numbers of records per poll
308+
* Record distribution across threads depends on polling timing and batch availability
309+
* Multiple threads increase the pool of consumers available to acquire records
310+
* With low message volume or single partitions, records may concentrate on fewer threads
311+
* For long-running workloads, distribution tends to be more even
285312
286-
When using concurrency with share consumers:
313+
**Configuration:**
287314
288315
* Each thread polls and processes records independently
289316
* Acknowledgment constraints apply per-thread (one thread's unacknowledged records don't block other threads)

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import org.apache.kafka.clients.consumer.ConsumerConfig;
2424
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
25-
import org.jspecify.annotations.Nullable;
2625

2726
import org.springframework.context.ApplicationContext;
2827
import org.springframework.context.ApplicationContextAware;
@@ -64,7 +63,7 @@ public class ShareKafkaListenerContainerFactory<K, V>
6463

6564
private int phase = 0;
6665

67-
private @Nullable Integer concurrency;
66+
private int concurrency = 1;
6867

6968
@SuppressWarnings("NullAway.Init")
7069
private ApplicationEventPublisher applicationEventPublisher;
@@ -113,7 +112,7 @@ public void setPhase(int phase) {
113112
* attribute on {@code @KafkaListener}.
114113
* @param concurrency the number of consumer threads (must be greater than 0)
115114
*/
116-
public void setConcurrency(Integer concurrency) {
115+
public void setConcurrency(int concurrency) {
117116
this.concurrency = concurrency;
118117
}
119118

@@ -162,7 +161,7 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
162161
if (conc != null) {
163162
instance.setConcurrency(conc);
164163
}
165-
else if (this.concurrency != null) {
164+
else {
166165
instance.setConcurrency(this.concurrency);
167166
}
168167

spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.ArrayList;
2020
import java.util.Arrays;
2121
import java.util.Collections;
22+
import java.util.HashMap;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.concurrent.CompletableFuture;
@@ -134,14 +135,6 @@ public void setClientId(String clientId) {
134135
this.clientId = clientId;
135136
}
136137

137-
/**
138-
* Get the concurrency level (number of consumer threads).
139-
* @return the concurrency level
140-
*/
141-
public int getConcurrency() {
142-
return this.concurrency;
143-
}
144-
145138
/**
146139
* Set the level of concurrency. This will create the specified number of
147140
* consumer threads, each with its own {@link ShareConsumer} instance.
@@ -169,7 +162,7 @@ public boolean isInExpectedState() {
169162
if (this.consumers.isEmpty()) {
170163
return Collections.emptyMap();
171164
}
172-
Map<String, Map<MetricName, ? extends Metric>> allMetrics = new ConcurrentHashMap<>();
165+
Map<String, Map<MetricName, ? extends Metric>> allMetrics = new HashMap<>();
173166
for (ShareListenerConsumer consumer : this.consumers) {
174167
Map<MetricName, ? extends Metric> consumerMetrics = consumer.consumer.metrics();
175168
String consumerId = consumer.getClientId();
@@ -239,17 +232,13 @@ protected void doStop() {
239232
// Wait for all consumer threads to complete
240233
this.lifecycleLock.lock();
241234
try {
242-
for (CompletableFuture<Void> future : this.consumerFutures) {
243-
try {
244-
future.join(); // Wait for consumer to finish
245-
}
246-
catch (Exception e) {
247-
this.logger.error(e, "Error waiting for consumer thread to stop");
248-
}
249-
}
235+
CompletableFuture.allOf(this.consumerFutures.toArray(new CompletableFuture<?>[0])).join();
250236
this.consumers.clear();
251237
this.consumerFutures.clear();
252238
}
239+
catch (Exception e) {
240+
this.logger.error(e, "Error waiting for consumer threads to stop");
241+
}
253242
finally {
254243
this.lifecycleLock.unlock();
255244
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.concurrent.atomic.AtomicReference;
3333
import java.util.function.Supplier;
3434

35-
import org.apache.commons.logging.LogFactory;
3635
import org.apache.kafka.clients.admin.Admin;
3736
import org.apache.kafka.clients.admin.AlterConfigOp;
3837
import org.apache.kafka.clients.admin.ConfigEntry;
@@ -84,8 +83,6 @@
8483
)
8584
class ShareKafkaMessageListenerContainerIntegrationTests {
8685

87-
private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(ShareKafkaMessageListenerContainerIntegrationTests.class));
88-
8986
@Test
9087
void integrationTestShareKafkaMessageListenerContainer(EmbeddedKafkaBroker broker) throws Exception {
9188
final String topic = "share-listener-integration-test";
@@ -856,12 +853,9 @@ void shouldProcessRecordsWithMultipleConsumerThreads(EmbeddedKafkaBroker broker)
856853

857854
ContainerProperties containerProps = new ContainerProperties(topic);
858855
CountDownLatch latch = new CountDownLatch(numRecords);
859-
Map<String, AtomicInteger> threadCounts = new ConcurrentHashMap<>();
860856
List<String> receivedValues = Collections.synchronizedList(new ArrayList<>());
861857

862858
containerProps.setMessageListener((MessageListener<String, String>) record -> {
863-
String threadName = Thread.currentThread().getName();
864-
threadCounts.computeIfAbsent(threadName, k -> new AtomicInteger()).incrementAndGet();
865859
receivedValues.add(record.value());
866860
latch.countDown();
867861
});
@@ -871,34 +865,14 @@ void shouldProcessRecordsWithMultipleConsumerThreads(EmbeddedKafkaBroker broker)
871865
container.setBeanName("concurrencyBasicTest");
872866
container.setConcurrency(concurrency);
873867

874-
assertThat(container.getConcurrency()).isEqualTo(concurrency);
875-
876868
container.start();
877869

878870
try {
879-
// Verify all records are processed
880871
assertThat(latch.await(30, TimeUnit.SECONDS))
881872
.as("All %d records should be processed", numRecords)
882873
.isTrue();
883874

884-
// Log thread distribution for debugging
885-
logger.info(() -> "Thread distribution: " + threadCounts);
886-
887-
// Verify all records received
888875
assertThat(receivedValues).hasSize(numRecords);
889-
890-
// Share consumers with single partition may use only one consumer at a time
891-
// So we verify: at least 1 thread used, at most concurrency threads
892-
assertThat(threadCounts.size())
893-
.as("At least one consumer thread should process records")
894-
.isGreaterThanOrEqualTo(1)
895-
.isLessThanOrEqualTo(concurrency);
896-
897-
// Verify work was done
898-
int totalProcessed = threadCounts.values().stream()
899-
.mapToInt(AtomicInteger::get)
900-
.sum();
901-
assertThat(totalProcessed).isEqualTo(numRecords);
902876
}
903877
finally {
904878
container.stop();
@@ -953,8 +927,6 @@ void shouldAggregateMetricsFromMultipleConsumers(EmbeddedKafkaBroker broker) thr
953927
.as("Client ID should contain bean name")
954928
.contains("concurrencyMetricsTest");
955929
}
956-
957-
logger.info(() -> "Client IDs from metrics: " + metrics.keySet());
958930
}
959931
finally {
960932
container.stop();
@@ -983,13 +955,9 @@ void shouldHandleConcurrencyWithExplicitAcknowledgment(EmbeddedKafkaBroker broke
983955
CountDownLatch latch = new CountDownLatch(numRecords);
984956
AtomicInteger acceptCount = new AtomicInteger();
985957
AtomicInteger rejectCount = new AtomicInteger();
986-
Map<String, AtomicInteger> threadCounts = new ConcurrentHashMap<>();
987958

988959
containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener<String, String>) (
989960
record, acknowledgment, consumer) -> {
990-
String threadName = Thread.currentThread().getName();
991-
threadCounts.computeIfAbsent(threadName, k -> new AtomicInteger()).incrementAndGet();
992-
993961
// Reject every 5th record, accept others
994962
int recordNum = Integer.parseInt(record.value().substring(5)); // "value0" -> 0
995963
if (recordNum % 5 == 0) {
@@ -1014,16 +982,6 @@ void shouldHandleConcurrencyWithExplicitAcknowledgment(EmbeddedKafkaBroker broke
1014982
.as("All records should be processed with explicit acknowledgment")
1015983
.isTrue();
1016984

1017-
logger.info(() -> "Thread distribution with explicit ack: " + threadCounts);
1018-
logger.info(() -> "Accept count: " + acceptCount.get() + ", Reject count: " + rejectCount.get());
1019-
1020-
// Verify at least one thread processed records
1021-
assertThat(threadCounts.size())
1022-
.as("At least one thread should process records in explicit mode")
1023-
.isGreaterThanOrEqualTo(1)
1024-
.isLessThanOrEqualTo(concurrency);
1025-
1026-
// Verify acknowledgments were processed correctly
1027985
assertThat(acceptCount.get() + rejectCount.get())
1028986
.as("Total acknowledgments should equal number of records")
1029987
.isEqualTo(numRecords);
@@ -1084,7 +1042,6 @@ void shouldStopAllConsumerThreadsGracefully(EmbeddedKafkaBroker broker) throws E
10841042
.untilAsserted(() -> assertThat(processedCount.get()).isGreaterThan(0));
10851043

10861044
int processedBeforeStop = processedCount.get();
1087-
logger.info(() -> "Processed " + processedBeforeStop + " records before stop");
10881045

10891046
// Stop the container
10901047
container.stop();
@@ -1112,8 +1069,6 @@ void shouldStopAllConsumerThreadsGracefully(EmbeddedKafkaBroker broker) throws E
11121069
.atMost(10, TimeUnit.SECONDS)
11131070
.untilAsserted(() -> assertThat(processedCount.get()).isGreaterThan(processedBeforeStop));
11141071

1115-
logger.info(() -> "Processed " + processedCount.get() + " records total after restart");
1116-
11171072
// Final stop
11181073
container.stop();
11191074
assertThat(container.isRunning()).isFalse();

spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -89,20 +89,6 @@ void shouldFailWhenExplicitModeUsedWithNonAcknowledgingListener() {
8989
.withMessageContaining("Explicit acknowledgment mode requires an AcknowledgingShareConsumerAwareMessageListener");
9090
}
9191

92-
@Test
93-
void shouldSetConcurrencyCorrectly() {
94-
ContainerProperties containerProperties = new ContainerProperties("test-topic");
95-
containerProperties.setMessageListener(messageListener);
96-
97-
ShareKafkaMessageListenerContainer<String, String> container =
98-
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties);
99-
100-
assertThat(container.getConcurrency()).isEqualTo(1); // Default is 1
101-
102-
container.setConcurrency(5);
103-
assertThat(container.getConcurrency()).isEqualTo(5);
104-
}
105-
10692
@Test
10793
void shouldRejectInvalidConcurrency() {
10894
ContainerProperties containerProperties = new ContainerProperties("test-topic");

0 commit comments

Comments
 (0)