Skip to content

Commit d477f94

Browse files
committed
[server] Support list rebalance process
1 parent 4179e38 commit d477f94

File tree

13 files changed

+298
-35
lines changed

13 files changed

+298
-35
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
2424
import org.apache.fluss.cluster.ServerNode;
2525
import org.apache.fluss.cluster.rebalance.GoalType;
26+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
2627
import org.apache.fluss.cluster.rebalance.ServerTag;
2728
import org.apache.fluss.config.ConfigOptions;
2829
import org.apache.fluss.config.cluster.AlterConfig;

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
2727
import org.apache.fluss.cluster.rebalance.GoalType;
28+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
2829
import org.apache.fluss.cluster.rebalance.ServerTag;
2930
import org.apache.fluss.config.cluster.AlterConfig;
3031
import org.apache.fluss.config.cluster.ConfigEntry;
@@ -70,6 +71,7 @@
7071
import org.apache.fluss.rpc.messages.ListDatabasesResponse;
7172
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
7273
import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
74+
import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest;
7375
import org.apache.fluss.rpc.messages.ListTablesRequest;
7476
import org.apache.fluss.rpc.messages.ListTablesResponse;
7577
import org.apache.fluss.rpc.messages.PbAlterConfig;
@@ -562,7 +564,9 @@ public CompletableFuture<RebalancePlan> rebalance(
562564

563565
@Override
564566
public CompletableFuture<RebalanceProgress> listRebalanceProgress() {
565-
throw new UnsupportedOperationException("Support soon");
567+
ListRebalanceProgressRequest request = new ListRebalanceProgressRequest();
568+
return gateway.listRebalanceProgress(request)
569+
.thenApply(ClientRpcMessageUtils::toRebalanceProgress);
566570
}
567571

568572
@Override

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import org.apache.fluss.client.write.KvWriteBatch;
2828
import org.apache.fluss.client.write.ReadyWriteBatch;
2929
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
30+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
31+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
32+
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
3033
import org.apache.fluss.config.cluster.AlterConfigOpType;
3134
import org.apache.fluss.config.cluster.ColumnPositionType;
3235
import org.apache.fluss.config.cluster.ConfigEntry;
@@ -48,6 +51,7 @@
4851
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
4952
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
5053
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
54+
import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
5155
import org.apache.fluss.rpc.messages.LookupRequest;
5256
import org.apache.fluss.rpc.messages.MetadataRequest;
5357
import org.apache.fluss.rpc.messages.PbAddColumn;
@@ -65,6 +69,8 @@
6569
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
6670
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
6771
import org.apache.fluss.rpc.messages.PbRebalancePlanForTable;
72+
import org.apache.fluss.rpc.messages.PbRebalanceProgressForBucket;
73+
import org.apache.fluss.rpc.messages.PbRebalanceProgressForTable;
6874
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
6975
import org.apache.fluss.rpc.messages.PbRenameColumn;
7076
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
@@ -85,6 +91,7 @@
8591
import java.util.Set;
8692
import java.util.stream.Collectors;
8793

94+
import static org.apache.fluss.cluster.rebalance.RebalanceUtils.FINAL_STATUSES;
8895
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec;
8996
import static org.apache.fluss.utils.Preconditions.checkState;
9097

@@ -380,28 +387,59 @@ public static RebalancePlan toRebalancePlan(RebalanceResponse response) {
380387
for (PbRebalancePlanForTable pbTable : response.getTablePlansList()) {
381388
long tableId = pbTable.getTableId();
382389
for (PbRebalancePlanForBucket pbBucket : pbTable.getBucketsPlansList()) {
383-
TableBucket tableBucket =
384-
new TableBucket(
385-
tableId,
386-
pbBucket.hasPartitionId() ? pbBucket.getPartitionId() : null,
387-
pbBucket.getBucketId());
388-
rebalancePlan.put(
389-
tableBucket,
390-
new RebalancePlanForBucket(
391-
tableBucket,
392-
pbBucket.getOriginalLeader(),
393-
pbBucket.getNewLeader(),
394-
Arrays.stream(pbBucket.getOriginalReplicas())
395-
.boxed()
396-
.collect(Collectors.toList()),
397-
Arrays.stream(pbBucket.getNewReplicas())
398-
.boxed()
399-
.collect(Collectors.toList())));
390+
RebalancePlanForBucket planForBucket = toRebalancePlanForBucket(tableId, pbBucket);
391+
rebalancePlan.put(planForBucket.getTableBucket(), planForBucket);
400392
}
401393
}
402394
return new RebalancePlan(rebalancePlan);
403395
}
404396

397+
public static RebalanceProgress toRebalanceProgress(ListRebalanceProgressResponse response) {
398+
RebalanceStatus totalRebalanceStatus = RebalanceStatus.of(response.getRebalanceStatus());
399+
int totalTask = 0;
400+
int finishedTask = 0;
401+
Map<TableBucket, RebalanceResultForBucket> rebalanceProgress = new HashMap<>();
402+
for (PbRebalanceProgressForTable pbTable : response.getTableProgressesList()) {
403+
long tableId = pbTable.getTableId();
404+
for (PbRebalanceProgressForBucket pbBucket : pbTable.getBucketsProgressesList()) {
405+
RebalanceStatus bucketStatus = RebalanceStatus.of(pbBucket.getRebalanceStatus());
406+
RebalancePlanForBucket planForBucket =
407+
toRebalancePlanForBucket(tableId, pbBucket.getRebalancePlan());
408+
rebalanceProgress.put(
409+
planForBucket.getTableBucket(),
410+
new RebalanceResultForBucket(planForBucket, bucketStatus));
411+
if (FINAL_STATUSES.contains(bucketStatus)) {
412+
finishedTask++;
413+
}
414+
totalTask++;
415+
}
416+
}
417+
418+
double progress = -1d;
419+
if (totalTask != 0) {
420+
progress = (double) finishedTask / totalTask;
421+
}
422+
423+
return new RebalanceProgress(totalRebalanceStatus, progress, rebalanceProgress);
424+
}
425+
426+
private static RebalancePlanForBucket toRebalancePlanForBucket(
427+
long tableId, PbRebalancePlanForBucket rebalancePlan) {
428+
TableBucket tableBucket =
429+
new TableBucket(
430+
tableId,
431+
rebalancePlan.hasPartitionId() ? rebalancePlan.getPartitionId() : null,
432+
rebalancePlan.getBucketId());
433+
return new RebalancePlanForBucket(
434+
tableBucket,
435+
rebalancePlan.getOriginalLeader(),
436+
rebalancePlan.getNewLeader(),
437+
Arrays.stream(rebalancePlan.getOriginalReplicas())
438+
.boxed()
439+
.collect(Collectors.toList()),
440+
Arrays.stream(rebalancePlan.getNewReplicas()).boxed().collect(Collectors.toList()));
441+
}
442+
405443
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
406444
return response.getPartitionsInfosList().stream()
407445
.map(

fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,16 @@
2020
import org.apache.fluss.client.Connection;
2121
import org.apache.fluss.client.ConnectionFactory;
2222
import org.apache.fluss.cluster.rebalance.GoalType;
23+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
24+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
25+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
2326
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
2427
import org.apache.fluss.cluster.rebalance.ServerTag;
2528
import org.apache.fluss.config.ConfigOptions;
2629
import org.apache.fluss.config.Configuration;
2730
import org.apache.fluss.metadata.DatabaseDescriptor;
2831
import org.apache.fluss.metadata.PartitionSpec;
32+
import org.apache.fluss.metadata.TableBucket;
2933
import org.apache.fluss.metadata.TableDescriptor;
3034
import org.apache.fluss.metadata.TablePath;
3135
import org.apache.fluss.server.replica.ReplicaManager;
@@ -40,6 +44,7 @@
4044
import java.time.Duration;
4145
import java.util.Arrays;
4246
import java.util.Collections;
47+
import java.util.Map;
4348
import java.util.Optional;
4449

4550
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
@@ -203,6 +208,70 @@ void testRebalanceForLogTable() throws Exception {
203208
});
204209
}
205210

