Skip to content

Commit c1f7a11

Browse files
authored
Change the ListRebalanceProcessProcedure result from String[] to row[] (apache#2341)
Signed-off-by: Pei Yu <125331682@qq.com>
1 parent c9370ae commit c1f7a11

File tree

5 files changed

+210
-50
lines changed

5 files changed

+210
-50
lines changed

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgress.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.metadata.TableBucket;
2121

22+
import java.text.NumberFormat;
2223
import java.util.Map;
2324

2425
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -72,4 +73,13 @@ public double progress() {
7273
public Map<TableBucket, RebalanceResultForBucket> progressForBucketMap() {
7374
return progressForBucketMap;
7475
}
76+
77+
public String formatAsPercentage() {
78+
if (progress < 0) {
79+
return "NONE";
80+
}
81+
NumberFormat pctFormat = NumberFormat.getPercentInstance();
82+
pctFormat.setMaximumFractionDigits(2);
83+
return pctFormat.format(progress);
84+
}
7585
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.cluster.rebalance;
19+
20+
import org.apache.fluss.metadata.TableBucket;
21+
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
22+
import org.apache.fluss.utils.json.JsonSerializer;
23+
24+
import java.io.IOException;
25+
import java.util.Map;
26+
27+
/** Json serializer for {@link RebalanceProgress}. */
28+
public class RebalanceProgressJsonSerializer implements JsonSerializer<RebalanceProgress> {
29+
30+
public static final RebalanceProgressJsonSerializer INSTANCE =
31+
new RebalanceProgressJsonSerializer();
32+
33+
private static final String REBALANCE_ID = "rebalance_id";
34+
private static final String REBALANCE_STATUS = "rebalance_status";
35+
private static final String PROGRESS = "progress";
36+
private static final String PROGRESS_FOR_BUCKETS = "progress_for_buckets";
37+
38+
private static final String TABLE_ID = "table_id";
39+
private static final String PARTITION_ID = "partition_id";
40+
private static final String BUCKET_ID = "bucket_id";
41+
private static final String ORIGINAL_LEADER = "original_leader";
42+
private static final String NEW_LEADER = "new_leader";
43+
private static final String ORIGIN_REPLICAS = "origin_replicas";
44+
private static final String NEW_REPLICAS = "new_replicas";
45+
46+
@Override
47+
public void serialize(RebalanceProgress rebalanceProgress, JsonGenerator generator)
48+
throws IOException {
49+
generator.writeStartObject();
50+
51+
generator.writeStringField(REBALANCE_ID, rebalanceProgress.rebalanceId());
52+
generator.writeNumberField(REBALANCE_STATUS, rebalanceProgress.status().getCode());
53+
generator.writeStringField(PROGRESS, rebalanceProgress.formatAsPercentage());
54+
55+
Map<TableBucket, RebalanceResultForBucket> resultForBucketMap =
56+
rebalanceProgress.progressForBucketMap();
57+
58+
// RebalanceProgress.progressForBucketMap
59+
generator.writeArrayFieldStart(PROGRESS_FOR_BUCKETS);
60+
for (RebalanceResultForBucket rebalanceResultForBucket : resultForBucketMap.values()) {
61+
TableBucket tableBucket = rebalanceResultForBucket.tableBucket();
62+
RebalancePlanForBucket plan = rebalanceResultForBucket.plan();
63+
64+
// RebalanceResultForBucket.plan
65+
generator.writeStartObject();
66+
generator.writeNumberField(TABLE_ID, tableBucket.getTableId());
67+
generator.writeNumberField(BUCKET_ID, tableBucket.getBucket());
68+
Long partitionId = tableBucket.getPartitionId();
69+
if (null != partitionId) {
70+
generator.writeNumberField(PARTITION_ID, partitionId);
71+
}
72+
generator.writeNumberField(ORIGINAL_LEADER, plan.getOriginalLeader());
73+
generator.writeNumberField(NEW_LEADER, plan.getNewLeader());
74+
generator.writeArrayFieldStart(ORIGIN_REPLICAS);
75+
for (Integer replica : plan.getOriginReplicas()) {
76+
generator.writeNumber(replica);
77+
}
78+
generator.writeEndArray();
79+
generator.writeArrayFieldStart(NEW_REPLICAS);
80+
for (Integer replica : plan.getNewReplicas()) {
81+
generator.writeNumber(replica);
82+
}
83+
generator.writeEndArray();
84+
85+
// RebalanceResultForBucket.rebalanceStatus
86+
generator.writeNumberField(
87+
REBALANCE_STATUS, rebalanceResultForBucket.status().getCode());
88+
89+
generator.writeEndObject();
90+
}
91+
generator.writeEndArray();
92+
93+
generator.writeEndObject();
94+
}
95+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.cluster.rebalance;
19+
20+
import org.apache.fluss.metadata.TableBucket;
21+
import org.apache.fluss.utils.json.JsonSerdeUtils;
22+
23+
import org.junit.jupiter.api.Test;
24+
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.Arrays;
27+
import java.util.HashMap;
28+
import java.util.Map;
29+
30+
import static org.assertj.core.api.Assertions.assertThat;
31+
32+
/** Test for {@link RebalanceProgressJsonSerializer}. */
33+
public class RebalanceProgressJsonSerializerTest {
34+
35+
@Test
36+
public void testSerializer() {
37+
String serialize =
38+
new String(
39+
JsonSerdeUtils.writeValueAsBytes(
40+
createProgressObj(), RebalanceProgressJsonSerializer.INSTANCE),
41+
StandardCharsets.UTF_8);
42+
assertThat(serialize).isEqualTo(createProgressJson());
43+
}
44+
45+
private RebalanceProgress createProgressObj() {
46+
Map<TableBucket, RebalanceResultForBucket> progressForBucketMap = new HashMap<>();
47+
progressForBucketMap.put(
48+
new TableBucket(0L, 0),
49+
RebalanceResultForBucket.of(
50+
new RebalancePlanForBucket(
51+
new TableBucket(0L, 0),
52+
0,
53+
3,
54+
Arrays.asList(0, 1, 2),
55+
Arrays.asList(3, 4, 5)),
56+
RebalanceStatus.COMPLETED));
57+
progressForBucketMap.put(
58+
new TableBucket(1L, 0L, 0),
59+
RebalanceResultForBucket.of(
60+
new RebalancePlanForBucket(
61+
new TableBucket(1L, 0L, 0),
62+
0,
63+
3,
64+
Arrays.asList(0, 1, 2),
65+
Arrays.asList(3, 4, 5)),
66+
RebalanceStatus.COMPLETED));
67+
return new RebalanceProgress(
68+
"rebalance-task-21jd", RebalanceStatus.COMPLETED, 1d, progressForBucketMap);
69+
}
70+
71+
private String createProgressJson() {
72+
return "{\"rebalance_id\":\"rebalance-task-21jd\",\"rebalance_status\":3,\"progress\":\"100%\",\"progress_for_buckets\":"
73+
+ "[{\"table_id\":1,\"bucket_id\":0,\"partition_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5],\"rebalance_status\":3},"
74+
+ "{\"table_id\":0,\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5],\"rebalance_status\":3}]}";
75+
}
76+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java

Lines changed: 21 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,18 @@
1919

2020
import org.apache.fluss.client.admin.Admin;
2121
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
22-
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
23-
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
24-
import org.apache.fluss.metadata.TableBucket;
22+
import org.apache.fluss.cluster.rebalance.RebalanceProgressJsonSerializer;
23+
import org.apache.fluss.utils.json.JsonSerdeUtils;
2524

2625
import org.apache.flink.table.annotation.ArgumentHint;
2726
import org.apache.flink.table.annotation.DataTypeHint;
2827
import org.apache.flink.table.annotation.ProcedureHint;
2928
import org.apache.flink.table.procedure.ProcedureContext;
29+
import org.apache.flink.types.Row;
3030

3131
import javax.annotation.Nullable;
3232

33-
import java.text.NumberFormat;
34-
import java.util.ArrayList;
35-
import java.util.List;
36-
import java.util.Map;
33+
import java.nio.charset.StandardCharsets;
3734
import java.util.Optional;
3835

3936
/**
@@ -60,41 +57,26 @@ public class ListRebalanceProcessProcedure extends ProcedureBase {
6057
name = "rebalanceId",
6158
type = @DataTypeHint("STRING"),
6259
isOptional = true)
63-
})
64-
public String[] call(ProcedureContext context, @Nullable String rebalanceId) throws Exception {
60+
},
61+
output =
62+
@DataTypeHint(
63+
"ROW<rebalance_id STRING, rebalance_status STRING, rebalance_progress STRING, rebalance_plan STRING>"))
64+
public Row[] call(ProcedureContext context, @Nullable String rebalanceId) throws Exception {
6565
Optional<RebalanceProgress> progressOpt = admin.listRebalanceProgress(rebalanceId).get();
6666

6767
if (!progressOpt.isPresent()) {
68-
return new String[0];
68+
return new Row[0];
6969
}
70-
71-
return progressToString(progressOpt.get());
72-
}
73-
74-
private static String[] progressToString(RebalanceProgress progress) {
75-
RebalanceStatus status = progress.status();
76-
double rebalanceProgress = progress.progress();
77-
Map<TableBucket, RebalanceResultForBucket> bucketMap = progress.progressForBucketMap();
78-
79-
// TODO format the result into a row type, and the detail progress for bucket show in json
80-
// format. Trace by: https://github.com/apache/fluss/issues/2325
81-
List<String> result = new ArrayList<>();
82-
result.add("Rebalance id: " + progress.rebalanceId());
83-
result.add("Reblance total status: " + status);
84-
result.add("Rebalance progress: " + formatAsPercentage(rebalanceProgress));
85-
result.add("Rebalance detail progress for bucket:");
86-
for (RebalanceResultForBucket resultForBucket : bucketMap.values()) {
87-
result.add(resultForBucket.toString());
88-
}
89-
return result.toArray(new String[0]);
90-
}
91-
92-
public static String formatAsPercentage(double value) {
93-
if (value < 0) {
94-
return "NONE";
95-
}
96-
NumberFormat pctFormat = NumberFormat.getPercentInstance();
97-
pctFormat.setMaximumFractionDigits(2);
98-
return pctFormat.format(value);
70+
RebalanceProgress progress = progressOpt.get();
71+
return new Row[] {
72+
Row.of(
73+
progress.rebalanceId(),
74+
progress.status(),
75+
progress.formatAsPercentage(),
76+
new String(
77+
JsonSerdeUtils.writeValueAsBytes(
78+
progress, RebalanceProgressJsonSerializer.INSTANCE),
79+
StandardCharsets.UTF_8))
80+
};
9981
}
10082
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.ConnectionFactory;
2222
import org.apache.fluss.client.admin.Admin;
2323
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
24+
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
2425
import org.apache.fluss.cluster.rebalance.ServerTag;
2526
import org.apache.fluss.config.ConfigOptions;
2627
import org.apache.fluss.config.Configuration;
@@ -767,17 +768,13 @@ void testListRebalanceProgress() throws Exception {
767768
"Call %s.sys.list_rebalance('%s')",
768769
CATALOG_NAME, progress.rebalanceId()))
769770
.collect()) {
770-
List<String> listProgressResult =
771-
CollectionUtil.iteratorToList(rows).stream()
772-
.map(Row::toString)
773-
.collect(Collectors.toList());
774-
assertThat(listProgressResult.get(0)).startsWith("+I[Rebalance id:");
775-
assertThat(listProgressResult.get(1))
776-
.isEqualTo("+I[Reblance total status: COMPLETED]");
777-
assertThat(listProgressResult.get(2))
778-
.isEqualTo("+I[Rebalance progress: 100%]");
779-
assertThat(listProgressResult.get(3))
780-
.isEqualTo("+I[Rebalance detail progress for bucket:]");
771+
List<Row> listProgressResult = CollectionUtil.iteratorToList(rows);
772+
Row row = listProgressResult.get(0);
773+
assertThat(row.getArity()).isEqualTo(4);
774+
assertThat(row.getField(0)).isEqualTo(progress.rebalanceId());
775+
assertThat(row.getField(1)).isEqualTo(RebalanceStatus.COMPLETED);
776+
assertThat((String) row.getField(2)).endsWith("%");
777+
assertThat((String) row.getField(3)).startsWith("{\"rebalance_id\":");
781778
}
782779
});
783780
}

0 commit comments

Comments
 (0)