Skip to content

Commit 52e7e58

Browse files
[FLINK-36209] Remove redundant operations in the initialization of KafkaSourceEnumState (#116)
In certain methods, such as DynamicKafkaSourceEnumerator#onHandleSubscribedStreamsFetch(), partitions are divided into assignedPartitions and unassignedInitialPartitions before being passed as parameters to the KafkaSourceEnumState constructor. However, within the constructor, these assignedPartitions and unassignedInitialPartitions are recombined into a single partitions set, which leads to unnecessary operations and reduced performance. By optimizing the code to pass partitions directly as a parameter when initializing KafkaSourceEnumState, we can eliminate these redundant operations and improve performance.

---------

Co-authored-by: ClownXC <[email protected]>
1 parent 122a743 commit 52e7e58

File tree

4 files changed

+17
-24
lines changed

4 files changed

+17
-24
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
3636
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
3737
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
38+
import org.apache.flink.connector.kafka.source.enumerator.TopicPartitionAndAssignmentStatus;
3839
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
3940
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
4041
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
@@ -298,24 +299,16 @@ private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams
298299
final Set<String> activeTopics = activeClusterTopics.getValue();
299300

300301
// filter out removed topics
301-
Set<TopicPartition> activeAssignedPartitions =
302-
kafkaSourceEnumState.assignedPartitions().stream()
303-
.filter(tp -> activeTopics.contains(tp.topic()))
304-
.collect(Collectors.toSet());
305-
Set<TopicPartition> activeUnassignedInitialPartitions =
306-
kafkaSourceEnumState.unassignedInitialPartitions().stream()
307-
.filter(tp -> activeTopics.contains(tp.topic()))
302+
Set<TopicPartitionAndAssignmentStatus> partitions =
303+
kafkaSourceEnumState.partitions().stream()
304+
.filter(tp -> activeTopics.contains(tp.topicPartition().topic()))
308305
.collect(Collectors.toSet());
309306

310307
newKafkaSourceEnumState =
311308
new KafkaSourceEnumState(
312-
activeAssignedPartitions,
313-
activeUnassignedInitialPartitions,
314-
kafkaSourceEnumState.initialDiscoveryFinished());
309+
partitions, kafkaSourceEnumState.initialDiscoveryFinished());
315310
} else {
316-
newKafkaSourceEnumState =
317-
new KafkaSourceEnumState(
318-
Collections.emptySet(), Collections.emptySet(), false);
311+
newKafkaSourceEnumState = new KafkaSourceEnumState(Collections.emptySet(), false);
319312
}
320313

321314
// restarts enumerator from state using only the active topic partitions, to avoid

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumState.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public class KafkaSourceEnumState {
3232
/** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */
3333
private final Set<TopicPartitionAndAssignmentStatus> partitions;
3434
/**
35-
* this flag will be marked as true if inital partitions are discovered after enumerator starts.
35+
* this flag will be marked as true if initial partitions are discovered after enumerator
36+
* starts.
3637
*/
3738
private final boolean initialDiscoveryFinished;
3839

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,7 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I
8989
case CURRENT_VERSION:
9090
return deserializeTopicPartitionAndAssignmentStatus(serialized);
9191
case VERSION_1:
92-
final Set<TopicPartition> assignedPartitions =
93-
deserializeTopicPartitions(serialized);
94-
return new KafkaSourceEnumState(assignedPartitions, new HashSet<>(), true);
92+
return deserializeAssignedTopicPartitions(serialized);
9593
case VERSION_0:
9694
Map<Integer, Set<KafkaPartitionSplit>> currentPartitionAssignment =
9795
SerdeUtils.deserializeSplitAssignments(
@@ -113,23 +111,24 @@ public KafkaSourceEnumState deserialize(int version, byte[] serialized) throws I
113111
}
114112
}
115113

116-
private static Set<TopicPartition> deserializeTopicPartitions(byte[] serializedTopicPartitions)
117-
throws IOException {
114+
private static KafkaSourceEnumState deserializeAssignedTopicPartitions(
115+
byte[] serializedTopicPartitions) throws IOException {
118116
try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedTopicPartitions);
119117
DataInputStream in = new DataInputStream(bais)) {
120118

121119
final int numPartitions = in.readInt();
122-
Set<TopicPartition> topicPartitions = new HashSet<>(numPartitions);
120+
Set<TopicPartitionAndAssignmentStatus> partitions = new HashSet<>(numPartitions);
123121
for (int i = 0; i < numPartitions; i++) {
124122
final String topic = in.readUTF();
125123
final int partition = in.readInt();
126-
topicPartitions.add(new TopicPartition(topic, partition));
124+
partitions.add(
125+
new TopicPartitionAndAssignmentStatus(
126+
new TopicPartition(topic, partition), AssignmentStatus.ASSIGNED));
127127
}
128128
if (in.available() > 0) {
129129
throw new IOException("Unexpected trailing bytes in serialized topic partitions");
130130
}
131-
132-
return topicPartitions;
131+
return new KafkaSourceEnumState(partitions, true);
133132
}
134133
}
135134

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public KafkaSourceEnumerator(
112112
properties,
113113
context,
114114
boundedness,
115-
new KafkaSourceEnumState(Collections.emptySet(), Collections.emptySet(), false));
115+
new KafkaSourceEnumState(Collections.emptySet(), false));
116116
}
117117

118118
public KafkaSourceEnumerator(

0 commit comments

Comments
 (0)