2929import org .apache .kafka .clients .admin .ListOffsetsResult ;
3030import org .apache .kafka .clients .admin .ListOffsetsResult .ListOffsetsResultInfo ;
3131import org .apache .kafka .clients .admin .ListShareGroupOffsetsResult ;
32+ import org .apache .kafka .clients .admin .ListStreamsGroupOffsetsResult ;
3233import org .apache .kafka .clients .admin .OffsetSpec ;
3334import org .apache .kafka .clients .admin .SharePartitionOffsetInfo ;
3435import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
5354import org .skyscreamer .jsonassert .JSONAssert ;
5455import org .skyscreamer .jsonassert .JSONCompareMode ;
5556
57+ import com .github .streamshub .console .api .model .Group ;
5658import com .github .streamshub .console .api .support .Holder ;
5759import com .github .streamshub .console .api .support .Promises ;
5860import com .github .streamshub .console .config .ConsoleConfig ;
61+ import com .github .streamshub .console .config .security .GlobalSecurityConfigBuilder ;
62+ import com .github .streamshub .console .config .security .KafkaSecurityConfigBuilder ;
63+ import com .github .streamshub .console .config .security .Privilege ;
5964import com .github .streamshub .console .kafka .systemtest .TestPlainProfile ;
6065import com .github .streamshub .console .kafka .systemtest .utils .ConsumerUtils ;
6166import com .github .streamshub .console .kafka .systemtest .utils .ConsumerUtils .ConsumerType ;
67+ import com .github .streamshub .console .kafka .systemtest .utils .TokenUtils ;
6268import com .github .streamshub .console .support .Identifiers ;
6369import com .github .streamshub .console .test .AdminClientSpy ;
6470import com .github .streamshub .console .test .TestHelper ;
7177import io .quarkus .test .junit .TestProfile ;
7278import io .strimzi .api .kafka .model .kafka .Kafka ;
7379
80+ import static com .github .streamshub .console .test .EveryEntry .everyEntry ;
7481import static com .github .streamshub .console .test .TestHelper .whenRequesting ;
7582import static java .util .regex .Pattern .compile ;
7683import static org .awaitility .Awaitility .await ;
7784import static org .hamcrest .Matchers .allOf ;
85+ import static org .hamcrest .Matchers .anEmptyMap ;
7886import static org .hamcrest .Matchers .contains ;
7987import static org .hamcrest .Matchers .containsInAnyOrder ;
8088import static org .hamcrest .Matchers .everyItem ;
8492import static org .hamcrest .Matchers .hasSize ;
8593import static org .hamcrest .Matchers .is ;
8694import static org .hamcrest .Matchers .matchesPattern ;
95+ import static org .hamcrest .Matchers .not ;
8796import static org .hamcrest .Matchers .notNullValue ;
8897import static org .hamcrest .Matchers .nullValue ;
8998import static org .hamcrest .Matchers .startsWith ;
92101import static org .mockito .ArgumentMatchers .anyCollection ;
93102import static org .mockito .ArgumentMatchers .anyMap ;
94103import static org .mockito .Mockito .doAnswer ;
104+ import static org .mockito .Mockito .when ;
95105
96106@ QuarkusTest
97107@ TestHTTPEndpoint (GroupsResource .class )
@@ -451,7 +461,7 @@ void testListConsumerGroupsWithDescribeError() {
451461 "SHARE, " , // startOffset for share groups is not reliable so we just check for a numeric value
452462 "STREAMS, 5"
453463 })
454- void testDescribeGroupDefault (ConsumerType consumerType , Integer expectedOffset ) {
464+ void testDescribeGroupExtendedFields (ConsumerType consumerType , Integer expectedOffset ) {
455465 String topic1 = "t1-" + consumerType .name () + "-" + UUID .randomUUID ().toString ();
456466 String group1 = "g1-" + consumerType .name () + "-" + UUID .randomUUID ().toString ();
457467
@@ -467,7 +477,9 @@ void testDescribeGroupDefault(ConsumerType consumerType, Integer expectedOffset)
467477 .autoClose (false )
468478 .consume ()) {
469479
470- whenRequesting (req -> req .get ("{groupId}" , clusterId1 , Identifiers .encode (group1 )))
480+ whenRequesting (req -> req
481+ .param ("fields[groups]" , Group .Fields .DESCRIBE_DEFAULT + "," + Group .Fields .CONFIGS )
482+ .get ("{groupId}" , clusterId1 , Identifiers .encode (group1 )))
471483 .assertThat ()
472484 .statusCode (is (Status .OK .getStatusCode ()))
473485 .body ("data.attributes.groupId" , is (group1 ))
@@ -485,7 +497,17 @@ void testDescribeGroupDefault(ConsumerType consumerType, Integer expectedOffset)
485497 .body ("data.attributes.offsets[0].offset" , offsetMatcher )
486498 .body ("data.attributes.offsets[1].topicName" , is (topic1 ))
487499 .body ("data.attributes.offsets[1].partition" , is (1 ))
488- .body ("data.attributes.offsets[1].offset" , offsetMatcher );
500+ .body ("data.attributes.offsets[1].offset" , offsetMatcher )
501+ .body ("data.attributes.configs" , not (anEmptyMap ()))
502+ .body ("data.attributes.configs" , everyEntry (
503+ Matchers .any (String .class ), // any string key
504+ // all entries have same keys
505+ allOf (
506+ hasKey ("value" ),
507+ hasKey ("source" ),
508+ hasKey ("sensitive" ),
509+ hasKey ("readOnly" ),
510+ hasKey ("type" ))));
489511 }
490512 }
491513
@@ -585,6 +607,42 @@ void testDescribeShareGroupWithFetchGroupOffsetsError() throws Exception {
585607 }
586608 }
587609
610+ @ Test
611+ void testDescribeStreamsGroupWithFetchTopicOffsetsError () throws Exception {
612+ String topic1 = "t1-" + UUID .randomUUID ().toString ();
613+ String group1 = "g1-" + UUID .randomUUID ().toString ();
614+ String group1Id = Identifiers .encode (group1 );
615+ String client1 = "c1-" + UUID .randomUUID ().toString ();
616+
617+ Answer <ListStreamsGroupOffsetsResult > listOffsetsFailed = args -> {
618+ KafkaFutureImpl <Map <TopicPartition , OffsetAndMetadata >> failure = new KafkaFutureImpl <>();
619+ failure .completeExceptionally (new ApiException ("EXPECTED TEST EXCEPTION" ));
620+
621+ ListStreamsGroupOffsetsResult result = Mockito .mock (ListStreamsGroupOffsetsResult .class );
622+ when (result .partitionsToOffsetAndMetadata (group1 )).thenReturn (failure );
623+ return result ;
624+ };
625+
626+ AdminClientSpy .install (adminClient -> {
627+ // Mock listOffsets
628+ doAnswer (listOffsetsFailed )
629+ .when (adminClient )
630+ .listStreamsGroupOffsets (anyMap ());
631+ });
632+
633+ try (var consumer = groupUtils .consume (ConsumerType .STREAMS , group1 , topic1 , client1 , 2 , false )) {
634+ whenRequesting (req -> req
635+ .param ("fields[groups]" , "offsets" )
636+ .get ("{groupId}" , clusterId1 , group1Id ))
637+ .assertThat ()
638+ .statusCode (is (Status .OK .getStatusCode ()))
639+ .body ("data.id" , is (group1Id ))
640+ .body ("data.meta.errors" , hasSize (1 ))
641+ .body ("data.meta.errors.title" , everyItem (startsWith ("Unable to list group offsets" )))
642+ .body ("data.meta.errors.detail" , everyItem (is ("EXPECTED TEST EXCEPTION" )));
643+ }
644+ }
645+
588646 @ Test
589647 void testDescribeConsumerGroupWithFetchTopicOffsetsError () {
590648 Answer <ListOffsetsResult > listOffsetsFailed = args -> {
@@ -623,6 +681,82 @@ void testDescribeConsumerGroupWithFetchTopicOffsetsError() {
623681 }
624682 }
625683
684+ @ Test
685+ void testDescribeGroupConfigsWithLimitedAuthorization () {
686+ utils .resetSecurity (consoleConfig , true );
687+ TokenUtils tokens = new TokenUtils (config );
688+
689+ utils .updateSecurity (consoleConfig .getSecurity (), new GlobalSecurityConfigBuilder ()
690+ .addNewSubject ()
691+ .withInclude ("alice" )
692+ .withRoleNames ("limited-group-configs-role" )
693+ .endSubject ()
694+ .build ());
695+
696+ String topic1 = "t1-" + UUID .randomUUID ().toString ();
697+ String group1 = "g1-" + UUID .randomUUID ().toString ();
698+ String client1 = "c1-" + UUID .randomUUID ().toString ();
699+
700+ String topic2 = "t2-" + UUID .randomUUID ().toString ();
701+ String group2 = "g2-" + UUID .randomUUID ().toString ();
702+ String client2 = "c2-" + UUID .randomUUID ().toString ();
703+
704+ consoleConfig .getKafka ().getClusterById (clusterId1 ).ifPresent (cfg -> {
705+ cfg .setSecurity (new KafkaSecurityConfigBuilder ()
706+ .addNewRole ()
707+ .withName ("limited-group-configs-role" )
708+ .addNewRule ()
709+ .withResources ("groups" )
710+ .withResourceNames ("*" )
711+ .withPrivileges (Privilege .GET )
712+ .endRule ()
713+ .addNewRule ()
714+ .withResources ("groups/configs" )
715+ .withResourceNames (group1 )
716+ .withPrivileges (Privilege .GET )
717+ .endRule ()
718+ // access to group2 configs is not granted
719+ .endRole ()
720+ .build ());
721+ });
722+
723+ try (var consumer1 = groupUtils .consume (group1 , topic1 , client1 , 2 , false );
724+ var consumer2 = groupUtils .consume (group2 , topic2 , client2 , 2 , false )) {
725+ whenRequesting (req -> req
726+ .auth ()
727+ .oauth2 (tokens .getToken ("alice" ))
728+ .param ("fields[groups]" , "groupId,configs" )
729+ .get ("{groupId}" , clusterId1 , Identifiers .encode (group1 )))
730+ .assertThat ()
731+ .statusCode (is (Status .OK .getStatusCode ()))
732+ .body ("data.attributes.groupId" , is (group1 ))
733+ .body ("data.attributes.configs" , not (anEmptyMap ()))
734+ .body ("data.attributes.configs" , everyEntry (
735+ Matchers .any (String .class ), // any string key
736+ // all entries have same keys
737+ allOf (
738+ hasKey ("value" ),
739+ hasKey ("source" ),
740+ hasKey ("sensitive" ),
741+ hasKey ("readOnly" ),
742+ hasKey ("type" ))));
743+
744+ whenRequesting (req -> req
745+ .auth ()
746+ .oauth2 (tokens .getToken ("alice" ))
747+ .param ("fields[groups]" , "groupId,configs" )
748+ .get ("{groupId}" , clusterId1 , Identifiers .encode (group2 )))
749+ .assertThat ()
750+ .statusCode (is (Status .OK .getStatusCode ()))
751+ .body ("data.attributes.groupId" , is (group2 ))
752+ .body ("data.attributes.configs" , not (anEmptyMap ()))
753+ .body ("data.attributes.configs" , hasEntry (is ("meta" ), hasEntry ("type" , "error" )))
754+ .body ("data.attributes.configs" , allOf (
755+ hasEntry ("title" , "Unable to describe group configs" ),
756+ hasEntry (is ("detail" ), startsWith ("Access denied:" ))));
757+ }
758+ }
759+
626760 @ Test
627761 void testDeleteConsumerGroupWithMembers () {
628762 String topic1 = "t1-" + UUID .randomUUID ().toString ();
0 commit comments