Skip to content

Commit 0c942d4

Browse files
garyrussellartembilan
authored andcommitted
GH-1485: Option to suppress single client.id
See #1485
1 parent 05e56ad commit 0c942d4

File tree

3 files changed

+35
-2
lines changed

3 files changed

+35
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
5858

5959
private int concurrency = 1;
6060

61+
private boolean alwaysClientIdSuffix = true;
62+
6163
/**
6264
* Construct an instance with the supplied configuration properties.
6365
* The topic partitions are distributed evenly across the delegate
@@ -86,6 +88,16 @@ public void setConcurrency(int concurrency) {
8688
this.concurrency = concurrency;
8789
}
8890

91+
/**
92+
* Set to false to suppress adding a suffix to the child container's client.id when
93+
* the concurrency is only 1.
94+
* @param alwaysClientIdSuffix true to suppress.
95+
* @since 2.2.14
96+
*/
97+
public void setAlwaysClientIdSuffix(boolean alwaysClientIdSuffix) {
98+
this.alwaysClientIdSuffix = alwaysClientIdSuffix;
99+
}
100+
89101
/**
90102
* Return the list of {@link KafkaMessageListenerContainer}s created by
91103
* this container.
@@ -181,7 +193,7 @@ protected void doStart() {
181193
if (getApplicationEventPublisher() != null) {
182194
container.setApplicationEventPublisher(getApplicationEventPublisher());
183195
}
184-
container.setClientIdSuffix("-" + i);
196+
container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + i : "");
185197
container.setGenericErrorHandler(getGenericErrorHandler());
186198
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
187199
container.setRecordInterceptor(getRecordInterceptor());

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.BitSet;
2929
import java.util.Collection;
3030
import java.util.HashSet;
31+
import java.util.Iterator;
3132
import java.util.List;
3233
import java.util.Map;
3334
import java.util.Properties;
@@ -453,6 +454,7 @@ public void testManualCommitSyncExisting() throws Exception {
453454
latch.countDown();
454455
});
455456
containerProps.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
457+
containerProps.setClientId("myClientId");
456458

457459
ConcurrentMessageListenerContainer<Integer, String> container =
458460
new ConcurrentMessageListenerContainer<>(cf, containerProps);
@@ -467,6 +469,9 @@ public void testManualCommitSyncExisting() throws Exception {
467469
template.flush();
468470
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
469471
assertThat(bitSet.cardinality()).isEqualTo(8);
472+
Set<String> clientIds = container.getAssignmentsByClientId().keySet();
473+
assertThat(clientIds).hasSize(1);
474+
assertThat(clientIds.iterator().next()).isEqualTo("myClientId-0");
470475
container.stop();
471476
this.logger.info("Stop MANUAL_IMMEDIATE with Existing");
472477
}
@@ -483,10 +488,11 @@ public void testPausedStart() throws Exception {
483488
ConcurrentMessageListenerContainerTests.this.logger.info("paused start: " + message);
484489
latch.countDown();
485490
});
486-
491+
containerProps.setClientId("myClientId");
487492
ConcurrentMessageListenerContainer<Integer, String> container =
488493
new ConcurrentMessageListenerContainer<>(cf, containerProps);
489494
container.setConcurrency(2);
495+
container.setAlwaysClientIdSuffix(false);
490496
container.setBeanName("testBatch");
491497
container.pause();
492498
container.start();
@@ -505,6 +511,11 @@ public void testPausedStart() throws Exception {
505511
container.resume();
506512

507513
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
514+
Set<String> clientIds = container.getAssignmentsByClientId().keySet();
515+
assertThat(clientIds).hasSize(2);
516+
Iterator<String> iterator = clientIds.iterator();
517+
assertThat(iterator.next()).startsWith("myClientId-");
518+
assertThat(iterator.next()).startsWith("myClientId-");
508519
container.stop();
509520
this.logger.info("Stop paused start");
510521
}
@@ -704,9 +715,11 @@ private void testAckOnErrorWithManualImmediateGuts(String topic, boolean ackOnEr
704715
}
705716

706717
});
718+
containerProps.setClientId("myClientId");
707719
ConcurrentMessageListenerContainer<Integer, String> container = new ConcurrentMessageListenerContainer<>(cf,
708720
containerProps);
709721
container.setConcurrency(1);
722+
container.setAlwaysClientIdSuffix(false);
710723
container.setBeanName("testAckOnErrorWithManualImmediate");
711724
container.start();
712725
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
@@ -719,6 +732,9 @@ private void testAckOnErrorWithManualImmediateGuts(String topic, boolean ackOnEr
719732
template.sendDefault(0, 1, "bar");
720733
template.flush();
721734
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
735+
Set<String> clientIds = container.getAssignmentsByClientId().keySet();
736+
assertThat(clientIds).hasSize(1);
737+
assertThat(clientIds.iterator().next()).isEqualTo("myClientId");
722738
container.stop();
723739

724740
Consumer<Integer, String> consumer = cf.createConsumer();

src/reference/asciidoc/kafka.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2384,6 +2384,11 @@ Suffix
23842384
| Default
23852385
| Description
23862386

2387+
|alwaysClientId
2388+
Suffix
2389+
|`true`
2390+
|Set to false to suppress adding a suffix to the `client.id` consumer property, when the `concurrency` is only 1.
2391+
23872392
|assigned
23882393
Partitions
23892394
|(read only)

0 commit comments

Comments
 (0)