Skip to content

Commit df4b3ae

Browse files
authored
[KIP-848] [mock cluster] Improved static group membership implementation (confluentinc#5030)
When member is leaving with -2 a new member that is joining can replace the previous member id with the new one. If previous member didn't leave the new member joining with same group.instance.id is fenced in new protocol.
1 parent 1012b74 commit df4b3ae

File tree

5 files changed

+85
-34
lines changed

5 files changed

+85
-34
lines changed

src/rdkafka_mock.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3156,7 +3156,8 @@ static int ut_cgrp_consumer_member_next_assignment0(
31563156
fixtures[i].comment);
31573157

31583158
if (fixtures[i].session_timed_out) {
3159-
rd_kafka_mock_cgrp_consumer_member_leave(mcgrp, member);
3159+
rd_kafka_mock_cgrp_consumer_member_leave(mcgrp, member,
3160+
rd_false);
31603161
member = rd_kafka_mock_cgrp_consumer_member_add(
31613162
mcgrp, conn, &MemberId, &InstanceId,
31623163
&SubscribedTopic, 1, &SubscribedTopicRegex);

src/rdkafka_mock_cgrp.c

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1486,6 +1486,15 @@ static rd_bool_t rd_kafka_mock_cgrp_consumer_member_subscribed_topic_names_set(
14861486
return changed;
14871487
}
14881488

1489+
static void rd_kafka_mock_cgrp_consumer_member_topic_id_set(
1490+
rd_kafka_mock_cgrp_consumer_member_t *member,
1491+
const rd_kafkap_str_t *MemberId) {
1492+
/* KIP 1082: MemberId is generated by the client */
1493+
rd_assert(RD_KAFKAP_STR_LEN(MemberId) > 0);
1494+
RD_IF_FREE(member->id, rd_free);
1495+
member->id = RD_KAFKAP_STR_DUP(MemberId);
1496+
}
1497+
14891498
/**
14901499
* @brief Adds a member to consumer group \p mcgrp. If member with same
14911500
* \p MemberId is already present, only updates the connection and
@@ -1521,17 +1530,24 @@ rd_kafka_mock_cgrp_consumer_member_t *rd_kafka_mock_cgrp_consumer_member_add(
15211530
if (!member) {
15221531
member = rd_kafka_mock_cgrp_consumer_member_find_by_instance_id(
15231532
mcgrp, InstanceId);
1524-
if (member && RD_KAFKAP_STR_LEN(MemberId) > 0 &&
1525-
rd_kafkap_str_cmp_str(MemberId, member->id) != 0) {
1526-
/* Either member is a new instance and is rejoining
1527-
* with same InstanceId, so MemberId is NULL,
1528-
* or it's rejoining after unsubscribing,
1529-
* then it must have the same MemberId as before,
1530-
* as it lasts for member lifetime.
1531-
* It both don't hold, we cannot add the member
1532-
* to the group. */
1533-
return NULL;
1533+
1534+
if (member) {
1535+
if (!member->left_static_membership) {
1536+
/* Old member still active,
1537+
* fence this one */
1538+
return NULL;
1539+
}
1540+
1541+
if (rd_kafkap_str_cmp_str(MemberId, member->id) != 0) {
1542+
/* Member is a new instance and is rejoining
1543+
* with a new MemberId. */
1544+
rd_kafka_mock_cgrp_consumer_member_topic_id_set(
1545+
member, MemberId);
1546+
}
1547+
member->left_static_membership = rd_false;
15341548
}
1549+
} else {
1550+
member->left_static_membership = rd_false;
15351551
}
15361552

15371553
if (!member) {
@@ -1548,7 +1564,8 @@ rd_kafka_mock_cgrp_consumer_member_t *rd_kafka_mock_cgrp_consumer_member_add(
15481564
member = rd_calloc(1, sizeof(*member));
15491565
member->mcgrp = mcgrp;
15501566

1551-
member->id = RD_KAFKAP_STR_DUP(MemberId);
1567+
rd_kafka_mock_cgrp_consumer_member_topic_id_set(member,
1568+
MemberId);
15521569

15531570
if (!RD_KAFKAP_STR_IS_NULL(InstanceId))
15541571
member->instance_id = RD_KAFKAP_STR_DUP(InstanceId);
@@ -1617,28 +1634,39 @@ static void rd_kafka_mock_cgrp_consumer_member_destroy(
16171634
rd_free(member);
16181635
}
16191636

1637+
static void rd_kafka_mock_cgrp_consumer_member_leave_static(
1638+
rd_kafka_mock_cgrp_consumer_member_t *member) {
1639+
member->left_static_membership = rd_true;
1640+
rd_kafka_mock_cgrp_consumer_member_returned_assignment_set(member,
1641+
NULL);
1642+
}
1643+
16201644

16211645
/**
16221646
* @brief Called when a member must leave a consumer group.
16231647
*
16241648
* @param mcgrp Consumer group to leave.
16251649
* @param member Member that leaves.
1650+
* @param leave_static If true, the member is leaving with static group
1651+
* membership.
16261652
*
16271653
* @locks mcluster->lock MUST be held.
16281654
*/
16291655
void rd_kafka_mock_cgrp_consumer_member_leave(
16301656
rd_kafka_mock_cgrp_consumer_t *mcgrp,
1631-
rd_kafka_mock_cgrp_consumer_member_t *member) {
1657+
rd_kafka_mock_cgrp_consumer_member_t *member,
1658+
rd_bool_t leave_static) {
16321659
rd_bool_t is_static = member->instance_id != NULL;
16331660

16341661
rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK",
1635-
"Member %s is leaving group %s, is static: %s", member->id,
1636-
mcgrp->id, RD_STR_ToF(is_static));
1637-
if (!is_static)
1662+
"Member %s is leaving group %s, is static: %s, "
1663+
"static leave: %s",
1664+
member->id, mcgrp->id, RD_STR_ToF(is_static),
1665+
RD_STR_ToF(leave_static));
1666+
if (!is_static || !leave_static)
16381667
rd_kafka_mock_cgrp_consumer_member_destroy(mcgrp, member);
16391668
else
1640-
rd_kafka_mock_cgrp_consumer_member_returned_assignment_set(
1641-
member, NULL);
1669+
rd_kafka_mock_cgrp_consumer_member_leave_static(member);
16421670
}
16431671

16441672
/**

src/rdkafka_mock_handlers.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2896,7 +2896,7 @@ rd_kafka_mock_handle_ConsumerGroupHeartbeat(rd_kafka_mock_connection_t *mconn,
28962896
}
28972897
} else {
28982898
rd_kafka_mock_cgrp_consumer_member_leave(
2899-
mcgrp, member);
2899+
mcgrp, member, MemberEpoch == -2);
29002900
member = NULL;
29012901
}
29022902
} else {

src/rdkafka_mock_int.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,9 @@ typedef struct rd_kafka_mock_cgrp_consumer_member_s {
170170
rd_list_t *subscribed_topic_names; /**< Subscribed topic names received
171171
in the heartbeat */
172172
char *subscribed_topic_regex; /**< Subscribed regex */
173+
174+
rd_bool_t left_static_membership; /**< Member left the group
175+
* with static membership. */
173176
struct rd_kafka_mock_connection_s *conn; /**< Connection, may be NULL
174177
* if there is no ongoing
175178
* request. */
@@ -675,7 +678,8 @@ rd_kafka_mock_cgrp_consumer_get(rd_kafka_mock_cluster_t *mcluster,
675678

676679
void rd_kafka_mock_cgrp_consumer_member_leave(
677680
rd_kafka_mock_cgrp_consumer_t *mcgrp,
678-
rd_kafka_mock_cgrp_consumer_member_t *member);
681+
rd_kafka_mock_cgrp_consumer_member_t *member,
682+
rd_bool_t static_leave);
679683

680684
void rd_kafka_mock_cgrp_consumer_member_fenced(
681685
rd_kafka_mock_cgrp_consumer_t *mcgrp,

tests/0102-static_group_rebalance.c

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -673,16 +673,30 @@ static rd_bool_t is_api_key(rd_kafka_mock_request_t *request, void *opaque) {
673673
return rd_kafka_mock_request_api_key(request) == api_key;
674674
}
675675

676+
/**
677+
* @enum do_test_static_membership_mock_variation_t
678+
* @brief Variations of the static membership mock test.
679+
*/
680+
typedef enum do_test_static_membership_mock_variation_t {
681+
/** Consumer 1 leaves with unsubscribe and rejoins the group */
682+
DO_TEST_STATIC_MEMBERSHIP_MOCK_VARIATION_SAME_INSTANCE = 0,
683+
/** Consumer 1 leaves with unsubscribe and a new consumer with same
684+
* group.instance.id joins the group */
685+
DO_TEST_STATIC_MEMBERSHIP_MOCK_VARIATION_NEW_INSTANCE = 1,
686+
DO_TEST_STATIC_MEMBERSHIP_MOCK_VARIATION__CNT
687+
} do_test_static_membership_mock_variation_t;
688+
676689
/**
677690
* @brief Static group membership tests with the mock cluster.
678691
* Checks that consumer returns the same assignment
679692
* and generation id after re-joining.
680693
*
681-
* variation 1) consumer 1 leaves and rejoins the group.
682-
* variation 2) consumer 1 leaves and a new consumer with same
683-
* group.instance.id joins the group.
694+
* @param variation Test variation to run.
695+
*
696+
* @sa `do_test_static_membership_mock_variation_t`
684697
*/
685-
static void do_test_static_membership_mock(int variation) {
698+
static void do_test_static_membership_mock(
699+
do_test_static_membership_mock_variation_t variation) {
686700
const char *bootstraps;
687701
rd_kafka_mock_cluster_t *mcluster;
688702
int32_t api_key = RD_KAFKAP_ConsumerGroupHeartbeat;
@@ -693,8 +707,11 @@ static void do_test_static_membership_mock(int variation) {
693707
rd_kafka_topic_partition_list_t *prev_assignment1, *prev_assignment2,
694708
*next_assignment1, *next_assignment2;
695709

696-
SUB_TEST_QUICK("%s", variation == 1 ? "with unsubscribe"
697-
: "with new instance");
710+
SUB_TEST_QUICK(
711+
"%s",
712+
variation == DO_TEST_STATIC_MEMBERSHIP_MOCK_VARIATION_SAME_INSTANCE
713+
? "with same instance"
714+
: "with new instance");
698715

699716
mcluster = test_mock_cluster_new(3, &bootstraps);
700717
rd_kafka_mock_topic_create(mcluster, topic, 2, 3);
@@ -730,7 +747,8 @@ static void do_test_static_membership_mock(int variation) {
730747
&api_key);
731748
rd_kafka_mock_stop_request_tracking(mcluster);
732749

733-
if (variation == 2) {
750+
if (variation ==
751+
DO_TEST_STATIC_MEMBERSHIP_MOCK_VARIATION_NEW_INSTANCE) {
734752
/* Don't destroy it immediately because the
735753
* topic partition lists still hold a reference. */
736754
consumer_1_to_destroy = consumer1;
@@ -800,11 +818,7 @@ int main_0102_static_group_rebalance(int argc, char **argv) {
800818

801819
int main_0102_static_group_rebalance_mock(int argc, char **argv) {
802820
TEST_SKIP_MOCK_CLUSTER(0);
803-
804-
TEST_SKIP(
805-
"Static membership mock fixes are present in another PR. Test it "
806-
"with that PR.\n");
807-
return 0;
821+
int variation;
808822

809823
if (test_consumer_group_protocol_classic()) {
810824
TEST_SKIP(
@@ -813,7 +827,11 @@ int main_0102_static_group_rebalance_mock(int argc, char **argv) {
813827
return 0;
814828
}
815829

816-
do_test_static_membership_mock(1);
817-
do_test_static_membership_mock(2);
830+
for (variation = DO_TEST_STATIC_MEMBERSHIP_MOCK_VARIATION_SAME_INSTANCE;
831+
variation < DO_TEST_STATIC_MEMBERSHIP_MOCK_VARIATION__CNT;
832+
variation++) {
833+
do_test_static_membership_mock(variation);
834+
}
835+
818836
return 0;
819837
}

0 commit comments

Comments
 (0)