Skip to content

Commit fc152fd

Browse files
committed
[server] Support list rebalance process
1 parent 6c6ae4e commit fc152fd

File tree

17 files changed

+441
-37
lines changed

17 files changed

+441
-37
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
2424
import org.apache.fluss.cluster.ServerNode;
2525
import org.apache.fluss.cluster.rebalance.GoalType;
26+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
2627
import org.apache.fluss.cluster.rebalance.ServerTag;
2728
import org.apache.fluss.config.ConfigOptions;
2829
import org.apache.fluss.config.cluster.AlterConfig;

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
2727
import org.apache.fluss.cluster.rebalance.GoalType;
28+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
2829
import org.apache.fluss.cluster.rebalance.ServerTag;
2930
import org.apache.fluss.config.cluster.AlterConfig;
3031
import org.apache.fluss.config.cluster.ConfigEntry;
@@ -70,6 +71,7 @@
7071
import org.apache.fluss.rpc.messages.ListDatabasesResponse;
7172
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
7273
import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
74+
import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest;
7375
import org.apache.fluss.rpc.messages.ListTablesRequest;
7476
import org.apache.fluss.rpc.messages.ListTablesResponse;
7577
import org.apache.fluss.rpc.messages.PbAlterConfig;
@@ -562,7 +564,9 @@ public CompletableFuture<RebalancePlan> rebalance(
562564

563565
@Override
564566
public CompletableFuture<RebalanceProgress> listRebalanceProgress() {
565-
throw new UnsupportedOperationException("Support soon");
567+
ListRebalanceProgressRequest request = new ListRebalanceProgressRequest();
568+
return gateway.listRebalanceProgress(request)
569+
.thenApply(ClientRpcMessageUtils::toRebalanceProgress);
566570
}
567571

568572
@Override

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import org.apache.fluss.client.write.KvWriteBatch;
2828
import org.apache.fluss.client.write.ReadyWriteBatch;
2929
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
30+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
31+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
32+
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
3033
import org.apache.fluss.config.cluster.AlterConfigOpType;
3134
import org.apache.fluss.config.cluster.ColumnPositionType;
3235
import org.apache.fluss.config.cluster.ConfigEntry;
@@ -48,6 +51,7 @@
4851
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
4952
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
5053
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
54+
import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
5155
import org.apache.fluss.rpc.messages.LookupRequest;
5256
import org.apache.fluss.rpc.messages.MetadataRequest;
5357
import org.apache.fluss.rpc.messages.PbAddColumn;
@@ -65,6 +69,8 @@
6569
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
6670
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
6771
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
72+
import org.apache.fluss.rpc.messages.PbRebalanceProgressForBucket;
73+
import org.apache.fluss.rpc.messages.PbRebalanceProgressForTable;
6874
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
6975
import org.apache.fluss.rpc.messages.PbRenameColumn;
7076
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
@@ -85,6 +91,7 @@
8591
import java.util.Set;
8692
import java.util.stream.Collectors;
8793

94+
import static org.apache.fluss.cluster.rebalance.RebalanceUtils.FINAL_STATUSES;
8895
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec;
8996
import static org.apache.fluss.utils.Preconditions.checkState;
9097

@@ -380,28 +387,59 @@ public static RebalancePlan toRebalancePlan(RebalanceResponse response) {
380387
for (PbRebalancePlanForTable pbTable : response.getTablePlansList()) {
381388
long tableId = pbTable.getTableId();
382389
for (PbRebalancePlanForBucket pbBucket : pbTable.getBucketsPlansList()) {
383-
TableBucket tableBucket =
384-
new TableBucket(
385-
tableId,
386-
pbBucket.hasPartitionId() ? pbBucket.getPartitionId() : null,
387-
pbBucket.getBucketId());
388-
rebalancePlan.put(
389-
tableBucket,
390-
new RebalancePlanForBucket(
391-
tableBucket,
392-
pbBucket.getOriginalLeader(),
393-
pbBucket.getNewLeader(),
394-
Arrays.stream(pbBucket.getOriginalReplicas())
395-
.boxed()
396-
.collect(Collectors.toList()),
397-
Arrays.stream(pbBucket.getNewReplicas())
398-
.boxed()
399-
.collect(Collectors.toList())));
390+
RebalancePlanForBucket planForBucket = toRebalancePlanForBucket(tableId, pbBucket);
391+
rebalancePlan.put(planForBucket.getTableBucket(), planForBucket);
400392
}
401393
}
402394
return new RebalancePlan(rebalancePlan);
403395
}
404396

397+
public static RebalanceProgress toRebalanceProgress(ListRebalanceProgressResponse response) {
398+
RebalanceStatus totalRebalanceStatus = RebalanceStatus.of(response.getRebalanceStatus());
399+
int totalTask = 0;
400+
int finishedTask = 0;
401+
Map<TableBucket, RebalanceResultForBucket> rebalanceProgress = new HashMap<>();
402+
for (PbRebalanceProgressForTable pbTable : response.getTableProgressesList()) {
403+
long tableId = pbTable.getTableId();
404+
for (PbRebalanceProgressForBucket pbBucket : pbTable.getBucketsProgressesList()) {
405+
RebalanceStatus bucketStatus = RebalanceStatus.of(pbBucket.getRebalanceStatus());
406+
RebalancePlanForBucket planForBucket =
407+
toRebalancePlanForBucket(tableId, pbBucket.getRebalancePlan());
408+
rebalanceProgress.put(
409+
planForBucket.getTableBucket(),
410+
new RebalanceResultForBucket(planForBucket, bucketStatus));
411+
if (FINAL_STATUSES.contains(bucketStatus)) {
412+
finishedTask++;
413+
}
414+
totalTask++;
415+
}
416+
}
417+
418+
double progress = -1d;
419+
if (totalTask != 0) {
420+
progress = (double) finishedTask / totalTask;
421+
}
422+
423+
return new RebalanceProgress(totalRebalanceStatus, progress, rebalanceProgress);
424+
}
425+
426+
private static RebalancePlanForBucket toRebalancePlanForBucket(
427+
long tableId, PbRebalancePlanForBucket rebalancePlan) {
428+
TableBucket tableBucket =
429+
new TableBucket(
430+
tableId,
431+
rebalancePlan.hasPartitionId() ? rebalancePlan.getPartitionId() : null,
432+
rebalancePlan.getBucketId());
433+
return new RebalancePlanForBucket(
434+
tableBucket,
435+
rebalancePlan.getOriginalLeader(),
436+
rebalancePlan.getNewLeader(),
437+
Arrays.stream(rebalancePlan.getOriginalReplicas())
438+
.boxed()
439+
.collect(Collectors.toList()),
440+
Arrays.stream(rebalancePlan.getNewReplicas()).boxed().collect(Collectors.toList()));
441+
}
442+
405443
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
406444
return response.getPartitionsInfosList().stream()
407445
.map(

fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,16 @@
2020
import org.apache.fluss.client.Connection;
2121
import org.apache.fluss.client.ConnectionFactory;
2222
import org.apache.fluss.cluster.rebalance.GoalType;
23+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
24+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
25+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
2326
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
2427
import org.apache.fluss.cluster.rebalance.ServerTag;
2528
import org.apache.fluss.config.ConfigOptions;
2629
import org.apache.fluss.config.Configuration;
2730
import org.apache.fluss.metadata.DatabaseDescriptor;
2831
import org.apache.fluss.metadata.PartitionSpec;
32+
import org.apache.fluss.metadata.TableBucket;
2933
import org.apache.fluss.metadata.TableDescriptor;
3034
import org.apache.fluss.metadata.TablePath;
3135
import org.apache.fluss.server.replica.ReplicaManager;
@@ -40,6 +44,7 @@
4044
import java.time.Duration;
4145
import java.util.Arrays;
4246
import java.util.Collections;
47+
import java.util.Map;
4348
import java.util.Optional;
4449

4550
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
@@ -66,6 +71,8 @@ protected void setup() throws Exception {
6671

6772
@AfterEach
6873
protected void teardown() throws Exception {
74+
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().deleteRebalancePlan();
75+
6976
if (admin != null) {
7077
admin.close();
7178
admin = null;
@@ -203,6 +210,70 @@ void testRebalanceForLogTable() throws Exception {
203210
});
204211
}
205212

213+
@Test
214+
void testListRebalanceProcess() throws Exception {
215+
RebalanceProgress rebalanceProgress = admin.listRebalanceProgress().get();
216+
assertThat(rebalanceProgress.progress()).isEqualTo(-1d);
217+
assertThat(rebalanceProgress.status()).isEqualTo(RebalanceStatus.NO_TASK);
218+
assertThat(rebalanceProgress.progressForBucketMap()).isEmpty();
219+
220+
String dbName = "db-rebalance-list";
221+
admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, false).get();
222+
223+
TableDescriptor logDescriptor =
224+
TableDescriptor.builder()
225+
.schema(DATA1_SCHEMA)
226+
.distributedBy(3)
227+
.property(
228+
ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key(),
229+
"true")
230+
.build();
231+
// create some none partitioned log table.
232+
for (int i = 0; i < 6; i++) {
233+
long tableId =
234+
createTable(
235+
new TablePath(dbName, "test-rebalance_table-" + i),
236+
logDescriptor,
237+
false);
238+
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
239+
}
240+
241+
// trigger rebalance with goal set[ReplicaDistributionGoal, LeaderReplicaDistributionGoal]
242+
org.apache.fluss.client.admin.RebalancePlan rebalancePlan =
243+
admin.rebalance(
244+
Arrays.asList(
245+
GoalType.REPLICA_DISTRIBUTION_GOAL,
246+
GoalType.LEADER_DISTRIBUTION_GOAL),
247+
false)
248+
.get();
249+
retry(
250+
Duration.ofMinutes(2),
251+
() -> {
252+
RebalanceProgress progress = admin.listRebalanceProgress().get();
253+
assertThat(progress.progress()).isEqualTo(1d);
254+
assertThat(progress.status()).isEqualTo(RebalanceStatus.COMPLETED);
255+
Map<TableBucket, RebalanceResultForBucket> processForBuckets =
256+
progress.progressForBucketMap();
257+
Map<TableBucket, RebalancePlanForBucket> planForBuckets =
258+
rebalancePlan.getPlanForBucketMap();
259+
assertThat(planForBuckets.size()).isEqualTo(processForBuckets.size());
260+
for (TableBucket tableBucket : planForBuckets.keySet()) {
261+
RebalanceResultForBucket processForBucket =
262+
processForBuckets.get(tableBucket);
263+
assertThat(processForBucket.status()).isEqualTo(RebalanceStatus.COMPLETED);
264+
assertThat(processForBucket.plan())
265+
.isEqualTo(planForBuckets.get(tableBucket));
266+
}
267+
});
268+
269+
// cancel rebalance.
270+
admin.cancelRebalance().get();
271+
272+
RebalanceProgress progress = admin.listRebalanceProgress().get();
273+
assertThat(progress.progress()).isEqualTo(1d);
274+
assertThat(progress.status()).isEqualTo(RebalanceStatus.CANCELED);
275+
}
276+
206277
private static Configuration initConfig() {
207278
Configuration configuration = new Configuration();
208279
configuration.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);

fluss-client/src/main/java/org/apache/fluss/client/admin/RebalanceProgress.java renamed to fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgress.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.fluss.client.admin;
18+
package org.apache.fluss.cluster.rebalance;
1919

20-
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
21-
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
2220
import org.apache.fluss.metadata.TableBucket;
2321

2422
import java.util.Map;

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatus.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
*/
2727
@PublicEvolving
2828
public enum RebalanceStatus {
29+
NO_TASK(0),
2930
NOT_STARTED(1),
3031
REBALANCING(2),
3132
FAILED(3),
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.cluster.rebalance;
19+
20+
import java.util.Arrays;
21+
import java.util.HashSet;
22+
import java.util.Set;
23+
24+
import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED;
25+
import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED;
26+
import static org.apache.fluss.cluster.rebalance.RebalanceStatus.FAILED;
27+
28+
/** Rebalance utils. */
29+
public class RebalanceUtils {
30+
public static final Set<RebalanceStatus> FINAL_STATUSES =
31+
new HashSet<>(Arrays.asList(COMPLETED, CANCELED, FAILED));
32+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
21+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
22+
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
23+
import org.apache.fluss.metadata.TableBucket;
24+
25+
import org.apache.flink.table.annotation.ProcedureHint;
26+
import org.apache.flink.table.procedure.ProcedureContext;
27+
28+
import java.text.NumberFormat;
29+
import java.util.Map;
30+
31+
/** Procedure to list rebalance progress. */
32+
public class ListRebalanceProcessProcedure extends ProcedureBase {
33+
34+
@ProcedureHint()
35+
public String[] call(ProcedureContext context) throws Exception {
36+
RebalanceProgress progress = admin.listRebalanceProgress().get();
37+
return progressToString(progress);
38+
}
39+
40+
private static String[] progressToString(RebalanceProgress progress) {
41+
RebalanceStatus status = progress.status();
42+
double rebalanceProgress = progress.progress();
43+
Map<TableBucket, RebalanceResultForBucket> bucketMap = progress.progressForBucketMap();
44+
45+
String[] result = new String[bucketMap.size() + 3];
46+
result[0] = "Reblance total status: " + status;
47+
result[1] = "Rebalance progress: " + formatAsPercentage(rebalanceProgress);
48+
result[2] = "Rebalance detail progress for bucket:";
49+
int i = 3;
50+
for (RebalanceResultForBucket resultForBucket : bucketMap.values()) {
51+
result[i++] = resultForBucket.toString();
52+
}
53+
return result;
54+
}
55+
56+
public static String formatAsPercentage(double value) {
57+
if (value < 0) {
58+
return "NONE";
59+
}
60+
NumberFormat pctFormat = NumberFormat.getPercentInstance();
61+
pctFormat.setMaximumFractionDigits(2);
62+
return pctFormat.format(value);
63+
}
64+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ private enum ProcedureEnum {
7575
ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class),
7676
REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class),
7777
REBALANCE("sys.rebalance", RebalanceProcedure.class),
78-
CANCEL_REBALANCE("sys.cancel_rebalance", CancelRebalanceProcedure.class);
78+
CANCEL_REBALANCE("sys.cancel_rebalance", CancelRebalanceProcedure.class),
79+
LIST_REBALANCE_PROGRESS("sys.list_rebalance_progress", ListRebalanceProcessProcedure.class);
7980

8081
private final String path;
8182
private final Class<? extends ProcedureBase> procedureClass;

0 commit comments

Comments
 (0)