Skip to content

Commit cfa0b41

Browse files
authored
MINOR: Remove metrics attribute from StreamsGroup (#20559)
The `metrics` attribute in `StreamsGroup` is not used anymore. This patch removes it. Reviewers: Ken Huang <[email protected]>, Lucas Brutschy <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent d067c6c commit cfa0b41

File tree

3 files changed

+14
-36
lines changed

3 files changed

+14
-36
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -851,10 +851,10 @@ StreamsGroup getOrCreateStreamsGroup(
851851
Group group = groups.get(groupId);
852852

853853
if (group == null) {
854-
return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
854+
return new StreamsGroup(logContext, snapshotRegistry, groupId);
855855
} else if (maybeDeleteEmptyClassicGroup(group, records)) {
856856
log.info("[GroupId {}] Converted the empty classic group to a streams group.", groupId);
857-
return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
857+
return new StreamsGroup(logContext, snapshotRegistry, groupId);
858858
} else {
859859
return castToStreamsGroup(group);
860860
}
@@ -1023,7 +1023,7 @@ private StreamsGroup getOrMaybeCreatePersistedStreamsGroup(
10231023
}
10241024

10251025
if (group == null) {
1026-
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
1026+
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, groupId);
10271027
groups.put(groupId, streamsGroup);
10281028
return streamsGroup;
10291029
} else if (group.type() == STREAMS) {

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
3232
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
3333
import org.apache.kafka.coordinator.group.Utils;
34-
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
3534
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
3635
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
3736
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -179,11 +178,6 @@ public static class DeadlineAndEpoch {
179178
private final TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentStandbyTaskToProcessIds;
180179
private final TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentWarmupTaskToProcessIds;
181180

182-
/**
183-
* The coordinator metrics.
184-
*/
185-
private final GroupCoordinatorMetricsShard metrics;
186-
187181
/**
188182
* The Streams topology.
189183
*/
@@ -220,8 +214,7 @@ public static class DeadlineAndEpoch {
220214
public StreamsGroup(
221215
LogContext logContext,
222216
SnapshotRegistry snapshotRegistry,
223-
String groupId,
224-
GroupCoordinatorMetricsShard metrics
217+
String groupId
225218
) {
226219
this.log = logContext.logger(StreamsGroup.class);
227220
this.logContext = logContext;
@@ -238,7 +231,6 @@ public StreamsGroup(
238231
this.currentActiveTaskToProcessId = new TimelineHashMap<>(snapshotRegistry, 0);
239232
this.currentStandbyTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0);
240233
this.currentWarmupTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0);
241-
this.metrics = Objects.requireNonNull(metrics);
242234
this.topology = new TimelineObject<>(snapshotRegistry, Optional.empty());
243235
this.configuredTopology = new TimelineObject<>(snapshotRegistry, Optional.empty());
244236
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.kafka.coordinator.group.streams;
1818

19-
import org.apache.kafka.common.TopicPartition;
2019
import org.apache.kafka.common.Uuid;
2120
import org.apache.kafka.common.errors.GroupNotEmptyException;
2221
import org.apache.kafka.common.errors.StaleMemberEpochException;
@@ -43,7 +42,6 @@
4342
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
4443
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
4544
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
46-
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
4745
import org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
4846
import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
4947
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
@@ -90,8 +88,7 @@ private StreamsGroup createStreamsGroup(String groupId) {
9088
return new StreamsGroup(
9189
LOG_CONTEXT,
9290
snapshotRegistry,
93-
groupId,
94-
mock(GroupCoordinatorMetricsShard.class)
91+
groupId
9592
);
9693
}
9794

@@ -693,8 +690,7 @@ public void testAsListedGroup() {
693690
StreamsGroup group = new StreamsGroup(
694691
LOG_CONTEXT,
695692
snapshotRegistry,
696-
"group-foo",
697-
mock(GroupCoordinatorMetricsShard.class)
693+
"group-foo"
698694
);
699695
group.setGroupEpoch(1);
700696
group.setTopology(new StreamsTopology(1, Map.of()));
@@ -719,8 +715,7 @@ public void testValidateOffsetFetch() {
719715
StreamsGroup group = new StreamsGroup(
720716
LOG_CONTEXT,
721717
snapshotRegistry,
722-
"group-foo",
723-
mock(GroupCoordinatorMetricsShard.class)
718+
"group-foo"
724719
);
725720

726721
// Simulate a call from the admin client without member ID and member epoch.
@@ -790,7 +785,7 @@ public void testOffsetExpirationCondition() {
790785
long commitTimestamp = 20000L;
791786
long offsetsRetentionMs = 10000L;
792787
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID);
793-
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, new SnapshotRegistry(LOG_CONTEXT), "group-id", mock(GroupCoordinatorMetricsShard.class));
788+
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, new SnapshotRegistry(LOG_CONTEXT), "group-id");
794789

795790
Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
796791
assertTrue(offsetExpirationCondition.isPresent());
@@ -803,7 +798,7 @@ public void testOffsetExpirationCondition() {
803798
@Test
804799
public void testAsDescribedGroup() {
805800
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
806-
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-1", mock(GroupCoordinatorMetricsShard.class));
801+
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-id-1");
807802
snapshotRegistry.idempotentCreateSnapshot(0);
808803
assertEquals(StreamsGroup.StreamsGroupState.EMPTY.toString(), group.stateAsString(0));
809804

@@ -887,12 +882,7 @@ public void testAsDescribedGroup() {
887882
@Test
888883
public void testIsInStatesCaseInsensitiveAndUnderscored() {
889884
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
890-
GroupCoordinatorMetricsShard metricsShard = new GroupCoordinatorMetricsShard(
891-
snapshotRegistry,
892-
Map.of(),
893-
new TopicPartition("__consumer_offsets", 0)
894-
);
895-
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-foo", metricsShard);
885+
StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, "group-foo");
896886
snapshotRegistry.idempotentCreateSnapshot(0);
897887
assertTrue(group.isInStates(Set.of("empty"), 0));
898888
assertFalse(group.isInStates(Set.of("Empty"), 0));
@@ -911,8 +901,7 @@ public void testComputeMetadataHash() {
911901
StreamsGroup streamsGroup = new StreamsGroup(
912902
LOG_CONTEXT,
913903
snapshotRegistry,
914-
"group-foo",
915-
mock(GroupCoordinatorMetricsShard.class)
904+
"group-foo"
916905
);
917906

918907
MetadataImage metadataImage = new MetadataImageBuilder()
@@ -933,8 +922,7 @@ void testCreateGroupTombstoneRecords() {
933922
StreamsGroup streamsGroup = new StreamsGroup(
934923
LOG_CONTEXT,
935924
snapshotRegistry,
936-
"test-group",
937-
mock(GroupCoordinatorMetricsShard.class)
925+
"test-group"
938926
);
939927
streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
940928
.setMemberEpoch(1)
@@ -961,8 +949,7 @@ void testCreateGroupTombstoneRecords() {
961949
public void testIsSubscribedToTopic() {
962950
LogContext logContext = new LogContext();
963951
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
964-
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
965-
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group", metricsShard);
952+
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group");
966953

967954
assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
968955
assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
@@ -1008,8 +995,7 @@ public void testShutdownRequestedMethods() {
1008995
String memberId2 = "test-member-id2";
1009996
LogContext logContext = new LogContext();
1010997
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
1011-
GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
1012-
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group", metricsShard);
998+
StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group");
1013999

10141000
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1));
10151001
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2));

0 commit comments

Comments
 (0)