Skip to content

Commit 654239f

Browse files
committed
[server] Add rebalance id to trace rebalance task
1 parent 1b27c79 commit 654239f

File tree

22 files changed

+254
-59
lines changed

22 files changed

+254
-59
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@
6767
import org.apache.fluss.security.acl.AclBinding;
6868
import org.apache.fluss.security.acl.AclBindingFilter;
6969

70+
import javax.annotation.Nullable;
71+
7072
import java.util.Collection;
7173
import java.util.List;
7274
import java.util.concurrent.CompletableFuture;
@@ -577,9 +579,12 @@ ListOffsetsResult listOffsets(
577579
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
578580
* </ul>
579581
*
582+
* @param rebalanceId the rebalance id to list progress, if it is null means list the latest
583+
* rebalance task's process. If rebalance id is not exists in server, empty rebalance result
584+
* will be returned.
580585
* @return the rebalance process.
581586
*/
582-
CompletableFuture<RebalanceProgress> listRebalanceProgress();
587+
CompletableFuture<RebalanceProgress> listRebalanceProgress(@Nullable String rebalanceId);
583588

584589
/**
585590
* Cannel the rebalance task.
@@ -589,6 +594,9 @@ ListOffsetsResult listOffsets(
589594
* permissions.
590595
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
591596
* </ul>
597+
*
598+
* @param rebalanceId the rebalance id to cancel, if it is null means cancel the latest
599+
* rebalance task. If rebalanceId is not exists in server, nothing will be done.
592600
*/
593-
CompletableFuture<Void> cancelRebalance();
601+
CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId);
594602
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,15 +563,26 @@ public CompletableFuture<RebalancePlan> rebalance(
563563
}
564564

