Conversation
There was a problem hiding this comment.
Pull request overview
Adds backend support for filtering consumer groups by their Kafka state (e.g., STABLE, EMPTY) in the paged list and CSV export endpoints, aligning with issue #1476.
Changes:
- Extended the Consumer Groups API contract to accept a
statequery parameter (multi-value). - Wired the new
statequery parameter through the controller intoConsumerGroupService. - Added unit and integration tests covering state-based filtering scenarios.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| contract-typespec/api/consumer-groups.tsp | Adds state query param to paged + CSV consumer-group endpoints in the API contract. |
| api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java | Plumbs the new state query param into service calls for paged + CSV endpoints. |
| api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java | Implements state filtering logic for consumer group listings. |
| api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java | Adds unit test validating filtering behavior for single/multi state selections. |
| api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java | Adds integration test validating HTTP query-param filtering by state. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| .map(this::mapToKafkaState) | ||
| .collect(Collectors.toSet()); | ||
| return groups.stream() | ||
| .filter(cg -> kafkaStates.contains(cg.state().orElse(ConsumerGroupState.UNKNOWN))) |
There was a problem hiding this comment.
State filtering relies on ConsumerGroupListing.state() which is optional and may be empty depending on broker/client capabilities. When state() is empty, this code treats it as UNKNOWN, causing valid groups (e.g., STABLE/EMPTY) to be incorrectly excluded whenever a state filter is applied. Consider filtering based on ConsumerGroupDescription.state() (you already fetch descriptions in loadSortedDescriptions) or otherwise ensuring the listing state is populated before filtering.
| .filter(cg -> kafkaStates.contains(cg.state().orElse(ConsumerGroupState.UNKNOWN))) | |
| .filter(cg -> cg.state().map(kafkaStates::contains).orElse(true)) |
| // Create consumers, poll, then close to create EMPTY groups | ||
| try (val consumer1 = createTestConsumerWithGroupId(emptyGroup1)) { | ||
| consumer1.subscribe(List.of(topicName)); | ||
| consumer1.poll(Duration.ofMillis(100)); | ||
| } | ||
| try (val consumer2 = createTestConsumerWithGroupId(emptyGroup2)) { | ||
| consumer2.subscribe(List.of(topicName)); | ||
| consumer2.poll(Duration.ofMillis(100)); | ||
| } |
There was a problem hiding this comment.
This test assumes consumer groups will immediately appear in the expected state after poll()/close. Consumer group creation and state transitions are asynchronous and can take time, so the immediate assertions can be flaky. Consider using Awaitility (already used in other integration tests) to wait until the expected groups are visible (and in the expected state) before asserting, and/or explicitly committing offsets/unsubscribing to ensure the groups reliably end up in EMPTY state.
There was a problem hiding this comment.
@copilot open a new pull request to apply changes based on this feedback
|
@germanosin I've opened a new pull request, #1694, to work on those changes. Once the pull request is ready, I'll request review from you. |
What changes did you make? (Give an overview)
Added backend filters for consumers by their states. backend for #1476
Is there anything you'd like reviewers to focus on?
How Has This Been Tested? (put an "x" (case-sensitive!) next to an item)
Checklist (put an "x" (case-sensitive!) next to all the items, otherwise the build will fail)
Check out Contributing and Code of Conduct
A picture of a cute animal (not mandatory but encouraged)