From 63141159c1f115029508c7bffe9f5e2a9a861a5f Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Thu, 18 Sep 2025 15:49:43 +0200 Subject: [PATCH 1/2] [FLINK-38381] Enumerate Kafka partitions across Kafka clusters When using the DynamicKafkaSource, topics can be spread across multiple clusters. This used to work fine, but a regression was introduced which treats partitions on different clusters as identical. This limits the scale out of the source operator. Here is an example: ``` "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.testTopic.partition.0.currentOffset", "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.0.currentOffset" ``` Those would be treated as one partition, but they are two partitions from separate Kafka clusters. --- .../flink/autoscaler/ScalingMetricCollector.java | 6 ++++-- .../MetricsCollectionAndEvaluationTest.java | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java index ee3ded9e0b..b59d7bdd46 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java @@ -269,7 +269,7 @@ private void updateKafkaPulsarSourceNumPartitions( try (var restClient = ctx.getRestClusterClient()) { Pattern partitionRegex = Pattern.compile( - "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$" + "^.*?(\\.kafkaCluster\\.(?<kafkaCluster>.+))?\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$" + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { @@ -281,12 +281,14 
@@ private void updateKafkaPulsarSourceNumPartitions( Matcher matcher = partitionRegex.matcher(v); if (matcher.matches()) { String kafkaTopic = matcher.group("kafkaTopic"); + String kafkaCluster = + matcher.group("kafkaCluster"); String kafkaId = matcher.group("kafkaId"); String pulsarTopic = matcher.group("pulsarTopic"); String pulsarId = matcher.group("pulsarId"); return kafkaTopic != null - ? kafkaTopic + "-" + kafkaId + ? kafkaCluster + "-" + kafkaTopic + "-" + kafkaId : pulsarTopic + "-" + pulsarId; } return null; diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java index 1176075f9c..5bca303f98 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -258,6 +258,20 @@ public void testKafkaPulsarNumPartitions() throws Exception { collectedMetrics = metricsCollector.updateMetrics(context, stateStore); assertEquals(5, collectedMetrics.getJobTopology().get(source1).getNumSourcePartitions()); + metricsCollector.setMetricNames( + Map.of( + source1, + List.of( + "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.testTopic.partition.0.currentOffset", + "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.anotherTopic.partition.0.currentOffset", + "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.0.currentOffset", + "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.1.currentOffset", + "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.2.currentOffset", + 
"1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.3.currentOffset"))); + + collectedMetrics = metricsCollector.updateMetrics(context, stateStore); + assertEquals(6, collectedMetrics.getJobTopology().get(source1).getNumSourcePartitions()); + metricsCollector.setMetricNames( Map.of( source2, From da26fdf89595ab0d51d31ee0e08bad1f609bb866 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Thu, 18 Sep 2025 15:54:57 +0200 Subject: [PATCH 2/2] fixup! spotless --- .../org/apache/flink/autoscaler/ScalingMetricCollector.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java index b59d7bdd46..310e063b5b 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java @@ -288,7 +288,11 @@ private void updateKafkaPulsarSourceNumPartitions( matcher.group("pulsarTopic"); String pulsarId = matcher.group("pulsarId"); return kafkaTopic != null - ? kafkaCluster + "-" + kafkaTopic + "-" + kafkaId + ? kafkaCluster + + "-" + + kafkaTopic + + "-" + + kafkaId : pulsarTopic + "-" + pulsarId; } return null;