Skip to content

Commit 624dd45

Browse files
authored
KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes (apache#18276)
* KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes This commit adds the classes to represent a Streams group member in the consumer coordinator. Reviewers: Bill Bejeck <[email protected]>, Lucas Brutschy <[email protected]>
1 parent aa22676 commit 624dd45

File tree

6 files changed

+1233
-0
lines changed

6 files changed

+1233
-0
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.coordinator.group.streams;
18+
19+
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
20+
21+
import java.util.Collections;
22+
import java.util.HashSet;
23+
import java.util.Map;
24+
import java.util.Objects;
25+
import java.util.Set;
26+
import java.util.stream.Collectors;
27+
28+
/**
29+
* An immutable assignment for a member.
30+
*
31+
* @param activeTasks Active tasks assigned to the member.
32+
* The key of the map is the subtopology ID and the value is the set of partition IDs.
33+
* @param standbyTasks Standby tasks assigned to the member.
34+
* The key of the map is the subtopology ID and the value is the set of partition IDs.
35+
* @param warmupTasks Warm-up tasks assigned to the member.
36+
* The key of the map is the subtopology ID and the value is the set of partition IDs.
37+
*/
38+
public record Assignment(Map<String, Set<Integer>> activeTasks,
39+
Map<String, Set<Integer>> standbyTasks,
40+
Map<String, Set<Integer>> warmupTasks) {
41+
42+
public Assignment {
43+
activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
44+
standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
45+
warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
46+
}
47+
48+
/**
49+
* An empty assignment.
50+
*/
51+
public static final Assignment EMPTY = new Assignment(
52+
Collections.emptyMap(),
53+
Collections.emptyMap(),
54+
Collections.emptyMap()
55+
);
56+
57+
/**
58+
* Creates a {{@link org.apache.kafka.coordinator.group.streams.Assignment}} from a
59+
* {{@link org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue}}.
60+
*
61+
* @param record The record.
62+
* @return A {{@link org.apache.kafka.coordinator.group.streams.Assignment}}.
63+
*/
64+
public static Assignment fromRecord(
65+
StreamsGroupTargetAssignmentMemberValue record
66+
) {
67+
return new Assignment(
68+
record.activeTasks().stream()
69+
.collect(Collectors.toMap(
70+
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
71+
taskId -> new HashSet<>(taskId.partitions())
72+
)
73+
),
74+
record.standbyTasks().stream()
75+
.collect(Collectors.toMap(
76+
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
77+
taskId -> new HashSet<>(taskId.partitions())
78+
)
79+
),
80+
record.warmupTasks().stream()
81+
.collect(Collectors.toMap(
82+
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
83+
taskId -> new HashSet<>(taskId.partitions())
84+
)
85+
)
86+
);
87+
}
88+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.coordinator.group.streams;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
/**
23+
* The various states that a member can be in. For their definition, refer to the documentation of
24+
* {{@link org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder}}.
25+
*/
26+
public enum MemberState {
27+
28+
/**
29+
* The member is fully reconciled with the desired target assignment.
30+
*/
31+
STABLE((byte) 1),
32+
33+
/**
34+
* The member must revoke some tasks in order to be able to transition to the next epoch.
35+
*/
36+
UNREVOKED_TASKS((byte) 2),
37+
38+
/**
39+
* The member transitioned to the last epoch but waits on some tasks which have not been revoked by their previous owners yet.
40+
*/
41+
UNRELEASED_TASKS((byte) 3),
42+
43+
/**
44+
* The member is in an unknown state. This can only happen if a future version of the software introduces a new state unknown by this
45+
* version.
46+
*/
47+
UNKNOWN((byte) 127);
48+
49+
private static final Map<Byte, MemberState> VALUES_TO_ENUMS = new HashMap<>();
50+
51+
static {
52+
for (MemberState state : MemberState.values()) {
53+
VALUES_TO_ENUMS.put(state.value(), state);
54+
}
55+
}
56+
57+
private final byte value;
58+
59+
MemberState(byte value) {
60+
this.value = value;
61+
}
62+
63+
public byte value() {
64+
return value;
65+
}
66+
67+
public static MemberState fromValue(byte value) {
68+
MemberState state = VALUES_TO_ENUMS.get(value);
69+
if (state == null) {
70+
return UNKNOWN;
71+
}
72+
return state;
73+
}
74+
}

0 commit comments

Comments
 (0)