Skip to content

Commit 6eb1c03

Browse files
authored
[FLINK-35328] Autoscaler supports setting the maximum floating parallelism by the number of Pulsar partitions
1 parent c3e94ae commit 6eb1c03

File tree

2 files changed

+47
-7
lines changed

2 files changed

+47
-7
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,14 @@
5858
import java.util.HashSet;
5959
import java.util.List;
6060
import java.util.Map;
61+
import java.util.Objects;
6162
import java.util.Optional;
6263
import java.util.Set;
6364
import java.util.SortedMap;
6465
import java.util.concurrent.ConcurrentHashMap;
6566
import java.util.concurrent.TimeUnit;
6667
import java.util.function.Supplier;
68+
import java.util.regex.Matcher;
6769
import java.util.regex.Pattern;
6870
import java.util.stream.Collectors;
6971
import java.util.stream.Stream;
@@ -202,7 +204,7 @@ protected JobTopology getJobTopology(
202204

203205
Set<JobVertexID> vertexSet = Set.copyOf(t.getVerticesInTopologicalOrder());
204206
updateVertexList(stateStore, ctx, clock.instant(), vertexSet);
205-
updateKafkaSourceMaxParallelisms(ctx, jobDetailsInfo.getJobId(), t);
207+
updateKafkaPulsarSourceMaxParallelisms(ctx, jobDetailsInfo.getJobId(), t);
206208
excludeVerticesFromScaling(ctx.getConfiguration(), t.getFinishedVertices());
207209
return t;
208210
}
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
247249
json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished);
248250
}
249251

250-
private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology)
251-
throws Exception {
252+
private void updateKafkaPulsarSourceMaxParallelisms(
253+
Context ctx, JobID jobId, JobTopology topology) throws Exception {
252254
try (var restClient = ctx.getRestClusterClient()) {
253-
var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
255+
Pattern partitionRegex =
256+
Pattern.compile(
257+
"^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
258+
+ "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");
254259
for (var vertexInfo : topology.getVertexInfos().values()) {
255260
if (vertexInfo.getInputs().isEmpty()) {
256261
var sourceVertex = vertexInfo.getId();
257262
var numPartitions =
258263
queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream()
259-
.filter(partitionRegex.asMatchPredicate())
260-
.count();
264+
.map(
265+
v -> {
266+
Matcher matcher = partitionRegex.matcher(v);
267+
if (matcher.matches()) {
268+
String kafkaTopic = matcher.group("kafkaTopic");
269+
String kafkaId = matcher.group("kafkaId");
270+
String pulsarTopic =
271+
matcher.group("pulsarTopic");
272+
String pulsarId = matcher.group("pulsarId");
273+
return kafkaTopic != null
274+
? kafkaTopic + "-" + kafkaId
275+
: pulsarTopic + "-" + pulsarId;
276+
}
277+
return null;
278+
})
279+
.filter(Objects::nonNull)
280+
.collect(Collectors.toSet())
281+
.size();
261282
if (numPartitions > 0) {
262283
LOG.debug(
263284
"Updating source {} max parallelism based on available partitions to {}",

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ private void setDefaultMetrics(
232232
}
233233

234234
@Test
235-
public void testKafkaPartitionMaxParallelism() throws Exception {
235+
public void testKafkaPulsarPartitionMaxParallelism() throws Exception {
236236
setDefaultMetrics(metricsCollector);
237237
metricsCollector.updateMetrics(context, stateStore);
238238

@@ -261,6 +261,25 @@ public void testKafkaPartitionMaxParallelism() throws Exception {
261261
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
262262
assertEquals(5, collectedMetrics.getJobTopology().get(source1).getMaxParallelism());
263263
assertEquals(720, collectedMetrics.getJobTopology().get(source2).getMaxParallelism());
264+
265+
metricsCollector.setMetricNames(
266+
Map.of(
267+
source2,
268+
List.of(
269+
"0.Source__pulsar_source[1].PulsarConsumer"
270+
+ ".persistent_//public/default/testTopic-partition-1.d842f.numMsgsReceived",
271+
"0.Source__pulsar_source[1].PulsarConsumer"
272+
+ ".persistent_//public/default/testTopic-partition-1.660d2.numMsgsReceived",
273+
"0.Source__pulsar_source[1].PulsarConsumer"
274+
+ ".persistent_//public/default/testTopic-partition-2.d356f.numMsgsReceived",
275+
"0.Source__pulsar_source[1].PulsarConsumer"
276+
+ ".persistent_//public/default/otherTopic-partition-2.m953d.numMsgsReceived",
277+
"0.Source__pulsar_source[1].PulsarConsumer"
278+
+ ".persistent_//public/default/testTopic-partition-3.e427h.numMsgsReceived",
279+
"0.Source__pulsar_source[1].PulsarConsumer"
280+
+ ".persistent_//public/default/testTopic-partition-4.m962n.numMsgsReceived")));
281+
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
282+
assertEquals(5, collectedMetrics.getJobTopology().get(source2).getMaxParallelism());
264283
}
265284

266285
@Test

0 commit comments

Comments
 (0)