Skip to content

Commit fbd1e6a

Browse files
committed
[server] Add rebalance id to trace rebalance task
1 parent d477f94 commit fbd1e6a

File tree

19 files changed

+204
-48
lines changed

19 files changed

+204
-48
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@
6767
import org.apache.fluss.security.acl.AclBinding;
6868
import org.apache.fluss.security.acl.AclBindingFilter;
6969

70+
import javax.annotation.Nullable;
71+
7072
import java.util.Collection;
7173
import java.util.List;
7274
import java.util.concurrent.CompletableFuture;
@@ -577,9 +579,12 @@ ListOffsetsResult listOffsets(
577579
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
578580
* </ul>
579581
*
582+
* @param rebalanceId the rebalance id to list progress, if it is null means list the latest
583+
* rebalance task's process. If rebalance id is not exists in server, empty rebalance result
584+
* will be returned.
580585
* @return the rebalance process.
581586
*/
582-
CompletableFuture<RebalanceProgress> listRebalanceProgress();
587+
CompletableFuture<RebalanceProgress> listRebalanceProgress(@Nullable String rebalanceId);
583588

584589
/**
585590
* Cannel the rebalance task.
@@ -589,6 +594,9 @@ ListOffsetsResult listOffsets(
589594
* permissions.
590595
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
591596
* </ul>
597+
*
598+
* @param rebalanceId the rebalance id to cancel, if it is null means cancel the latest
599+
* rebalance task. If rebalanceId is not exists in server, nothing will be done.
592600
*/
593-
CompletableFuture<Void> cancelRebalance();
601+
CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId);
594602
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,15 +563,26 @@ public CompletableFuture<RebalancePlan> rebalance(
563563
}
564564

