@@ -1486,6 +1486,15 @@ static rd_bool_t rd_kafka_mock_cgrp_consumer_member_subscribed_topic_names_set(
14861486 return changed ;
14871487}
14881488
1489+ static void rd_kafka_mock_cgrp_consumer_member_topic_id_set (
1490+ rd_kafka_mock_cgrp_consumer_member_t * member ,
1491+ const rd_kafkap_str_t * MemberId ) {
1492+ /* KIP 1082: MemberId is generated by the client */
1493+ rd_assert (RD_KAFKAP_STR_LEN (MemberId ) > 0 );
1494+ RD_IF_FREE (member -> id , rd_free );
1495+ member -> id = RD_KAFKAP_STR_DUP (MemberId );
1496+ }
1497+
14891498/**
14901499 * @brief Adds a member to consumer group \p mcgrp. If member with same
14911500 * \p MemberId is already present, only updates the connection and
@@ -1521,17 +1530,24 @@ rd_kafka_mock_cgrp_consumer_member_t *rd_kafka_mock_cgrp_consumer_member_add(
15211530 if (!member ) {
15221531 member = rd_kafka_mock_cgrp_consumer_member_find_by_instance_id (
15231532 mcgrp , InstanceId );
1524- if (member && RD_KAFKAP_STR_LEN (MemberId ) > 0 &&
1525- rd_kafkap_str_cmp_str (MemberId , member -> id ) != 0 ) {
1526- /* Either member is a new instance and is rejoining
1527- * with same InstanceId, so MemberId is NULL,
1528- * or it's rejoining after unsubscribing,
1529- * then it must have the same MemberId as before,
1530- * as it lasts for member lifetime.
1531- * It both don't hold, we cannot add the member
1532- * to the group. */
1533- return NULL ;
1533+
1534+ if (member ) {
1535+ if (!member -> left_static_membership ) {
1536+ /* Old member still active,
1537+ * fence this one */
1538+ return NULL ;
1539+ }
1540+
1541+ if (rd_kafkap_str_cmp_str (MemberId , member -> id ) != 0 ) {
1542+ /* Member is a new instance and is rejoining
1543+ * with a new MemberId. */
1544+ rd_kafka_mock_cgrp_consumer_member_topic_id_set (
1545+ member , MemberId );
1546+ }
1547+ member -> left_static_membership = rd_false ;
15341548 }
1549+ } else {
1550+ member -> left_static_membership = rd_false ;
15351551 }
15361552
15371553 if (!member ) {
@@ -1548,7 +1564,8 @@ rd_kafka_mock_cgrp_consumer_member_t *rd_kafka_mock_cgrp_consumer_member_add(
15481564 member = rd_calloc (1 , sizeof (* member ));
15491565 member -> mcgrp = mcgrp ;
15501566
1551- member -> id = RD_KAFKAP_STR_DUP (MemberId );
1567+ rd_kafka_mock_cgrp_consumer_member_topic_id_set (member ,
1568+ MemberId );
15521569
15531570 if (!RD_KAFKAP_STR_IS_NULL (InstanceId ))
15541571 member -> instance_id = RD_KAFKAP_STR_DUP (InstanceId );
@@ -1617,28 +1634,39 @@ static void rd_kafka_mock_cgrp_consumer_member_destroy(
16171634 rd_free (member );
16181635}
16191636
1637+ static void rd_kafka_mock_cgrp_consumer_member_leave_static (
1638+ rd_kafka_mock_cgrp_consumer_member_t * member ) {
1639+ member -> left_static_membership = rd_true ;
1640+ rd_kafka_mock_cgrp_consumer_member_returned_assignment_set (member ,
1641+ NULL );
1642+ }
1643+
16201644
16211645/**
16221646 * @brief Called when a member must leave a consumer group.
16231647 *
16241648 * @param mcgrp Consumer group to leave.
16251649 * @param member Member that leaves.
1650+ * @param leave_static If true, the member is leaving with static group
1651+ * membership.
16261652 *
16271653 * @locks mcluster->lock MUST be held.
16281654 */
16291655void rd_kafka_mock_cgrp_consumer_member_leave (
16301656 rd_kafka_mock_cgrp_consumer_t * mcgrp ,
1631- rd_kafka_mock_cgrp_consumer_member_t * member ) {
1657+ rd_kafka_mock_cgrp_consumer_member_t * member ,
1658+ rd_bool_t leave_static ) {
16321659 rd_bool_t is_static = member -> instance_id != NULL ;
16331660
16341661 rd_kafka_dbg (mcgrp -> cluster -> rk , MOCK , "MOCK" ,
1635- "Member %s is leaving group %s, is static: %s" , member -> id ,
1636- mcgrp -> id , RD_STR_ToF (is_static ));
1637- if (!is_static )
1662+ "Member %s is leaving group %s, is static: %s, "
1663+ "static leave: %s" ,
1664+ member -> id , mcgrp -> id , RD_STR_ToF (is_static ),
1665+ RD_STR_ToF (leave_static ));
1666+ if (!is_static || !leave_static )
16381667 rd_kafka_mock_cgrp_consumer_member_destroy (mcgrp , member );
16391668 else
1640- rd_kafka_mock_cgrp_consumer_member_returned_assignment_set (
1641- member , NULL );
1669+ rd_kafka_mock_cgrp_consumer_member_leave_static (member );
16421670}
16431671
16441672/**
0 commit comments