1111import java .util .Properties ;
1212import java .util .UUID ;
1313import java .util .stream .Stream ;
14+ import io .kafbat .ui .producer .KafkaTestProducer ;
1415import lombok .extern .slf4j .Slf4j ;
1516import lombok .val ;
1617import org .apache .commons .lang3 .RandomStringUtils ;
2223import org .junit .jupiter .api .Test ;
2324import org .springframework .beans .factory .annotation .Autowired ;
2425import org .springframework .test .web .reactive .server .WebTestClient ;
26+ import reactor .core .publisher .Flux ;
27+ import reactor .core .publisher .Mono ;
2528
2629@ Slf4j
2730public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
@@ -31,12 +34,76 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
3134 @ Test
3235 void shouldNotFoundWhenNoSuchConsumerGroupId () {
3336 String groupId = "groupA" ;
37+ String topicName = "topicX" ;
38+
3439 webTestClient
3540 .delete ()
3641 .uri ("/api/clusters/{clusterName}/consumer-groups/{groupId}" , LOCAL , groupId )
3742 .exchange ()
3843 .expectStatus ()
3944 .isNotFound ();
45+
46+ webTestClient
47+ .delete ()
48+ .uri ("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}" , LOCAL , groupId , topicName )
49+ .exchange ()
50+ .expectStatus ()
51+ .isNotFound ();
52+ }
53+
54+ @ Test
55+ void shouldNotFoundWhenNoSuchTopic () {
56+ String topicName = createTopicWithRandomName ();
57+ String topicNameUnSubscribed = "topicX" ;
58+
59+ //Create a consumer and subscribe to the topic
60+ String groupId = UUID .randomUUID ().toString ();
61+ try (val consumer = createTestConsumerWithGroupId (groupId )) {
62+ consumer .subscribe (List .of (topicName ));
63+ consumer .poll (Duration .ofMillis (100 ));
64+
65+ webTestClient
66+ .delete ()
67+ .uri ("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}" , LOCAL , groupId ,
68+ topicNameUnSubscribed )
69+ .exchange ()
70+ .expectStatus ()
71+ .isNotFound ();
72+ }
73+ }
74+
75+ @ Test
76+ void shouldOkWhenConsumerGroupIsNotActiveAndPartitionOffsetExists () {
77+ String topicName = createTopicWithRandomName ();
78+
79+ //Create a consumer and subscribe to the topic
80+ String groupId = UUID .randomUUID ().toString ();
81+
82+ try (KafkaTestProducer <String , String > producer = KafkaTestProducer .forKafka (kafka )) {
83+ Flux .fromStream (
84+ Stream .of ("one" , "two" , "three" , "four" )
85+ .map (value -> Mono .fromFuture (producer .send (topicName , value )))
86+ ).blockLast ();
87+ } catch (Throwable e ) {
88+ log .error ("Error on sending" , e );
89+ throw new RuntimeException (e );
90+ }
91+
92+ try (val consumer = createTestConsumerWithGroupId (groupId )) {
93+ consumer .subscribe (List .of (topicName ));
94+ consumer .poll (Duration .ofMillis (100 ));
95+
96+ //Stop consumers to delete consumer offset from the topic
97+ consumer .pause (consumer .assignment ());
98+ }
99+
100+ //Delete the consumer offset when it's INACTIVE and check
101+ webTestClient
102+ .delete ()
103+ .uri ("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}" , LOCAL , groupId , topicName )
104+ .exchange ()
105+ .expectStatus ()
106+ .isOk ();
40107 }
41108
42109 @ Test
0 commit comments