Skip to content

Commit 7addc72

Browse files
authored
[server] Support generate and execute rebalance plan (#1452)
1 parent 38c2555 commit 7addc72

File tree

72 files changed

+6211
-299
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+6211
-299
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.client.metadata.LakeSnapshot;
2424
import org.apache.fluss.cluster.ServerNode;
2525
import org.apache.fluss.cluster.rebalance.GoalType;
26+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
2627
import org.apache.fluss.cluster.rebalance.ServerTag;
2728
import org.apache.fluss.config.ConfigOptions;
2829
import org.apache.fluss.config.cluster.AlterConfig;
@@ -66,8 +67,11 @@
6667
import org.apache.fluss.security.acl.AclBinding;
6768
import org.apache.fluss.security.acl.AclBindingFilter;
6869

70+
import javax.annotation.Nullable;
71+
6972
import java.util.Collection;
7073
import java.util.List;
74+
import java.util.Optional;
7175
import java.util.concurrent.CompletableFuture;
7276

7377
/**
@@ -551,43 +555,53 @@ ListOffsetsResult listOffsets(
551555
* balancing according to the user-defined {@code priorityGoals}.
552556
*
553557
* <p>Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted
554-
* rebalance task exists, an {@link RebalanceFailureException} will be thrown.
558+
* rebalance task exists, Fluss will return the uncompleted rebalance task's progress.
559+
*
560+
* <p>If you want to cancel the rebalance task, you can use {@link #cancelRebalance(String)}.
555561
*
556562
* <ul>
557563
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
558564
* permissions.
559-
* <li>{@link RebalanceFailureException} If the rebalance failed. Such as there is an ongoing
560-
* execution.
565+
* <li>{@link RebalanceFailureException} If the rebalance failed, for example because there is
566+
* an in-progress execution.
561567
* </ul>
562568
*
563569
* @param priorityGoals the goals to be optimized.
564-
* @param dryRun Calculate and return the rebalance optimization proposal, but do not execute
565-
* it.
566-
* @return the generated rebalance plan for all the tableBuckets which need to do rebalance.
570+
* @return the rebalance id. If there is no rebalance task in progress, it will trigger a new
571+
* rebalance task and return the rebalance id.
567572
*/
568-
CompletableFuture<RebalancePlan> rebalance(List<GoalType> priorityGoals, boolean dryRun);
573+
CompletableFuture<String> rebalance(List<GoalType> priorityGoals);
569574

570575
/**
571576
* List the rebalance progress.
572577
*
573578
* <ul>
574579
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
575580
* permissions.
576-
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
581+
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress for
582+
* the input rebalanceId.
577583
* </ul>
578584
*
585+
* @param rebalanceId the id of the rebalance task whose progress to list; if it is null, the
586+
* progress of the in-progress rebalance task is listed.
579587
* @return the rebalance progress.
580588
*/
581-
CompletableFuture<RebalanceProgress> listRebalanceProgress();
589+
CompletableFuture<Optional<RebalanceProgress>> listRebalanceProgress(
590+
@Nullable String rebalanceId);
582591

583592
/**
584593
* Cancel the rebalance task.
585594
*
586595
* <ul>
587596
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
588597
* permissions.
589-
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
598+
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress or
599+
* the rebalance id does not exist.
590600
* </ul>
601+
*
602+
* @param rebalanceId the rebalance id to cancel; if it is null, the existing rebalance task
603+
* is cancelled. If rebalanceId does not exist on the server, {@link
604+
* NoRebalanceInProgressException} will be thrown.
591605
*/
592-
CompletableFuture<Void> cancelRebalance();
606+
CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId);
593607
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.cluster.Cluster;
2626
import org.apache.fluss.cluster.ServerNode;
2727
import org.apache.fluss.cluster.rebalance.GoalType;
28+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
2829
import org.apache.fluss.cluster.rebalance.ServerTag;
2930
import org.apache.fluss.config.cluster.AlterConfig;
3031
import org.apache.fluss.config.cluster.ConfigEntry;
@@ -49,6 +50,7 @@
4950
import org.apache.fluss.rpc.messages.AddServerTagRequest;
5051
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
5152
import org.apache.fluss.rpc.messages.AlterTableRequest;
53+
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
5254
import org.apache.fluss.rpc.messages.CreateAclsRequest;
5355
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
5456
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -69,12 +71,15 @@
6971
import org.apache.fluss.rpc.messages.ListDatabasesResponse;
7072
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
7173
import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
74+
import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest;
7275
import org.apache.fluss.rpc.messages.ListTablesRequest;
7376
import org.apache.fluss.rpc.messages.ListTablesResponse;
7477
import org.apache.fluss.rpc.messages.PbAlterConfig;
7578
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7679
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7780
import org.apache.fluss.rpc.messages.PbTablePath;
81+
import org.apache.fluss.rpc.messages.RebalanceRequest;
82+
import org.apache.fluss.rpc.messages.RebalanceResponse;
7883
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
7984
import org.apache.fluss.rpc.messages.TableExistsRequest;
8085
import org.apache.fluss.rpc.messages.TableExistsResponse;
@@ -91,6 +96,7 @@
9196
import java.util.HashMap;
9297
import java.util.List;
9398
import java.util.Map;
99+
import java.util.Optional;
94100
import java.util.concurrent.CompletableFuture;
95101

