Commit fe4d566

Addressing PR review
Signed-off-by: Soby Chacko <[email protected]>
1 parent 0b8a1b1 commit fe4d566

1 file changed: 24 additions, 25 deletions

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java

@@ -17,12 +17,12 @@
 package org.springframework.kafka.core;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -187,28 +187,32 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) thro
 
 	@Test
 	void integrationTestSharedConsumersDistribution(EmbeddedKafkaBroker broker) throws Exception {
-		final String topic = "embedded-share-distribution-test";
+		String topic = "shared-consumer-dist-test";
 		final String groupId = "distributionTestGroup";
 		int recordCount = 8;
 		List<String> consumerIds = List.of("client-dist-1", "client-dist-2");
-		Map<String, Set<String>> consumerRecords = runSharedConsumerTest(topic, groupId, consumerIds,
-				recordCount, broker);
+		List<String> allReceived = runSharedConsumerTest(topic, groupId, consumerIds, recordCount, broker);
 
-		// Assert all records were received (no loss)
-		Set<String> allReceived = new java.util.HashSet<>();
-		consumerRecords.values().forEach(allReceived::addAll);
-		for (int i = 0; i < recordCount; i++) {
-			assertThat(allReceived)
-					.as("Should have received value " + topic + "-value-" + i)
-					.contains(topic + "-value-" + i);
-		}
+		// Assert all records were received (no loss and no duplicates)
+		assertThat(allReceived)
+				.containsExactlyInAnyOrder(
+						topic + "-value-0",
+						topic + "-value-1",
+						topic + "-value-2",
+						topic + "-value-3",
+						topic + "-value-4",
+						topic + "-value-5",
+						topic + "-value-6",
+						topic + "-value-7"
+				)
+				.doesNotHaveDuplicates();
 	}
 
 	/**
 	 * Runs multiple Kafka consumers in parallel using ExecutorService, collects all records received,
-	 * and returns a map of consumerId to the set of record values received by that consumer.
+	 * and returns a list of all record values received by all consumers.
 	 */
-	private static Map<String, Set<String>> runSharedConsumerTest(String topic, String groupId,
+	private static List<String> runSharedConsumerTest(String topic, String groupId,
 			List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception {
 		var bootstrapServers = broker.getBrokersAsString();
 
@@ -225,11 +229,8 @@ private static Map<String, Set<String>> runSharedConsumerTest(String topic, Stri
 
 		setShareAutoOffsetResetEarliest(bootstrapServers, groupId);
 
-		Map<String, Set<String>> consumerRecords = new java.util.concurrent.ConcurrentHashMap<>();
-		consumerIds.forEach(id -> consumerRecords.put(id,
-				java.util.Collections.synchronizedSet(new java.util.HashSet<>())));
+		List<String> allReceived = Collections.synchronizedList(new ArrayList<>());
 		var latch = new java.util.concurrent.CountDownLatch(recordCount);
-		var running = new java.util.concurrent.atomic.AtomicBoolean(true);
 		ExecutorService executor = Executors.newCachedThreadPool();
 		DefaultShareConsumerFactory<String, String> shareConsumerFactory = new DefaultShareConsumerFactory<>(
 				Map.of(
@@ -243,13 +244,12 @@ private static Map<String, Set<String>> runSharedConsumerTest(String topic, Stri
 			executor.submit(() -> {
 				try (var consumer = shareConsumerFactory.createShareConsumer(groupId, consumerIds.get(idx))) {
 					consumer.subscribe(Collections.singletonList(topic));
-					while (running.get() && latch.getCount() > 0) {
+					while (latch.getCount() > 0) {
 						var records = consumer.poll(Duration.ofMillis(200));
 						for (var r : records) {
-							if (consumerRecords.get(consumerIds.get(idx)).add(r.value())) {
-								consumer.acknowledge(r, AcknowledgeType.ACCEPT);
-								latch.countDown();
-							}
+							allReceived.add(r.value());
+							consumer.acknowledge(r, AcknowledgeType.ACCEPT);
+							latch.countDown();
 						}
 					}
 				}
@@ -259,12 +259,11 @@ private static Map<String, Set<String>> runSharedConsumerTest(String topic, Stri
 		assertThat(latch.await(10, TimeUnit.SECONDS))
 				.as("All records should be received within timeout")
 				.isTrue();
-		running.set(false);
 		executor.shutdown();
 		assertThat(executor.awaitTermination(10, TimeUnit.SECONDS))
 				.as("Executor should terminate after shutdown")
 				.isTrue();
-		return consumerRecords;
+		return allReceived;
 	}
 
 	/**
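
For context, the poll-and-acknowledge pattern this test exercises looks roughly like the sketch below. It is not part of the commit: the bootstrap address, config map, topic, group id, and client id are placeholder values borrowed from the test, and it assumes a Kafka 4.x broker with share groups (KIP-932) enabled.

import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.kafka.core.DefaultShareConsumerFactory;

public class ShareConsumerUsageSketch {

	public static void main(String[] args) {
		// Factory configured the same way the test configures it: bootstrap servers
		// plus String deserializers (the concrete values here are placeholders).
		DefaultShareConsumerFactory<String, String> factory = new DefaultShareConsumerFactory<>(
				Map.of(
						ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
						ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
						ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class));

		// Members of the same share group cooperatively receive a share of the records.
		try (var consumer = factory.createShareConsumer("distributionTestGroup", "client-dist-1")) {
			consumer.subscribe(Collections.singletonList("shared-consumer-dist-test"));
			var records = consumer.poll(Duration.ofMillis(200));
			for (var r : records) {
				// Explicitly ACCEPT each record so the broker marks it consumed
				// instead of redelivering it to another member of the share group.
				consumer.acknowledge(r, AcknowledgeType.ACCEPT);
			}
		}
	}
}

Records delivered to a share-group member remain in flight until they are acknowledged, which is why the test acknowledges each record before counting down the latch.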
