Skip to content

Commit 003e594

Browse files
authored
[FLINK-38381] Enumerate Kafka partitions across Kafka clusters (#1030)
When using the DynamicKafkaSink, topics can be spread across multiple clusters. This used to work fine, but a regression has been introduced which considers partitions across different clusters to be identical. This limits the scale-out of the source operator. Here is an example: ``` "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.testTopic.partition.0.currentOffset", "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.0.currentOffset" ``` Those would be treated as one partition, but there are actually two partitions from separate Kafka clusters.
1 parent b820577 commit 003e594

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ private void updateKafkaPulsarSourceNumPartitions(
269269
try (var restClient = ctx.getRestClusterClient()) {
270270
Pattern partitionRegex =
271271
Pattern.compile(
272-
"^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
272+
"^.*?(\\.kafkaCluster\\.(?<kafkaCluster>.+))?\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
273273
+ "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
274274
for (var vertexInfo : topology.getVertexInfos().values()) {
275275
if (vertexInfo.getInputs().isEmpty()) {
@@ -281,12 +281,18 @@ private void updateKafkaPulsarSourceNumPartitions(
281281
Matcher matcher = partitionRegex.matcher(v);
282282
if (matcher.matches()) {
283283
String kafkaTopic = matcher.group("kafkaTopic");
284+
String kafkaCluster =
285+
matcher.group("kafkaCluster");
284286
String kafkaId = matcher.group("kafkaId");
285287
String pulsarTopic =
286288
matcher.group("pulsarTopic");
287289
String pulsarId = matcher.group("pulsarId");
288290
return kafkaTopic != null
289-
? kafkaTopic + "-" + kafkaId
291+
? kafkaCluster
292+
+ "-"
293+
+ kafkaTopic
294+
+ "-"
295+
+ kafkaId
290296
: pulsarTopic + "-" + pulsarId;
291297
}
292298
return null;

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,20 @@ public void testKafkaPulsarNumPartitions() throws Exception {
258258
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
259259
assertEquals(5, collectedMetrics.getJobTopology().get(source1).getNumSourcePartitions());
260260

261+
metricsCollector.setMetricNames(
262+
Map.of(
263+
source1,
264+
List.of(
265+
"1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.testTopic.partition.0.currentOffset",
266+
"1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.anotherTopic.partition.0.currentOffset",
267+
"1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.0.currentOffset",
268+
"1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.1.currentOffset",
269+
"1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.2.currentOffset",
270+
"1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.3.currentOffset")));
271+
272+
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
273+
assertEquals(6, collectedMetrics.getJobTopology().get(source1).getNumSourcePartitions());
274+
261275
metricsCollector.setMetricNames(
262276
Map.of(
263277
source2,

0 commit comments

Comments
 (0)