Skip to content

Commit 1f5adc0

Browse files
committed
Addressing PR review
1 parent 1b2bd64 commit 1f5adc0

File tree

1 file changed

+64
-84
lines changed

1 file changed

+64
-84
lines changed

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java

Lines changed: 64 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,21 @@
1717
package org.springframework.kafka.core;
1818

1919
import java.time.Duration;
20-
import java.util.Arrays;
2120
import java.util.Collection;
2221
import java.util.Collections;
2322
import java.util.HashMap;
2423
import java.util.List;
2524
import java.util.Map;
2625
import java.util.Set;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Future;
2729

2830
import org.apache.kafka.clients.admin.Admin;
2931
import org.apache.kafka.clients.admin.AdminClient;
3032
import org.apache.kafka.clients.admin.AlterConfigOp;
3133
import org.apache.kafka.clients.admin.ConfigEntry;
34+
import org.apache.kafka.clients.consumer.AcknowledgeType;
3235
import org.apache.kafka.clients.consumer.ShareConsumer;
3336
import org.apache.kafka.clients.producer.KafkaProducer;
3437
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -51,7 +54,7 @@
5154
* @since 4.0
5255
*/
5356
@EmbeddedKafka(
54-
topics = {"embedded-share-test", "embedded-share-multi-test", "embedded-share-distribution-test"}, partitions = 1,
57+
topics = {"embedded-share-test", "embedded-share-distribution-test"}, partitions = 1,
5558
brokerProperties = {
5659
"unstable.api.versions.enable=true",
5760
"group.coordinator.rebalance.protocols=classic,share",
@@ -160,21 +163,7 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) thro
160163
producer.send(new ProducerRecord<>(topic, "key", "integration-test-value")).get();
161164
}
162165

163-
Map<String, Object> adminProperties = new HashMap<>();
164-
adminProperties.put("bootstrap.servers", bootstrapServers);
165-
166-
// For this test: force new share groups to start from the beginning of the topic.
167-
// This is NOT the same as the usual consumer auto.offset.reset; it's a group config,
168-
// so use AdminClient to set share.auto.offset.reset = earliest for our test group.
169-
ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
170-
AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
171-
172-
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
173-
new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op));
174-
175-
try (Admin admin = AdminClient.create(adminProperties)) {
176-
admin.incrementalAlterConfigs(configs).all().get();
177-
}
166+
setShareAutoOffsetResetEarliest(bootstrapServers, groupId);
178167

179168
var consumerProps = new HashMap<String, Object>();
180169
consumerProps.put("bootstrap.servers", bootstrapServers);
@@ -196,41 +185,15 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) thro
196185
consumer.close();
197186
}
198187

199-
@Test
200-
void integrationTestMultipleSharedConsumers(EmbeddedKafkaBroker broker) throws Exception {
201-
final String topic = "embedded-share-multi-test";
202-
final String groupId = "multiTestGroup";
203-
int recordCount = 4;
204-
List<String> consumerIds = List.of("client-1", "client-2");
205-
Map<String, Set<String>> consumerRecords = runSharedConsumerTest(topic, groupId, consumerIds, recordCount, broker);
206-
207-
Set<String> allReceived = new java.util.HashSet<>();
208-
for (Set<String> records : consumerRecords.values()) {
209-
allReceived.addAll(records);
210-
}
211-
for (int i = 0; i < recordCount; i++) {
212-
assertThat(allReceived)
213-
.as("Should have received value " + topic + "-value-" + i)
214-
.contains(topic + "-value-" + i);
215-
}
216-
assertThat(allReceived.size()).isEqualTo(recordCount);
217-
}
218-
219188
@Test
220189
void integrationTestSharedConsumersDistribution(EmbeddedKafkaBroker broker) throws Exception {
221190
final String topic = "embedded-share-distribution-test";
222191
final String groupId = "distributionTestGroup";
223192
int recordCount = 8;
224193
List<String> consumerIds = List.of("client-dist-1", "client-dist-2");
225-
Map<String, Set<String>> consumerRecords = runSharedConsumerTest(topic, groupId, consumerIds, recordCount, broker);
226-
227-
// Assert each consumer received at least one record
228-
for (String id : consumerIds) {
229-
Set<String> records = consumerRecords.get(id);
230-
assertThat(records)
231-
.as("Consumer %s should have received at least one record", id)
232-
.isNotEmpty();
233-
}
194+
Map<String, Set<String>> consumerRecords = runSharedConsumerTest(topic, groupId, consumerIds,
195+
recordCount, broker);
196+
234197
// Assert all records were received (no loss)
235198
Set<String> allReceived = new java.util.HashSet<>();
236199
consumerRecords.values().forEach(allReceived::addAll);
@@ -239,78 +202,95 @@ void integrationTestSharedConsumersDistribution(EmbeddedKafkaBroker broker) thro
239202
.as("Should have received value " + topic + "-value-" + i)
240203
.contains(topic + "-value-" + i);
241204
}
242-
assertThat(allReceived.size()).isEqualTo(recordCount);
243205
}
244206

