1010import java .util .List ;
1111import java .util .Properties ;
1212import java .util .UUID ;
13- import java .util .stream .Collectors ;
1413import java .util .stream .Stream ;
1514import lombok .extern .slf4j .Slf4j ;
1615import lombok .val ;
@@ -32,7 +31,6 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
3231 @ Test
3332 void shouldNotFoundWhenNoSuchConsumerGroupId () {
3433 String groupId = "groupA" ;
35- String expError = "The group id does not exist" ;
3634 webTestClient
3735 .delete ()
3836 .uri ("/api/clusters/{clusterName}/consumer-groups/{groupId}" , LOCAL , groupId )
@@ -47,12 +45,13 @@ void shouldOkWhenConsumerGroupIsNotActive() {
4745
4846 //Create a consumer and subscribe to the topic
4947 String groupId = UUID .randomUUID ().toString ();
50- val consumer = createTestConsumerWithGroupId (groupId );
51- consumer .subscribe (List .of (topicName ));
52- consumer .poll (Duration .ofMillis (100 ));
48+ try ( val consumer = createTestConsumerWithGroupId (groupId )) {
49+ consumer .subscribe (List .of (topicName ));
50+ consumer .poll (Duration .ofMillis (100 ));
5351
54- //Unsubscribe from all topics to be able to delete this consumer
55- consumer .unsubscribe ();
52+ //Unsubscribe from all topics to be able to delete this consumer
53+ consumer .unsubscribe ();
54+ }
5655
5756 //Delete the consumer when it's INACTIVE and check
5857 webTestClient
@@ -69,24 +68,24 @@ void shouldBeBadRequestWhenConsumerGroupIsActive() {
6968
7069 //Create a consumer and subscribe to the topic
7170 String groupId = UUID .randomUUID ().toString ();
72- val consumer = createTestConsumerWithGroupId (groupId );
73- consumer .subscribe (List .of (topicName ));
74- consumer .poll (Duration .ofMillis (100 ));
71+ try ( val consumer = createTestConsumerWithGroupId (groupId )) {
72+ consumer .subscribe (List .of (topicName ));
73+ consumer .poll (Duration .ofMillis (100 ));
7574
76- //Try to delete the consumer when it's ACTIVE
77- String expError = "The group is not empty" ;
78- webTestClient
79- . delete ( )
80- . uri ( "/api/clusters/{clusterName}/consumer-groups/{groupId}" , LOCAL , groupId )
81- . exchange ()
82- . expectStatus ()
83- . isBadRequest ();
75+ //Try to delete the consumer when it's ACTIVE
76+ webTestClient
77+ . delete ()
78+ . uri ( "/api/clusters/{clusterName}/consumer-groups/{groupId}" , LOCAL , groupId )
79+ . exchange ( )
80+ . expectStatus ()
81+ . isBadRequest ();
82+ }
8483 }
8584
8685 @ Test
8786 void shouldReturnConsumerGroupsWithPagination () throws Exception {
88- try (var groups1 = startConsumerGroups (3 , "cgPageTest1" );
89- var groups2 = startConsumerGroups (2 , "cgPageTest2" )) {
87+ try (var ignored = startConsumerGroups (3 , "cgPageTest1" );
88+ var ignored1 = startConsumerGroups (2 , "cgPageTest2" )) {
9089 webTestClient
9190 .get ()
9291 .uri ("/api/clusters/{clusterName}/consumer-groups/paged?perPage=3&search=cgPageTest" , LOCAL )
@@ -114,19 +113,19 @@ void shouldReturnConsumerGroupsWithPagination() throws Exception {
114113 });
115114
116115 webTestClient
117- .get ()
118- .uri ("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
119- + "=cgPageTest&orderBy=NAME&sortOrder=DESC" , LOCAL )
120- .exchange ()
121- .expectStatus ()
122- .isOk ()
123- .expectBody (ConsumerGroupsPageResponseDTO .class )
124- .value (page -> {
125- assertThat (page .getPageCount ()).isEqualTo (1 );
126- assertThat (page .getConsumerGroups ().size ()).isEqualTo (5 );
127- assertThat (page .getConsumerGroups ())
128- .isSortedAccordingTo (Comparator .comparing (ConsumerGroupDTO ::getGroupId ).reversed ());
129- });
116+ .get ()
117+ .uri ("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
118+ + "=cgPageTest&orderBy=NAME&sortOrder=DESC" , LOCAL )
119+ .exchange ()
120+ .expectStatus ()
121+ .isOk ()
122+ .expectBody (ConsumerGroupsPageResponseDTO .class )
123+ .value (page -> {
124+ assertThat (page .getPageCount ()).isEqualTo (1 );
125+ assertThat (page .getConsumerGroups ().size ()).isEqualTo (5 );
126+ assertThat (page .getConsumerGroups ())
127+ .isSortedAccordingTo (Comparator .comparing (ConsumerGroupDTO ::getGroupId ).reversed ());
128+ });
130129
131130 webTestClient
132131 .get ()
@@ -156,7 +155,7 @@ private Closeable startConsumerGroups(int count, String consumerGroupPrefix) {
156155 return consumer ;
157156 })
158157 .limit (count )
159- .collect ( Collectors . toList () );
158+ .toList ();
160159 return () -> {
161160 consumers .forEach (KafkaConsumer ::close );
162161 deleteTopic (topicName );
0 commit comments