565565
@Override
566-
public CompletableFuture<RebalanceProgress> listRebalanceProgress() {
566+
public CompletableFuture<RebalanceProgress> listRebalanceProgress(
567+
@Nullable String rebalanceId) {
567568
ListRebalanceProgressRequest request = new ListRebalanceProgressRequest();
569+
570+
if (rebalanceId != null) {
571+
request.setRebalanceId(rebalanceId);
572+
}
573+
568574
return gateway.listRebalanceProgress(request)
569575
.thenApply(ClientRpcMessageUtils::toRebalanceProgress);
570576
}
571577

572578
@Override
573-
public CompletableFuture<Void> cancelRebalance() {
579+
public CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId) {
574580
CancelRebalanceRequest request = new CancelRebalanceRequest();
581+
582+
if (rebalanceId != null) {
583+
request.setRebalanceId(rebalanceId);
584+
}
585+
575586
return gateway.cancelRebalance(request).thenApply(r -> null);
576587
}
577588

fluss-client/src/main/java/org/apache/fluss/client/admin/RebalancePlan.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,19 @@
2929
*/
3030
public class RebalancePlan {
3131

32+
private final String rebalanceId;
3233
private final Map<TableBucket, RebalancePlanForBucket> planForBucketMap;
3334

34-
public RebalancePlan(Map<TableBucket, RebalancePlanForBucket> planForBucketMap) {
35+
public RebalancePlan(
36+
String rebalanceId, Map<TableBucket, RebalancePlanForBucket> planForBucketMap) {
37+
this.rebalanceId = rebalanceId;
3538
this.planForBucketMap = planForBucketMap;
3639
}
3740

41+
public String getRebalanceId() {
42+
return rebalanceId;
43+
}
44+
3845
public Map<TableBucket, RebalancePlanForBucket> getPlanForBucketMap() {
3946
return planForBucketMap;
4047
}

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ public static RebalancePlan toRebalancePlan(RebalanceResponse response) {
391391
rebalancePlan.put(planForBucket.getTableBucket(), planForBucket);
392392
}
393393
}
394-
return new RebalancePlan(rebalancePlan);
394+
return new RebalancePlan(response.getRebalanceId(), rebalancePlan);
395395
}
396396

397397
public static RebalanceProgress toRebalanceProgress(ListRebalanceProgressResponse response) {
@@ -420,7 +420,11 @@ public static RebalanceProgress toRebalanceProgress(ListRebalanceProgressRespons
420420
progress = (double) finishedTask / totalTask;
421421
}
422422

423-
return new RebalanceProgress(totalRebalanceStatus, progress, rebalanceProgress);
423+
return new RebalanceProgress(
424+
response.hasRebalanceId() ? response.getRebalanceId() : null,
425+
totalRebalanceStatus,
426+
progress,
427+
rebalanceProgress);
424428
}
425429

426430
private static RebalancePlanForBucket toRebalancePlanForBucket(

fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ void testRebalanceForLogTable() throws Exception {
212212

213213
@Test
214214
void testListRebalanceProcess() throws Exception {
215-
RebalanceProgress rebalanceProgress = admin.listRebalanceProgress().get();
215+
RebalanceProgress rebalanceProgress = admin.listRebalanceProgress(null).get();
216216
assertThat(rebalanceProgress.progress()).isEqualTo(-1d);
217217
assertThat(rebalanceProgress.status()).isEqualTo(RebalanceStatus.NO_TASK);
218218
assertThat(rebalanceProgress.progressForBucketMap()).isEmpty();
@@ -249,7 +249,8 @@ void testListRebalanceProcess() throws Exception {
249249
retry(
250250
Duration.ofMinutes(2),
251251
() -> {
252-
RebalanceProgress progress = admin.listRebalanceProgress().get();
252+
RebalanceProgress progress =
253+
admin.listRebalanceProgress(rebalancePlan.getRebalanceId()).get();
253254
assertThat(progress.progress()).isEqualTo(1d);
254255
assertThat(progress.status()).isEqualTo(RebalanceStatus.COMPLETED);
255256
Map<TableBucket, RebalanceResultForBucket> processForBuckets =
@@ -267,11 +268,20 @@ void testListRebalanceProcess() throws Exception {
267268
});
268269

269270
// cancel rebalance.
270-
admin.cancelRebalance().get();
271+
admin.cancelRebalance(rebalancePlan.getRebalanceId()).get();
271272

272-
RebalanceProgress progress = admin.listRebalanceProgress().get();
273+
RebalanceProgress progress =
274+
admin.listRebalanceProgress(rebalancePlan.getRebalanceId()).get();
273275
assertThat(progress.progress()).isEqualTo(1d);
274276
assertThat(progress.status()).isEqualTo(RebalanceStatus.CANCELED);
277+
278+
// test list and cancel an un-existed rebalance id.
279+
progress = admin.listRebalanceProgress("unexisted-rebalance-id").get();
280+
assertThat(progress.progress()).isEqualTo(-1d);
281+
assertThat(progress.status()).isEqualTo(RebalanceStatus.NO_TASK);
282+
assertThat(progress.progressForBucketMap()).isEmpty();
283+
284+
admin.cancelRebalance("unexisted-rebalance-id").get();
275285
}
276286

277287
private static Configuration initConfig() {

fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceProgress.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import org.apache.fluss.metadata.TableBucket;
2121

22+
import javax.annotation.Nullable;
23+
2224
import java.util.Map;
2325

2426
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -30,6 +32,9 @@
3032
*/
3133
public class RebalanceProgress {
3234

35+
/** The rebalance id. */
36+
private final @Nullable String rebalanceId;
37+
3338
/** The rebalance status for the overall rebalance. */
3439
private final RebalanceStatus rebalanceStatus;
3540

@@ -40,15 +45,21 @@ public class RebalanceProgress {
4045
private final Map<TableBucket, RebalanceResultForBucket> progressForBucketMap;
4146

4247
public RebalanceProgress(
48+
@Nullable String rebalanceId,
4349
RebalanceStatus rebalanceStatus,
4450
double progress,
4551
Map<TableBucket, RebalanceResultForBucket> progressForBucketMap) {
52+
this.rebalanceId = rebalanceId;
4653
// TODO: we may derive the overall progress and status from progressForBucketMap
4754
this.rebalanceStatus = checkNotNull(rebalanceStatus);
4855
this.progress = progress;
4956
this.progressForBucketMap = checkNotNull(progressForBucketMap);
5057
}
5158

59+
public @Nullable String rebalanceId() {
60+
return rebalanceId;
61+
}
62+
5263
public RebalanceStatus status() {
5364
return rebalanceStatus;
5465
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/CancelRebalanceProcedure.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,25 @@
1717

1818
package org.apache.fluss.flink.procedure;
1919

20+
import org.apache.flink.table.annotation.ArgumentHint;
21+
import org.apache.flink.table.annotation.DataTypeHint;
2022
import org.apache.flink.table.annotation.ProcedureHint;
2123
import org.apache.flink.table.procedure.ProcedureContext;
2224

25+
import javax.annotation.Nullable;
26+
2327
/** Procedure to cancel rebalance. */
2428
public class CancelRebalanceProcedure extends ProcedureBase {
2529

26-
@ProcedureHint()
27-
public String[] call(ProcedureContext context) throws Exception {
28-
admin.cancelRebalance().get();
30+
@ProcedureHint(
31+
argument = {
32+
@ArgumentHint(
33+
name = "rebalanceId",
34+
type = @DataTypeHint("STRING"),
35+
isOptional = true)
36+
})
37+
public String[] call(ProcedureContext context, @Nullable String rebalanceId) throws Exception {
38+
admin.cancelRebalance(rebalanceId).get();
2939
return new String[] {"success"};
3040
}
3141
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ListRebalanceProcessProcedure.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,28 @@
2222
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
2323
import org.apache.fluss.metadata.TableBucket;
2424

25+
import org.apache.flink.table.annotation.ArgumentHint;
26+
import org.apache.flink.table.annotation.DataTypeHint;
2527
import org.apache.flink.table.annotation.ProcedureHint;
2628
import org.apache.flink.table.procedure.ProcedureContext;
2729

30+
import javax.annotation.Nullable;
31+
2832
import java.text.NumberFormat;
2933
import java.util.Map;
3034

3135
/** Procedure to list rebalance process. */
3236
public class ListRebalanceProcessProcedure extends ProcedureBase {
3337

34-
@ProcedureHint()
35-
public String[] call(ProcedureContext context) throws Exception {
36-
RebalanceProgress progress = admin.listRebalanceProgress().get();
38+
@ProcedureHint(
39+
argument = {
40+
@ArgumentHint(
41+
name = "rebalanceId",
42+
type = @DataTypeHint("STRING"),
43+
isOptional = true)
44+
})
45+
public String[] call(ProcedureContext context, @Nullable String rebalanceId) throws Exception {
46+
RebalanceProgress progress = admin.listRebalanceProgress(rebalanceId).get();
3747
return progressToString(progress);
3848
}
3949

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,16 @@ void testRebalance(boolean upperCase) throws Exception {
529529
assertCallResult(listProceduresIterator, new String[] {"+I[success]"});
530530
}
531531

532+
// test cancel an un-existed rebalance.
533+
try (CloseableIterator<Row> listProceduresIterator =
534+
tEnv.executeSql(
535+
String.format(
536+
"Call %s.sys.cancel_rebalance('not-exist-id')",
537+
CATALOG_NAME))
538+
.collect()) {
539+
assertCallResult(listProceduresIterator, new String[] {"+I[success]"});
540+
}
541+
532542
// delete rebalance plan to avoid conflict with other tests.
533543
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient().deleteRebalancePlan();
534544
}

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -598,18 +598,22 @@ message RebalanceRequest {
598598
}
599599

600600
message RebalanceResponse {
601-
repeated PbRebalancePlanForTable table_plan = 1;
601+
required string rebalance_id = 1;
602+
repeated PbRebalancePlanForTable table_plan = 2;
602603
}
603604

604605
message ListRebalanceProgressRequest {
606+
optional string rebalance_id = 1;
605607
}
606608

607609
message ListRebalanceProgressResponse {
608610
required int32 rebalance_status = 1;
609-
repeated PbRebalanceProgressForTable table_progress = 2;
611+
optional string rebalance_id = 2;
612+
repeated PbRebalanceProgressForTable table_progress = 3;
610613
}
611614

612615
message CancelRebalanceRequest {
616+
optional string rebalance_id = 1;
613617
}
614618

615619
message CancelRebalanceResponse {

0 commit comments

Comments
 (0)