245-
private static Map<String, Set<String>> runSharedConsumerTest(String topic, String groupId, List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception {
207+
/**
208+
* Runs multiple Kafka consumers in parallel using ExecutorService, collects all records received,
209+
* and returns a map of consumerId to the set of record values received by that consumer.
210+
*/
211+
private static Map<String, Set<String>> runSharedConsumerTest(String topic, String groupId,
212+
List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception {
246213
var bootstrapServers = broker.getBrokersAsString();
247214

248215
var producerProps = new java.util.Properties();
249216
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
250217
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
251218
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
252-
253219
try (var producer = new KafkaProducer<String, String>(producerProps)) {
254220
for (int i = 0; i < recordCount; i++) {
255221
producer.send(new ProducerRecord<>(topic, "key" + i, topic + "-value-" + i)).get();
256222
}
223+
producer.flush();
257224
}
258225

259-
Map<String, Object> adminProperties = Map.of("bootstrap.servers", bootstrapServers);
260-
ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
261-
AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
262-
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
263-
new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op));
264-
try (Admin admin = AdminClient.create(adminProperties)) {
265-
admin.incrementalAlterConfigs(configs).all().get();
266-
}
267-
268-
var consumerProps = new HashMap<String, Object>();
269-
consumerProps.put("bootstrap.servers", bootstrapServers);
270-
consumerProps.put("key.deserializer", StringDeserializer.class);
271-
consumerProps.put("value.deserializer", StringDeserializer.class);
272-
consumerProps.put("group.id", groupId);
273-
274-
DefaultShareConsumerFactory<String, String> factory = new DefaultShareConsumerFactory<>(consumerProps);
275-
var consumers = consumerIds.stream()
276-
.map(id -> factory.createShareConsumer(groupId, id))
277-
.toList();
278-
consumers.forEach(c -> c.subscribe(Collections.singletonList(topic)));
226+
setShareAutoOffsetResetEarliest(bootstrapServers, groupId);
279227

280228
Map<String, Set<String>> consumerRecords = new java.util.concurrent.ConcurrentHashMap<>();
281-
consumerIds.forEach(id -> consumerRecords.put(id, java.util.Collections.synchronizedSet(new java.util.HashSet<>())));
229+
consumerIds.forEach(id -> consumerRecords.put(id,
230+
java.util.Collections.synchronizedSet(new java.util.HashSet<>())));
282231
var latch = new java.util.concurrent.CountDownLatch(recordCount);
283232
var running = new java.util.concurrent.atomic.AtomicBoolean(true);
284-
List<Thread> threads = new java.util.ArrayList<>();
233+
ExecutorService executor = Executors.newCachedThreadPool();
234+
List<Future<?>> futures = new java.util.ArrayList<>();
285235

286-
for (int i = 0; i < consumers.size(); i++) {
236+
// Consumer task: poll, acknowledge, and count down latch for new records
237+
for (int i = 0; i < consumerIds.size(); i++) {
287238
final int idx = i;
288-
Thread t = new Thread(() -> {
289-
try (var consumer = consumers.get(idx)) {
290-
var id = consumerIds.get(idx);
239+
Future<?> future = executor.submit(() -> {
240+
DefaultShareConsumerFactory<String, String> shareConsumerFactory = new DefaultShareConsumerFactory<>(
241+
Map.of(
242+
"bootstrap.servers", bootstrapServers,
243+
"key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,
244+
"value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class
245+
));
246+
try (var consumer = shareConsumerFactory
247+
.createShareConsumer(groupId, consumerIds.get(idx))) {
248+
consumer.subscribe(Collections.singletonList(topic));
291249
while (running.get() && latch.getCount() > 0) {
292250
var records = consumer.poll(Duration.ofMillis(200));
293251
for (var r : records) {
294-
if (consumerRecords.get(id).add(r.value())) {
252+
if (consumerRecords.get(consumerIds.get(idx)).add(r.value())) {
253+
consumer.acknowledge(r, AcknowledgeType.ACCEPT);
295254
latch.countDown();
296255
}
297256
}
298257
}
299258
}
300259
});
301-
threads.add(t);
302-
t.start();
260+
futures.add(future);
303261
}
304262

305-
boolean completed = latch.await(5, java.util.concurrent.TimeUnit.SECONDS);
263+
boolean completed = latch.await(10, java.util.concurrent.TimeUnit.SECONDS);
306264
running.set(false);
307-
for (Thread t : threads) {
308-
t.join();
309-
}
310-
if (!completed) {
311-
throw new AssertionError("All records should be received within timeout");
265+
for (Future<?> future : futures) {
266+
try {
267+
future.get();
268+
}
269+
catch (Exception e) {
270+
throw new RuntimeException(e);
271+
}
312272
}
273+
executor.shutdown();
274+
assertThat(completed)
275+
.as("All records should be received within timeout")
276+
.isTrue();
313277
return consumerRecords;
314278
}
315279

280+
/**
281+
* Sets the share.auto.offset.reset group config to earliest for the given groupId,
282+
* using the provided bootstrapServers.
283+
*/
284+
private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception {
285+
Map<String, Object> adminProperties = new HashMap<>();
286+
adminProperties.put("bootstrap.servers", bootstrapServers);
287+
ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
288+
AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
289+
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
290+
new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op));
291+
try (Admin admin = AdminClient.create(adminProperties)) {
292+
admin.incrementalAlterConfigs(configs).all().get();
293+
}
294+
}
295+
316296
}

0 commit comments

Comments
 (0)