import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
4951 * @since 4.0
5052 */
5153@ EmbeddedKafka (
52- topics = {"embedded-share-test" }, partitions = 1 ,
54+ topics = {"embedded-share-test" , "embedded-share-multi-test" , "embedded-share-distribution-test" }, partitions = 1 ,
5355 brokerProperties = {
5456 "unstable.api.versions.enable=true" ,
5557 "group.coordinator.rebalance.protocols=classic,share" ,
@@ -144,7 +146,6 @@ void shouldReturnUnmodifiableListenersList() {
	}

	@Test
-	@SuppressWarnings("try")
	void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
		final String topic = "embedded-share-test";
		final String groupId = "testGroup";
@@ -165,16 +166,14 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) thro
		// For this test: force new share groups to start from the beginning of the topic.
		// This is NOT the same as the usual consumer auto.offset.reset; it's a group config,
		// so use AdminClient to set share.auto.offset.reset = earliest for our test group.
-		try (AdminClient ignored = AdminClient.create(adminProperties)) {
-			ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
-			AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
+		ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
+		AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);

-			Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
-					new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op));
+		Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
+				new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op));

-			try (Admin admin = AdminClient.create(adminProperties)) {
-				admin.incrementalAlterConfigs(configs).all().get();
-			}
+		try (Admin admin = AdminClient.create(adminProperties)) {
+			admin.incrementalAlterConfigs(configs).all().get();
		}

		var consumerProps = new HashMap<String, Object>();
@@ -197,4 +196,121 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) thro
		consumer.close();
	}

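+	// Two consumers in one share group should collectively receive every produced record, with nothing lost or duplicated.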
+	@Test
+	void integrationTestMultipleSharedConsumers(EmbeddedKafkaBroker broker) throws Exception {
+		final String topic = "embedded-share-multi-test";
+		final String groupId = "multiTestGroup";
+		int recordCount = 4;
+		List<String> consumerIds = List.of("client-1", "client-2");
+		Map<String, Set<String>> consumerRecords = runSharedConsumerTest(topic, groupId, consumerIds, recordCount, broker);
+
+		Set<String> allReceived = new java.util.HashSet<>();
+		for (Set<String> records : consumerRecords.values()) {
+			allReceived.addAll(records);
+		}
+		for (int i = 0; i < recordCount; i++) {
+			assertThat(allReceived)
+					.as("Should have received value " + topic + "-value-" + i)
+					.contains(topic + "-value-" + i);
+		}
+		assertThat(allReceived).hasSize(recordCount);
+	}
+
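+	// With more records than consumers, the share group should give every consumer at least one record while still delivering all of them.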
+	@Test
+	void integrationTestSharedConsumersDistribution(EmbeddedKafkaBroker broker) throws Exception {
+		final String topic = "embedded-share-distribution-test";
+		final String groupId = "distributionTestGroup";
+		int recordCount = 8;
+		List<String> consumerIds = List.of("client-dist-1", "client-dist-2");
+		Map<String, Set<String>> consumerRecords = runSharedConsumerTest(topic, groupId, consumerIds, recordCount, broker);
+
+		// Assert each consumer received at least one record
+		for (String id : consumerIds) {
+			Set<String> records = consumerRecords.get(id);
+			assertThat(records)
+					.as("Consumer %s should have received at least one record", id)
+					.isNotEmpty();
+		}
+		// Assert all records were received (no loss)
+		Set<String> allReceived = new java.util.HashSet<>();
+		consumerRecords.values().forEach(allReceived::addAll);
+		for (int i = 0; i < recordCount; i++) {
+			assertThat(allReceived)
+					.as("Should have received value " + topic + "-value-" + i)
+					.contains(topic + "-value-" + i);
+		}
+		assertThat(allReceived).hasSize(recordCount);
+	}
+
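+	// Produces recordCount distinct values to the topic, consumes them with one share consumer per id in the
+	// given share group, and returns the set of values each consumer received.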
+	private static Map<String, Set<String>> runSharedConsumerTest(String topic, String groupId, List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception {
+		var bootstrapServers = broker.getBrokersAsString();
+
+		var producerProps = new java.util.Properties();
+		producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+		producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+		producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
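+		// Send recordCount records with distinct values so delivery can be tracked per value.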
+		try (var producer = new KafkaProducer<String, String>(producerProps)) {
+			for (int i = 0; i < recordCount; i++) {
+				producer.send(new ProducerRecord<>(topic, "key" + i, topic + "-value-" + i)).get();
+			}
+		}
+
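+		// As in the test above, share.auto.offset.reset is a group config, not a consumer property,
+		// so set it through the Admin client so the new share group starts from the earliest offset.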
+		Map<String, Object> adminProperties = Map.of("bootstrap.servers", bootstrapServers);
+		ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
+		AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
+		Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
+				new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op));
+		try (Admin admin = AdminClient.create(adminProperties)) {
+			admin.incrementalAlterConfigs(configs).all().get();
+		}
+
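+		// All consumers share one group id; the broker spreads records across the group's members.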
+		var consumerProps = new HashMap<String, Object>();
+		consumerProps.put("bootstrap.servers", bootstrapServers);
+		consumerProps.put("key.deserializer", StringDeserializer.class);
+		consumerProps.put("value.deserializer", StringDeserializer.class);
+		consumerProps.put("group.id", groupId);
+
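+		// One share consumer per client id, all created from the same factory.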
+		DefaultShareConsumerFactory<String, String> factory = new DefaultShareConsumerFactory<>(consumerProps);
+		var consumers = consumerIds.stream()
+				.map(id -> factory.createShareConsumer(groupId, id))
+				.toList();
+		consumers.forEach(c -> c.subscribe(Collections.singletonList(topic)));
+
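+		// Record the values seen by each consumer; the latch opens once recordCount distinct values have arrived.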
+		Map<String, Set<String>> consumerRecords = new java.util.concurrent.ConcurrentHashMap<>();
+		consumerIds.forEach(id -> consumerRecords.put(id, Collections.synchronizedSet(new java.util.HashSet<>())));
+		var latch = new java.util.concurrent.CountDownLatch(recordCount);
+		var running = new java.util.concurrent.atomic.AtomicBoolean(true);
+		List<Thread> threads = new java.util.ArrayList<>();
+
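+		// Start one polling thread per consumer; all threads stop once every value has been seen.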
+		for (int i = 0; i < consumers.size(); i++) {
+			final int idx = i;
+			Thread t = new Thread(() -> {
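+				// try-with-resources closes this thread's consumer when polling ends.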
+				try (var consumer = consumers.get(idx)) {
+					var id = consumerIds.get(idx);
+					while (running.get() && latch.getCount() > 0) {
+						var records = consumer.poll(Duration.ofMillis(200));
+						for (var r : records) {
+							if (consumerRecords.get(id).add(r.value())) {
+								latch.countDown();
+							}
+						}
+					}
+				}
+			});
+			threads.add(t);
+			t.start();
+		}
+
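+		// Wait for every record, then stop and join the pollers before asserting.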
+		boolean completed = latch.await(5, java.util.concurrent.TimeUnit.SECONDS);
+		running.set(false);
+		for (Thread t : threads) {
+			t.join();
+		}
+		if (!completed) {
+			throw new AssertionError("All records should be received within timeout");
+		}
+		return consumerRecords;
+	}
+
}