96102
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
@@ -551,19 +557,34 @@ public CompletableFuture<Void> removeServerTag(
551557
}
552558

553559
@Override
554-
public CompletableFuture<RebalancePlan> rebalance(
555-
List<GoalType> priorityGoals, boolean dryRun) {
556-
throw new UnsupportedOperationException("Support soon");
560+
public CompletableFuture<String> rebalance(List<GoalType> priorityGoals) {
561+
RebalanceRequest request = new RebalanceRequest();
562+
priorityGoals.forEach(goal -> request.addGoal(goal.value));
563+
return gateway.rebalance(request).thenApply(RebalanceResponse::getRebalanceId);
557564
}
558565

559566
@Override
560-
public CompletableFuture<RebalanceProgress> listRebalanceProgress() {
561-
throw new UnsupportedOperationException("Support soon");
567+
public CompletableFuture<Optional<RebalanceProgress>> listRebalanceProgress(
568+
@Nullable String rebalanceId) {
569+
ListRebalanceProgressRequest request = new ListRebalanceProgressRequest();
570+
571+
if (rebalanceId != null) {
572+
request.setRebalanceId(rebalanceId);
573+
}
574+
575+
return gateway.listRebalanceProgress(request)
576+
.thenApply(ClientRpcMessageUtils::toRebalanceProgress);
562577
}
563578

564579
@Override
565-
public CompletableFuture<Void> cancelRebalance() {
566-
throw new UnsupportedOperationException("Support soon");
580+
public CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId) {
581+
CancelRebalanceRequest request = new CancelRebalanceRequest();
582+
583+
if (rebalanceId != null) {
584+
request.setRebalanceId(rebalanceId);
585+
}
586+
587+
return gateway.cancelRebalance(request).thenApply(r -> null);
567588
}
568589

569590
@Override

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
import org.apache.fluss.client.metadata.LakeSnapshot;
2626
import org.apache.fluss.client.write.KvWriteBatch;
2727
import org.apache.fluss.client.write.ReadyWriteBatch;
28+
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
29+
import org.apache.fluss.cluster.rebalance.RebalanceProgress;
30+
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
31+
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
2832
import org.apache.fluss.config.cluster.AlterConfigOpType;
2933
import org.apache.fluss.config.cluster.ColumnPositionType;
3034
import org.apache.fluss.config.cluster.ConfigEntry;
@@ -46,6 +50,7 @@
4650
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
4751
import org.apache.fluss.rpc.messages.ListOffsetsRequest;
4852
import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
53+
import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
4954
import org.apache.fluss.rpc.messages.LookupRequest;
5055
import org.apache.fluss.rpc.messages.MetadataRequest;
5156
import org.apache.fluss.rpc.messages.PbAddColumn;
@@ -61,6 +66,9 @@
6166
import org.apache.fluss.rpc.messages.PbPrefixLookupReqForBucket;
6267
import org.apache.fluss.rpc.messages.PbProduceLogReqForBucket;
6368
import org.apache.fluss.rpc.messages.PbPutKvReqForBucket;
69+
import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket;
70+
import org.apache.fluss.rpc.messages.PbRebalanceProgressForBucket;
71+
import org.apache.fluss.rpc.messages.PbRebalanceProgressForTable;
6472
import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
6573
import org.apache.fluss.rpc.messages.PbRenameColumn;
6674
import org.apache.fluss.rpc.messages.PrefixLookupRequest;
@@ -77,10 +85,13 @@
7785
import java.util.HashMap;
7886
import java.util.List;
7987
import java.util.Map;
88+
import java.util.Optional;
8089
import java.util.Set;
8190
import java.util.stream.Collectors;
8291

92+
import static org.apache.fluss.cluster.rebalance.RebalanceStatus.FINAL_STATUSES;
8393
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec;
94+
import static org.apache.fluss.utils.Preconditions.checkArgument;
8495
import static org.apache.fluss.utils.Preconditions.checkState;
8596

8697
/**
@@ -370,6 +381,65 @@ public static AlterTableRequest makeAlterTableRequest(
370381
return request;
371382
}
372383

384+
public static Optional<RebalanceProgress> toRebalanceProgress(
385+
ListRebalanceProgressResponse response) {
386+
if (!response.hasRebalanceId()) {
387+
return Optional.empty();
388+
}
389+
390+
checkArgument(response.hasRebalanceStatus(), "Rebalance status is not set");
391+
RebalanceStatus totalRebalanceStatus = RebalanceStatus.of(response.getRebalanceStatus());
392+
int totalTask = 0;
393+
int finishedTask = 0;
394+
Map<TableBucket, RebalanceResultForBucket> rebalanceProgress = new HashMap<>();
395+
for (PbRebalanceProgressForTable pbTable : response.getTableProgressesList()) {
396+
long tableId = pbTable.getTableId();
397+
for (PbRebalanceProgressForBucket pbBucket : pbTable.getBucketsProgressesList()) {
398+
RebalanceStatus bucketStatus = RebalanceStatus.of(pbBucket.getRebalanceStatus());
399+
RebalancePlanForBucket planForBucket =
400+
toRebalancePlanForBucket(tableId, pbBucket.getRebalancePlan());
401+
rebalanceProgress.put(
402+
planForBucket.getTableBucket(),
403+
new RebalanceResultForBucket(planForBucket, bucketStatus));
404+
if (FINAL_STATUSES.contains(bucketStatus)) {
405+
finishedTask++;
406+
}
407+
totalTask++;
408+
}
409+
}
410+
411+
// For a rebalance task that has no bucket-level rebalance tasks, we return -1 as the
412+
// progress.
413+
double progress = -1d;
414+
if (totalTask != 0) {
415+
progress = (double) finishedTask / totalTask;
416+
}
417+
418+
return Optional.of(
419+
new RebalanceProgress(
420+
response.getRebalanceId(),
421+
totalRebalanceStatus,
422+
progress,
423+
rebalanceProgress));
424+
}
425+
426+
private static RebalancePlanForBucket toRebalancePlanForBucket(
427+
long tableId, PbRebalancePlanForBucket rebalancePlan) {
428+
TableBucket tableBucket =
429+
new TableBucket(
430+
tableId,
431+
rebalancePlan.hasPartitionId() ? rebalancePlan.getPartitionId() : null,
432+
rebalancePlan.getBucketId());
433+
return new RebalancePlanForBucket(
434+
tableBucket,
435+
rebalancePlan.getOriginalLeader(),
436+
rebalancePlan.getNewLeader(),
437+
Arrays.stream(rebalancePlan.getOriginalReplicas())
438+
.boxed()
439+
.collect(Collectors.toList()),
440+
Arrays.stream(rebalancePlan.getNewReplicas()).boxed().collect(Collectors.toList()));
441+
}
442+
373443
public static List<PartitionInfo> toPartitionInfos(ListPartitionInfosResponse response) {
374444
return response.getPartitionsInfosList().stream()
375445
.map(

0 commit comments

Comments
 (0)