Skip to content

Commit 2938c42

Browse files
authored
KAFKA-19754: Add RPC-level integration test for StreamsGroupDescribeRequest (#20632)
Test the `StreamsGroupDescribeRequest` RPC and corresponding responses for situations where - `streams.version` not upgraded to 1 - `streams.version` enabled, multiple groups listening to the same topic. Reviewers: Lucas Brutschy <[email protected]>
1 parent ebae768 commit 2938c42

File tree

2 files changed

+368
-2
lines changed

2 files changed

+368
-2
lines changed

core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
2626
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
2727
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
2828
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
29-
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
29+
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
3030
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
31-
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
31+
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, StreamsGroupDescribeRequest, StreamsGroupDescribeResponse, StreamsGroupHeartbeatRequest, StreamsGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
3232
import org.apache.kafka.common.serialization.StringSerializer
3333
import org.apache.kafka.common.test.ClusterInstance
3434
import org.apache.kafka.common.utils.ProducerIdAndEpoch
@@ -768,6 +768,21 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
768768
shareGroupDescribeResponse.data.groups.asScala.toList
769769
}
770770

771+
protected def streamsGroupDescribe(
772+
groupIds: List[String],
773+
includeAuthorizedOperations: Boolean = false,
774+
version: Short = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
775+
): List[StreamsGroupDescribeResponseData.DescribedGroup] = {
776+
val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
777+
new StreamsGroupDescribeRequestData()
778+
.setGroupIds(groupIds.asJava)
779+
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
780+
).build(version)
781+
782+
val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
783+
streamsGroupDescribeResponse.data.groups.asScala.toList
784+
}
785+
771786
protected def heartbeat(
772787
groupId: String,
773788
generationId: Int,
@@ -855,6 +870,41 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
855870
shareGroupHeartbeatResponse.data
856871
}
857872

873+
protected def streamsGroupHeartbeat(
874+
groupId: String,
875+
memberId: String = "",
876+
memberEpoch: Int = 0,
877+
rebalanceTimeoutMs: Int = -1,
878+
activeTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
879+
standbyTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
880+
warmupTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
881+
topology: StreamsGroupHeartbeatRequestData.Topology = null,
882+
expectedError: Errors = Errors.NONE,
883+
version: Short = ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled)
884+
): StreamsGroupHeartbeatResponseData = {
885+
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder(
886+
new StreamsGroupHeartbeatRequestData()
887+
.setGroupId(groupId)
888+
.setMemberId(memberId)
889+
.setMemberEpoch(memberEpoch)
890+
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
891+
.setActiveTasks(activeTasks.asJava)
892+
.setStandbyTasks(standbyTasks.asJava)
893+
.setWarmupTasks(warmupTasks.asJava)
894+
.setTopology(topology)
895+
).build(version)
896+
897+
// Send the request until receiving a successful response. There is a delay
898+
// here because the group coordinator is loaded in the background.
899+
var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
900+
TestUtils.waitUntilTrue(() => {
901+
streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest)
902+
streamsGroupHeartbeatResponse.data.errorCode == expectedError.code
903+
}, msg = s"Could not heartbeat successfully. Last response $streamsGroupHeartbeatResponse.")
904+
905+
streamsGroupHeartbeatResponse.data
906+
}
907+
858908
protected def leaveGroupWithNewProtocol(
859909
groupId: String,
860910
memberId: String

0 commit comments

Comments
 (0)