565565
@Override
566-
public CompletableFuture<RebalanceProgress> listRebalanceProgress() {
566+
public CompletableFuture<RebalanceProgress> listRebalanceProgress(
567+
@Nullable String rebalanceId) {
567568
ListRebalanceProgressRequest request = new ListRebalanceProgressRequest();
569+
570+
if (rebalanceId != null) {
571+
request.setRebalanceId(rebalanceId);
572+
}
573+
568574
return gateway.listRebalanceProgress(request)
569575
.thenApply(ClientRpcMessageUtils::toRebalanceProgress);
570576
}
571577

572578
@Override
573-
public CompletableFuture<Void> cancelRebalance() {
579+
public CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId) {
574580
CancelRebalanceRequest request = new CancelRebalanceRequest();
581+
582+
if (rebalanceId != null) {
583+
request.setRebalanceId(rebalanceId);
584+
}
585+
575586
return gateway.cancelRebalance(request).thenApply(r -> null);
576587
}
577588

fluss-client/src/main/java/org/apache/fluss/client/admin/RebalancePlan.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,19 @@
2929
*/
3030
public class RebalancePlan {
3131

32+
private final String rebalanceId;
3233
private final Map<TableBucket, RebalancePlanForBucket> planForBucketMap;
3334

34-
public RebalancePlan(Map<TableBucket, RebalancePlanForBucket> planForBucketMap) {
35+
public RebalancePlan(
36+
String rebalanceId, Map<TableBucket, RebalancePlanForBucket> planForBucketMap) {
37+
this.rebalanceId = rebalanceId;
3538
this.planForBucketMap = planForBucketMap;
3639
}
3740

41+
public String getRebalanceId() {
42+
return rebalanceId;
43+
}
44+
3845
public Map<TableBucket, RebalancePlanForBucket> getPlanForBucketMap() {
3946
return planForBucketMap;
4047
}

fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ public static RebalancePlan toRebalancePlan(RebalanceResponse response) {
391391
rebalancePlan.put(planForBucket.getTableBucket(), planForBucket);
392392
}
393393
}
394-
return new RebalancePlan(rebalancePlan);
394+
return new RebalancePlan(response.getRebalanceId(), rebalancePlan);
395395
}
396396

397397
public static RebalanceProgress toRebalanceProgress(ListRebalanceProgressResponse response) {

fluss-client/src/test/java/org/apache/fluss/client/admin/RebalanceITCase.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ void testRebalanceForLogTable() throws Exception {
210210

211211
@Test
212212
void testListRebalanceProcess() throws Exception {
213-
RebalanceProgress rebalanceProgress = admin.listRebalanceProgress().get();
213+
RebalanceProgress rebalanceProgress = admin.listRebalanceProgress(null).get();
214214
assertThat(rebalanceProgress.progress()).isEqualTo(-1d);
215215
assertThat(rebalanceProgress.status()).isEqualTo(RebalanceStatus.NO_TASK);
216216
assertThat(rebalanceProgress.progressForBucketMap()).isEmpty();
@@ -247,7 +247,8 @@ void testListRebalanceProcess() throws Exception {
247247
retry(
248248
Duration.ofMinutes(2),
249249
() -> {
250-
RebalanceProgress progress = admin.listRebalanceProgress().get();
250+
RebalanceProgress progress =
251+
admin.listRebalanceProgress(rebalancePlan.getRebalanceId()).get();
251252
assertThat(progress.progress()).isEqualTo(1d);
252253
assertThat(progress.status()).isEqualTo(RebalanceStatus.COMPLETED);
253254
Map<TableBucket, RebalanceResultForBucket> processForBuckets =
@@ -265,11 +266,20 @@ void testListRebalanceProcess() throws Exception {
265266
});
266267

267268
// cancel rebalance.
268-
admin.cancelRebalance().get();
269+
admin.cancelRebalance(rebalancePlan.getRebalanceId()).get();
269270

270-
RebalanceProgress progress = admin.listRebalanceProgress().get();
271+
RebalanceProgress progress =
272+
admin.listRebalanceProgress(rebalancePlan.getRebalanceId()).get();
271273
assertThat(progress.progress()).isEqualTo(1d);
272274
assertThat(progress.status()).isEqualTo(RebalanceStatus.CANCELED);
275+
276+
// test list and cancel an un-existed rebalance id.
277+
progress = admin.listRebalanceProgress("unexisted-rebalance-id").get();
278+
assertThat(progress.progress()).isEqualTo(-1d);
279+
assertThat(progress.status()).isEqualTo(RebalanceStatus.NO_TASK);
280+
assertThat(progress.progressForBucketMap()).isEmpty();
281+
282+
admin.cancelRebalance("unexisted-rebalance-id").get();
273283
}
274284

275285
private static Configuration initConfig() {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/CancelRebalanceProcedure.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,25 @@
1717

1818
package org.apache.fluss.flink.procedure;
1919

20+
import org.apache.flink.table.annotation.ArgumentHint;
21+
import org.apache.flink.table.annotation.DataTypeHint;
2022
import org.apache.flink.table.annotation.ProcedureHint;
2123
import org.apache.flink.table.procedure.ProcedureContext;
2224

25+
import javax.annotation.Nullable;
26+
2327
/** Procedure to cancel rebalance. */
2428
public class CancelRebalanceProcedure extends ProcedureBase {
2529

26-
@ProcedureHint()
27-
public String[] call(ProcedureContext context) throws Exception {
28-
admin.cancelRebalance().get();
30+
@ProcedureHint(
31+
argument = {
32+
@ArgumentHint(
33+
name = "rebalanceId",
34+
type = @DataTypeHint("STRING"),
35+
isOptional = true)
36+
})
37+
public String[] call(ProcedureContext context, @Nullable String rebalanceId) throws Exception {
38+
admin.cancelRebalance(rebalanceId).get();
2939
return new String[] {"success"};
3040
}
3141
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,16 @@ void testRebalance(boolean upperCase) throws Exception {
526526
.collect()) {
527527
assertCallResult(listProceduresIterator, new String[] {"+I[success]"});
528528
}
529+
530+
// test cancel an un-existed rebalance.
531+
try (CloseableIterator<Row> listProceduresIterator =
532+
tEnv.executeSql(
533+
String.format(
534+
"Call %s.sys.cancel_rebalance('not-exist-id')",
535+
CATALOG_NAME))
536+
.collect()) {
537+
assertCallResult(listProceduresIterator, new String[] {"+I[success]"});
538+
}
529539
}
530540

531541
private static Configuration initConfig() {

fluss-rpc/src/main/proto/FlussApi.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,10 +596,12 @@ message RebalanceRequest {
596596
}
597597

598598
message RebalanceResponse {
599-
repeated PbRebalancePlanForTable table_plan = 1;
599+
required string rebalance_id = 1;
600+
repeated PbRebalancePlanForTable table_plan = 2;
600601
}
601602

602603
message ListRebalanceProgressRequest {
604+
optional string rebalance_id = 1;
603605
}
604606

605607
message ListRebalanceProgressResponse {
@@ -608,6 +610,7 @@ message ListRebalanceProgressResponse {
608610
}
609611

610612
message CancelRebalanceRequest {
613+
optional string rebalance_id = 1;
611614
}
612615

613616
message CancelRebalanceResponse {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -616,13 +616,14 @@ public void process(CoordinatorEvent event) {
616616
} else if (event instanceof CancelRebalanceEvent) {
617617
CancelRebalanceEvent cancelRebalanceEvent = (CancelRebalanceEvent) event;
618618
completeFromCallable(
619-
cancelRebalanceEvent.getRespCallback(), this::processCancelRebalance);
619+
cancelRebalanceEvent.getRespCallback(),
620+
() -> processCancelRebalance(cancelRebalanceEvent));
620621
} else if (event instanceof ListRebalanceProgressEvent) {
621622
ListRebalanceProgressEvent listRebalanceProgressEvent =
622623
(ListRebalanceProgressEvent) event;
623624
completeFromCallable(
624625
listRebalanceProgressEvent.getRespCallback(),
625-
this::processListRebalanceProgress);
626+
() -> processListRebalanceProgress(listRebalanceProgressEvent));
626627
} else if (event instanceof AccessContextEvent) {
627628
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
628629
processAccessContext(accessContextEvent);
@@ -1160,20 +1161,24 @@ private RebalanceResponse processRebalance(RebalanceEvent rebalanceEvent) {
11601161
}
11611162

11621163
// 2. execute rebalance plan.
1163-
rebalanceManager.registerRebalance(rebalancePlan.getExecutePlan());
1164+
rebalanceManager.registerRebalance(
1165+
rebalancePlan.getRebalanceId(), rebalancePlan.getExecutePlan());
11641166
}
11651167

11661168
return makeRebalanceRespose(rebalancePlan);
11671169
}
11681170

1169-
private CancelRebalanceResponse processCancelRebalance() {
1171+
private CancelRebalanceResponse processCancelRebalance(
1172+
CancelRebalanceEvent cancelRebalanceEvent) {
11701173
CancelRebalanceResponse response = new CancelRebalanceResponse();
1171-
rebalanceManager.cancelRebalance();
1174+
rebalanceManager.cancelRebalance(cancelRebalanceEvent.getRabalanceId());
11721175
return response;
11731176
}
11741177

1175-
private ListRebalanceProgressResponse processListRebalanceProgress() {
1176-
RebalanceProgress rebalanceProgress = rebalanceManager.listRebalanceProgress();
1178+
private ListRebalanceProgressResponse processListRebalanceProgress(
1179+
ListRebalanceProgressEvent event) {
1180+
RebalanceProgress rebalanceProgress =
1181+
rebalanceManager.listRebalanceProgress(event.getRabalanceId());
11771182
return makeListRebalanceProgressResponse(rebalanceProgress);
11781183
}
11791184

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -833,15 +833,25 @@ public CompletableFuture<RebalanceResponse> rebalance(RebalanceRequest request)
833833
public CompletableFuture<ListRebalanceProgressResponse> listRebalanceProgress(
834834
ListRebalanceProgressRequest request) {
835835
CompletableFuture<ListRebalanceProgressResponse> response = new CompletableFuture<>();
836-
eventManagerSupplier.get().put(new ListRebalanceProgressEvent(response));
836+
eventManagerSupplier
837+
.get()
838+
.put(
839+
new ListRebalanceProgressEvent(
840+
request.hasRebalanceId() ? request.getRebalanceId() : null,
841+
response));
837842
return response;
838843
}
839844

840845
@Override
841846
public CompletableFuture<CancelRebalanceResponse> cancelRebalance(
842847
CancelRebalanceRequest request) {
843848
CompletableFuture<CancelRebalanceResponse> response = new CompletableFuture<>();
844-
eventManagerSupplier.get().put(new CancelRebalanceEvent(response));
849+
eventManagerSupplier
850+
.get()
851+
.put(
852+
new CancelRebalanceEvent(
853+
request.hasRebalanceId() ? request.getRebalanceId() : null,
854+
response));
845855
return response;
846856
}
847857

0 commit comments

Comments
 (0)