211+
@Test
212+
void testListRebalanceProcess() throws Exception {
213+
RebalanceProgress rebalanceProgress = admin.listRebalanceProgress().get();
214+
assertThat(rebalanceProgress.progress()).isEqualTo(-1d);
215+
assertThat(rebalanceProgress.status()).isEqualTo(RebalanceStatus.NO_TASK);
216+
assertThat(rebalanceProgress.progressForBucketMap()).isEmpty();
217+
218+
String dbName = "db-rebalance-list";
219+
admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, false).get();
220+
221+
TableDescriptor logDescriptor =
222+
TableDescriptor.builder()
223+
.schema(DATA1_SCHEMA)
224+
.distributedBy(3)
225+
.property(
226+
ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key(),
227+
"true")
228+
.build();
229+
// create some none partitioned log table.
230+
for (int i = 0; i < 6; i++) {
231+
long tableId =
232+
createTable(
233+
new TablePath(dbName, "test-rebalance_table-" + i),
234+
logDescriptor,
235+
false);
236+
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
237+
}
238+
239+
// trigger rebalance with goal set[ReplicaDistributionGoal, LeaderReplicaDistributionGoal]
240+
org.apache.fluss.client.admin.RebalancePlan rebalancePlan =
241+
admin.rebalance(
242+
Arrays.asList(
243+
GoalType.REPLICA_DISTRIBUTION_GOAL,
244+
GoalType.LEADER_DISTRIBUTION_GOAL),
245+
false)
246+
.get();
247+
retry(
248+
Duration.ofMinutes(2),
249+
() -> {
250+
RebalanceProgress progress = admin.listRebalanceProgress().get();
251+
assertThat(progress.progress()).isEqualTo(1d);
252+
assertThat(progress.status()).isEqualTo(RebalanceStatus.COMPLETED);
253+
Map<TableBucket, RebalanceResultForBucket> processForBuckets =
254+
progress.progressForBucketMap();
255+
Map<TableBucket, RebalancePlanForBucket> planForBuckets =
256+
rebalancePlan.getPlanForBucketMap();
257+
assertThat(planForBuckets.size()).isEqualTo(processForBuckets.size());
258+
for (TableBucket tableBucket : planForBuckets.keySet()) {
259+
RebalanceResultForBucket processForBucket =
260+
processForBuckets.get(tableBucket);
261+
assertThat(processForBucket.status()).isEqualTo(RebalanceStatus.COMPLETED);
262+
assertThat(processForBucket.plan())
263+
.isEqualTo(planForBuckets.get(tableBucket));
264+
}
265+
});
266+
267+
// cancel rebalance.
268+
admin.cancelRebalance().get();
269+
270+
RebalanceProgress progress = admin.listRebalanceProgress().get();
271+
assertThat(progress.progress()).isEqualTo(1d);
272+
assertThat(progress.status()).isEqualTo(RebalanceStatus.CANCELED);
273+
}
274+
206275
private static Configuration initConfig() {
207276
Configuration configuration = new Configuration();
208277
configuration.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);

