@@ -250,16 +250,13 @@ private AsyncKafkaConsumer<String, String> newConsumer(
250
250
FetchBuffer fetchBuffer ,
251
251
ConsumerInterceptors <String , String > interceptors ,
252
252
ConsumerRebalanceListenerInvoker rebalanceListenerInvoker ,
253
- SubscriptionState subscriptions ,
254
- String groupId ,
255
- String clientId ,
256
- boolean autoCommitEnabled ) {
253
+ SubscriptionState subscriptions ) {
257
254
long retryBackoffMs = 100L ;
258
255
int requestTimeoutMs = 30000 ;
259
256
int defaultApiTimeoutMs = 1000 ;
260
257
return new AsyncKafkaConsumer <>(
261
258
new LogContext (),
262
- clientId ,
259
+ "client-id" ,
263
260
new Deserializers <>(new StringDeserializer (), new StringDeserializer (), metrics ),
264
261
fetchBuffer ,
265
262
fetchCollector ,
@@ -275,8 +272,8 @@ private AsyncKafkaConsumer<String, String> newConsumer(
275
272
retryBackoffMs ,
276
273
requestTimeoutMs ,
277
274
defaultApiTimeoutMs ,
278
- groupId ,
279
- autoCommitEnabled );
275
+ "group-id" ,
276
+ false );
280
277
}
281
278
282
279
@ Test
@@ -707,10 +704,7 @@ public void testCloseLeavesGroup(long timeoutMs) {
707
704
mock (FetchBuffer .class ),
708
705
mock (ConsumerInterceptors .class ),
709
706
mock (ConsumerRebalanceListenerInvoker .class ),
710
- subscriptions ,
711
- "group-id" ,
712
- "client-id" ,
713
- false ));
707
+ subscriptions ));
714
708
consumer .close (CloseOptions .timeout (Duration .ofMillis (timeoutMs )));
715
709
verify (applicationEventHandler ).addAndGet (any (LeaveGroupOnCloseEvent .class ));
716
710
}
@@ -730,10 +724,7 @@ public void testCloseLeavesGroupDespiteOnPartitionsLostError() {
730
724
mock (FetchBuffer .class ),
731
725
new ConsumerInterceptors <>(Collections .emptyList (), metrics ),
732
726
invoker ,
733
- subscriptions ,
734
- "group-id" ,
735
- "client-id" ,
736
- false ));
727
+ subscriptions ));
737
728
consumer .setGroupAssignmentSnapshot (partitions );
738
729
739
730
Throwable t = assertThrows (KafkaException .class , () -> consumer .close (CloseOptions .timeout (Duration .ZERO )));
@@ -754,10 +745,7 @@ public void testCloseLeavesGroupDespiteInterrupt(long timeoutMs) {
754
745
mock (FetchBuffer .class ),
755
746
mock (ConsumerInterceptors .class ),
756
747
mock (ConsumerRebalanceListenerInvoker .class ),
757
- subscriptions ,
758
- "group-id" ,
759
- "client-id" ,
760
- false ));
748
+ subscriptions ));
761
749
762
750
Duration timeout = Duration .ofMillis (timeoutMs );
763
751
@@ -778,10 +766,7 @@ public void testCommitSyncAllConsumed() {
778
766
mock (FetchBuffer .class ),
779
767
mock (ConsumerInterceptors .class ),
780
768
mock (ConsumerRebalanceListenerInvoker .class ),
781
- subscriptions ,
782
- "group-id" ,
783
- "client-id" ,
784
- false );
769
+ subscriptions );
785
770
completeTopicSubscriptionChangeEventSuccessfully ();
786
771
consumer .subscribe (singleton ("topic" ), mock (ConsumerRebalanceListener .class ));
787
772
subscriptions .assignFromSubscribed (singleton (new TopicPartition ("topic" , 0 )));
@@ -803,10 +788,7 @@ public void testAutoCommitSyncDisabled() {
803
788
mock (FetchBuffer .class ),
804
789
mock (ConsumerInterceptors .class ),
805
790
mock (ConsumerRebalanceListenerInvoker .class ),
806
- subscriptions ,
807
- "group-id" ,
808
- "client-id" ,
809
- false );
791
+ subscriptions );
810
792
completeTopicSubscriptionChangeEventSuccessfully ();
811
793
consumer .subscribe (singleton ("topic" ), mock (ConsumerRebalanceListener .class ));
812
794
subscriptions .assignFromSubscribed (singleton (new TopicPartition ("topic" , 0 )));
@@ -1651,10 +1633,7 @@ public void testEnsurePollEventSentOnConsumerPoll() {
1651
1633
mock (FetchBuffer .class ),
1652
1634
new ConsumerInterceptors <>(Collections .emptyList (), metrics ),
1653
1635
mock (ConsumerRebalanceListenerInvoker .class ),
1654
- subscriptions ,
1655
- "group-id" ,
1656
- "client-id" ,
1657
- false );
1636
+ subscriptions );
1658
1637
final TopicPartition tp = new TopicPartition ("topic" , 0 );
1659
1638
final List <ConsumerRecord <String , String >> records = singletonList (
1660
1639
new ConsumerRecord <>("topic" , 0 , 2 , "key1" , "value1" ));
@@ -2034,10 +2013,7 @@ public void testRecordBackgroundEventQueueSizeAndBackgroundEventQueueTime() {
2034
2013
mock (FetchBuffer .class ),
2035
2014
mock (ConsumerInterceptors .class ),
2036
2015
mock (ConsumerRebalanceListenerInvoker .class ),
2037
- mock (SubscriptionState .class ),
2038
- "group-id" ,
2039
- "client-id" ,
2040
- false );
2016
+ mock (SubscriptionState .class ));
2041
2017
Metrics metrics = consumer .metricsRegistry ();
2042
2018
AsyncConsumerMetrics kafkaConsumerMetrics = consumer .kafkaConsumerMetrics ();
2043
2019
0 commit comments