@@ -728,8 +728,7 @@ ConsumerGroup consumerGroup(
728728 * created if it does not exist.
729729 *
730730 * @return A ConsumerGroup.
731- * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
732- * if the group is not a consumer group.
731+ * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false.
733732 * @throws IllegalStateException if the group does not have the expected type.
734733 * Package private for testing.
735734 */
@@ -846,28 +845,28 @@ private ShareGroup getOrMaybeCreateShareGroup(
846845
847846 if (group == null ) {
848847 return new ShareGroup (snapshotRegistry , groupId );
848+ } else {
849+ if (group .type () == SHARE ) {
850+ return (ShareGroup ) group ;
851+ } else {
852+ // We don't support upgrading/downgrading between protocols at the moment so
853+ // we throw an exception if a group exists with the wrong type.
854+ throw new GroupIdNotFoundException (String .format ("Group %s is not a share group." , groupId ));
855+ }
849856 }
850-
851- if (group .type () != SHARE ) {
852- // We don't support upgrading/downgrading between protocols at the moment so
853- // we throw an exception if a group exists with the wrong type.
854- throw new GroupIdNotFoundException (String .format ("Group %s is not a share group." ,
855- groupId ));
856- }
857-
858- return (ShareGroup ) group ;
859857 }
860858
861859 /**
862- * Gets or maybe creates a share group.
860+ * The method should be called on the replay path.
861+ * Gets or maybe creates a share group and updates the groups map if a new group is created.
863862 *
864863 * @param groupId The group id.
865864 * @param createIfNotExists A boolean indicating whether the group should be
866865 * created if it does not exist.
867866 *
868867 * @return A ShareGroup.
869- * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
870- * if the group is not a consumer group .
868+ * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false.
869+ * @throws IllegalStateException if the group does not have the expected type .
871870 *
872871 * Package private for testing.
873872 */
@@ -878,22 +877,22 @@ ShareGroup getOrMaybeCreatePersistedShareGroup(
878877 Group group = groups .get (groupId );
879878
880879 if (group == null && !createIfNotExists ) {
881- throw new IllegalStateException (String .format ("Share group %s not found." , groupId ));
880+ throw new GroupIdNotFoundException (String .format ("Share group %s not found." , groupId ));
882881 }
883882
884883 if (group == null ) {
885884 ShareGroup shareGroup = new ShareGroup (snapshotRegistry , groupId );
886885 groups .put (groupId , shareGroup );
887886 return shareGroup ;
887+ } else {
888+ if (group .type () == SHARE ) {
889+ return (ShareGroup ) group ;
890+ } else {
891+ // We don't support upgrading/downgrading between protocols at the moment so
892+ // we throw an exception if a group exists with the wrong type.
893+ throw new IllegalStateException (String .format ("Group %s is not a share group." , groupId ));
894+ }
888895 }
889-
890- if (group .type () != SHARE ) {
891- // We don't support upgrading/downgrading between protocols at the moment so
892- // we throw an exception if a group exists with the wrong type.
893- throw new GroupIdNotFoundException (String .format ("Group %s is not a share group." , groupId ));
894- }
895-
896- return (ShareGroup ) group ;
897896 }
898897
899898 /**
@@ -2032,11 +2031,10 @@ private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh
20322031
20332032 // Get or create the share group.
20342033 boolean createIfNotExists = memberEpoch == 0 ;
2035- final ShareGroup group = getOrMaybeCreatePersistedShareGroup (groupId , createIfNotExists );
2034+ final ShareGroup group = getOrMaybeCreateShareGroup (groupId , createIfNotExists );
20362035 throwIfShareGroupIsFull (group , memberId );
20372036
20382037 // Get or create the member.
2039- if (memberId .isEmpty ()) memberId = Uuid .randomUuid ().toString ();
20402038 ShareGroupMember member = getOrMaybeSubscribeShareGroupMember (
20412039 group ,
20422040 memberId ,
@@ -2143,9 +2141,12 @@ private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh
21432141 .setHeartbeatIntervalMs (shareGroupHeartbeatIntervalMs (groupId ));
21442142
21452143 // The assignment is only provided in the following cases:
2146- // 1. The member just joined or rejoined to group (epoch equals to zero);
2144+ // 1. The member sent a full request. It does so when joining or rejoining the group with zero
2145+ // as the member epoch; or on any errors (e.g. timeout). We use all the non-optional fields
2146+ // (subscribedTopicNames) to detect a full request as those must be set in a full request.
21472147 // 2. The member's assignment has been updated.
2148- if (memberEpoch == 0 || hasAssignedPartitionsChanged (member , updatedMember )) {
2148+ boolean isFullRequest = subscribedTopicNames != null ;
2149+ if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged (member , updatedMember )) {
21492150 response .setAssignment (createShareGroupResponseAssignment (updatedMember ));
21502151 }
21512152
0 commit comments