diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index ee3ded9e0b..310e063b5b 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -269,7 +269,7 @@ private void updateKafkaPulsarSourceNumPartitions(
         try (var restClient = ctx.getRestClusterClient()) {
             Pattern partitionRegex =
                     Pattern.compile(
-                            "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
+                            "^.*?(\\.kafkaCluster\\.(?<kafkaCluster>.+))?\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                                     + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
             for (var vertexInfo : topology.getVertexInfos().values()) {
                 if (vertexInfo.getInputs().isEmpty()) {
@@ -281,12 +281,18 @@ private void updateKafkaPulsarSourceNumPartitions(
                                                 Matcher matcher = partitionRegex.matcher(v);
                                                 if (matcher.matches()) {
                                                     String kafkaTopic = matcher.group("kafkaTopic");
+                                                    String kafkaCluster =
+                                                            matcher.group("kafkaCluster");
                                                     String kafkaId = matcher.group("kafkaId");
                                                     String pulsarTopic =
                                                             matcher.group("pulsarTopic");
                                                     String pulsarId = matcher.group("pulsarId");
                                                     return kafkaTopic != null
-                                                            ? kafkaTopic + "-" + kafkaId
+                                                            ? kafkaCluster
+                                                                    + "-"
+                                                                    + kafkaTopic
+                                                                    + "-"
+                                                                    + kafkaId
                                                             : pulsarTopic + "-" + pulsarId;
                                                 }
                                                 return null;
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index 1176075f9c..5bca303f98 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -258,6 +258,20 @@ public void testKafkaPulsarNumPartitions() throws Exception {
         collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
         assertEquals(5, collectedMetrics.getJobTopology().get(source1).getNumSourcePartitions());
 
+        metricsCollector.setMetricNames(
+                Map.of(
+                        source1,
+                        List.of(
+                                "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.testTopic.partition.0.currentOffset",
+                                "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.anotherTopic.partition.0.currentOffset",
+                                "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.0.currentOffset",
+                                "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.1.currentOffset",
+                                "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.2.currentOffset",
+                                "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-2.KafkaSourceReader.topic.testTopic.partition.3.currentOffset")));
+
+        collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
+        assertEquals(6, collectedMetrics.getJobTopology().get(source1).getNumSourcePartitions());
+
         metricsCollector.setMetricNames(
                 Map.of(
                         source2,
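
A standalone sketch of the regex change, not part of the patch: the class name `PartitionRegexDemo` and the legacy-style sample metric name are illustrative assumptions; the cluster-qualified sample is copied from the test above. It shows that the new optional `kafkaCluster` group is populated for multi-cluster metric names while staying `null` for the old single-cluster format (which is why the concatenated key in the collector becomes "null-<topic>-<id>" for legacy names).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionRegexDemo {
    // Same pattern as the updated first alternative in ScalingMetricCollector.
    private static final Pattern PARTITION_REGEX =
            Pattern.compile(
                    "^.*?(\\.kafkaCluster\\.(?<kafkaCluster>.+))?\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                            + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");

    public static void main(String[] args) {
        String[] samples = {
            // New multi-cluster style (from the test above): kafkaCluster is populated.
            "1.Source__Kafka_Source_(testTopic).kafkaCluster.my-cluster-1.KafkaSourceReader.topic.testTopic.partition.0.currentOffset",
            // Legacy single-cluster style (hypothetical sample): kafkaCluster stays null.
            "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.0.currentOffset"
        };
        for (String metric : samples) {
            Matcher m = PARTITION_REGEX.matcher(metric);
            if (m.matches()) {
                // Prints "cluster=my-cluster-1 ..." then "cluster=null ...".
                System.out.printf(
                        "cluster=%s topic=%s partition=%s%n",
                        m.group("kafkaCluster"), m.group("kafkaTopic"), m.group("kafkaId"));
            }
        }
    }
}
```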