fluss-client/src/main/java/org/apache/fluss/client/admin/RebalanceProgress.java renamed to fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgress.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.fluss.client.admin;
18+
package org.apache.fluss.cluster.rebalance;
1919

20-
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
21-
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
2220
import org.apache.fluss.metadata.TableBucket;
2321

2422
import java.util.Map;

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatus.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
*/
2727
@PublicEvolving
2828
public enum RebalanceStatus {
29+
NO_TASK(0),
2930
NOT_STARTED(1),
3031
REBALANCING(2),
3132
FAILED(3),
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.cluster.rebalance;
19+
20+
import java.util.Arrays;
21+
import java.util.HashSet;
22+
import java.util.Set;
23+
24+
import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED;
25+
import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED;
26+
import static org.apache.fluss.cluster.rebalance.RebalanceStatus.FAILED;
27+
28+
/** Rebalance utils. */
29+
public class RebalanceUtils {
30+
public static final Set<RebalanceStatus> FINAL_STATUSES =
31+
new HashSet<>(Arrays.asList(COMPLETED, CANCELED, FAILED));
32+
}

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,8 @@ message ListRebalanceProgressRequest {
603603
}
604604

605605
message ListRebalanceProgressResponse {
606-
repeated PbRebalanceProgressForTable table_progress = 1;
606+
required int32 rebalance_status = 1;
607+
repeated PbRebalanceProgressForTable table_progress = 2;
607608
}
608609

609610
message CancelRebalanceRequest {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.cluster.ServerNode;
2323
import org.apache.fluss.cluster.ServerType;
2424
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
25+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
2526
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
2627
import org.apache.fluss.cluster.rebalance.ServerTag;
2728
import org.apache.fluss.config.ConfigOptions;
@@ -53,6 +54,7 @@
5354
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
5455
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
5556
import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
57+
import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
5658
import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
5759
import org.apache.fluss.rpc.messages.RebalanceResponse;
5860
import org.apache.fluss.rpc.messages.RemoveServerTagResponse;
@@ -75,6 +77,7 @@
7577
import org.apache.fluss.server.coordinator.event.DropTableEvent;
7678
import org.apache.fluss.server.coordinator.event.EventProcessor;
7779
import org.apache.fluss.server.coordinator.event.FencedCoordinatorEvent;
80+
import org.apache.fluss.server.coordinator.event.ListRebalanceProgressEvent;
7881
import org.apache.fluss.server.coordinator.event.NewTabletServerEvent;
7982
import org.apache.fluss.server.coordinator.event.NotifyKvSnapshotOffsetEvent;
8083
import org.apache.fluss.server.coordinator.event.NotifyLakeTableOffsetEvent;
@@ -142,6 +145,7 @@
142145
import static org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted;
143146
import static org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionSuccessful;
144147
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeAdjustIsrResponse;
148+
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeListRebalanceProgressResponse;
145149
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeRebalanceRespose;
146150
import static org.apache.fluss.utils.concurrent.FutureUtils.completeFromCallable;
147151

@@ -613,6 +617,12 @@ public void process(CoordinatorEvent event) {
613617
CancelRebalanceEvent cancelRebalanceEvent = (CancelRebalanceEvent) event;
614618
completeFromCallable(
615619
cancelRebalanceEvent.getRespCallback(), this::processCancelRebalance);
620+
} else if (event instanceof ListRebalanceProgressEvent) {
621+
ListRebalanceProgressEvent listRebalanceProgressEvent =
622+
(ListRebalanceProgressEvent) event;
623+
completeFromCallable(
624+
listRebalanceProgressEvent.getRespCallback(),
625+
this::processListRebalanceProgress);
616626
} else if (event instanceof AccessContextEvent) {
617627
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
618628
processAccessContext(accessContextEvent);
@@ -1162,6 +1172,11 @@ private CancelRebalanceResponse processCancelRebalance() {
11621172
return response;
11631173
}
11641174

1175+
private ListRebalanceProgressResponse processListRebalanceProgress() {
1176+
RebalanceProgress rebalanceProgress = rebalanceManager.listRebalanceProgress();
1177+
return makeListRebalanceProgressResponse(rebalanceProgress);
1178+
}
1179+
11651180
/**
11661181
* This method can be trigger by:
11671182
*

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
115115
import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
116116
import org.apache.fluss.server.coordinator.event.EventManager;
117+
import org.apache.fluss.server.coordinator.event.ListRebalanceProgressEvent;
117118
import org.apache.fluss.server.coordinator.event.RebalanceEvent;
118119
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
119120
import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
@@ -831,7 +832,9 @@ public CompletableFuture<RebalanceResponse> rebalance(RebalanceRequest request)
831832
@Override
832833
public CompletableFuture<ListRebalanceProgressResponse> listRebalanceProgress(
833834
ListRebalanceProgressRequest request) {
834-
throw new UnsupportedOperationException("Support soon!");
835+
CompletableFuture<ListRebalanceProgressResponse> response = new CompletableFuture<>();
836+
eventManagerSupplier.get().put(new ListRebalanceProgressEvent(response));
837+
return response;
835838
}
836839

837840
@Override
@@ -840,9 +843,6 @@ public CompletableFuture<CancelRebalanceResponse> cancelRebalance(
840843
CompletableFuture<CancelRebalanceResponse> response = new CompletableFuture<>();
841844
eventManagerSupplier.get().put(new CancelRebalanceEvent(response));
842845
return response;
843-
844-
// rebalanceManagerSupplier.get().cancelRebalance();
845-
// return CompletableFuture.completedFuture(new CancelRebalanceResponse());
846846
}
847847

848848
@VisibleForTesting

0 commit comments

Comments
 (0)