Skip to content

Commit 07aa237

Browse files
authored
[server] Support AddServerTag and RemoveServerTag (#1400)
1 parent 1055e42 commit 07aa237

File tree

12 files changed

+596
-25
lines changed

12 files changed

+596
-25
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.fluss.rpc.gateway.AdminGateway;
4747
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4848
import org.apache.fluss.rpc.gateway.TabletServerGateway;
49+
import org.apache.fluss.rpc.messages.AddServerTagRequest;
4950
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
5051
import org.apache.fluss.rpc.messages.AlterTableRequest;
5152
import org.apache.fluss.rpc.messages.CreateAclsRequest;
@@ -74,6 +75,7 @@
7475
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
7576
import org.apache.fluss.rpc.messages.PbPartitionSpec;
7677
import org.apache.fluss.rpc.messages.PbTablePath;
78+
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
7779
import org.apache.fluss.rpc.messages.TableExistsRequest;
7880
import org.apache.fluss.rpc.messages.TableExistsResponse;
7981
import org.apache.fluss.rpc.protocol.ApiError;
@@ -535,13 +537,17 @@ public CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> confi
535537

536538
@Override
537539
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
538-
throw new UnsupportedOperationException("Support soon");
540+
AddServerTagRequest request = new AddServerTagRequest().setServerTag(serverTag.value);
541+
tabletServers.forEach(request::addServerId);
542+
return gateway.addServerTag(request).thenApply(r -> null);
539543
}
540544

541545
@Override
542546
public CompletableFuture<Void> removeServerTag(
543547
List<Integer> tabletServers, ServerTag serverTag) {
544-
throw new UnsupportedOperationException("Support soon");
548+
RemoveServerTagRequest request = new RemoveServerTagRequest().setServerTag(serverTag.value);
549+
tabletServers.forEach(request::addServerId);
550+
return gateway.removeServerTag(request).thenApply(r -> null);
545551
}
546552

547553
@Override

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.client.table.Table;
2525
import org.apache.fluss.client.table.writer.UpsertWriter;
2626
import org.apache.fluss.cluster.ServerNode;
27+
import org.apache.fluss.cluster.rebalance.ServerTag;
2728
import org.apache.fluss.config.AutoPartitionTimeUnit;
2829
import org.apache.fluss.config.ConfigOptions;
2930
import org.apache.fluss.config.Configuration;
@@ -42,6 +43,9 @@
4243
import org.apache.fluss.exception.PartitionAlreadyExistsException;
4344
import org.apache.fluss.exception.PartitionNotExistException;
4445
import org.apache.fluss.exception.SchemaNotExistException;
46+
import org.apache.fluss.exception.ServerNotExistException;
47+
import org.apache.fluss.exception.ServerTagAlreadyExistException;
48+
import org.apache.fluss.exception.ServerTagNotExistException;
4549
import org.apache.fluss.exception.TableNotExistException;
4650
import org.apache.fluss.exception.TableNotPartitionedException;
4751
import org.apache.fluss.exception.TooManyBucketsException;
@@ -64,6 +68,8 @@
6468
import org.apache.fluss.metadata.TablePath;
6569
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
6670
import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
71+
import org.apache.fluss.server.zk.ZooKeeperClient;
72+
import org.apache.fluss.server.zk.data.ServerTags;
6773
import org.apache.fluss.types.DataTypes;
6874

6975
import org.junit.jupiter.api.BeforeEach;
@@ -80,6 +86,7 @@
8086
import java.util.HashMap;
8187
import java.util.List;
8288
import java.util.Map;
89+
import java.util.Optional;
8390
import java.util.OptionalLong;
8491
import java.util.concurrent.ExecutionException;
8592
import java.util.stream.Collectors;
@@ -1436,4 +1443,75 @@ public void testSystemsColumns() throws Exception {
14361443
+ "Please use other names for these columns. "
14371444
+ "The reserved system columns are: __offset, __timestamp, __bucket");
14381445
}
1446+
1447+
@Test
1448+
public void testAddAndRemoveServerTags() throws Exception {
1449+
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
1450+
// 1.add server tag to a none exists server.
1451+
assertThatThrownBy(
1452+
() ->
1453+
admin.addServerTag(
1454+
Collections.singletonList(100),
1455+
ServerTag.PERMANENT_OFFLINE)
1456+
.get())
1457+
.cause()
1458+
.isInstanceOf(ServerNotExistException.class)
1459+
.hasMessageContaining("Server 100 not exists when trying to add server tag.");
1460+
1461+
// 2.add server tag for server 0,1.
1462+
admin.addServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1463+
assertThat(zkClient.getServerTags()).isPresent();
1464+
assertThat(zkClient.getServerTags().get().getServerTags())
1465+
.containsEntry(0, ServerTag.PERMANENT_OFFLINE)
1466+
.containsEntry(1, ServerTag.PERMANENT_OFFLINE);
1467+
1468+
// 3.add different server tag for server 0,2. error will be thrown and tag for 2 will not be
1469+
// added.
1470+
assertThatThrownBy(
1471+
() ->
1472+
admin.addServerTag(Arrays.asList(0, 2), ServerTag.TEMPORARY_OFFLINE)
1473+
.get())
1474+
.cause()
1475+
.isInstanceOf(ServerTagAlreadyExistException.class)
1476+
.hasMessageContaining(
1477+
"Server tag PERMANENT_OFFLINE already exists for server 0. However "
1478+
+ "you want to set it to TEMPORARY_OFFLINE, please remove the server tag first.");
1479+
Optional<ServerTags> serverTagsOpt = zkClient.getServerTags();
1480+
assertThat(serverTagsOpt).isPresent();
1481+
Map<Integer, ServerTag> serverTags = serverTagsOpt.get().getServerTags();
1482+
assertThat(serverTags.size()).isEqualTo(2);
1483+
assertThat(serverTags)
1484+
.containsEntry(0, ServerTag.PERMANENT_OFFLINE)
1485+
.containsEntry(1, ServerTag.PERMANENT_OFFLINE);
1486+
1487+
// 4.remove server tag for server 100
1488+
assertThatThrownBy(
1489+
() ->
1490+
admin.removeServerTag(
1491+
Collections.singletonList(100),
1492+
ServerTag.PERMANENT_OFFLINE)
1493+
.get())
1494+
.cause()
1495+
.isInstanceOf(ServerNotExistException.class)
1496+
.hasMessageContaining("Server 100 not exists when trying to removing server tag.");
1497+
1498+
// 5.remove server tag for server 0,1.
1499+
admin.removeServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
1500+
assertThat(zkClient.getServerTags()).isNotPresent();
1501+
1502+
// 6.remove server tag for server 2. error will be thrown and tag for 2 will not be removed
1503+
// as the removed server tag is not equals with the exists one.
1504+
admin.addServerTag(Collections.singletonList(2), ServerTag.TEMPORARY_OFFLINE).get();
1505+
assertThatThrownBy(
1506+
() ->
1507+
admin.removeServerTag(
1508+
Collections.singletonList(2),
1509+
ServerTag.PERMANENT_OFFLINE)
1510+
.get())
1511+
.cause()
1512+
.isInstanceOf(ServerTagNotExistException.class)
1513+
.hasMessageContaining(
1514+
"Server tag PERMANENT_OFFLINE not exists for server 2, the current "
1515+
+ "server tag of this server is TEMPORARY_OFFLINE.");
1516+
}
14391517
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.cluster.rebalance.ServerTag;
21+
22+
import org.apache.flink.table.annotation.ArgumentHint;
23+
import org.apache.flink.table.annotation.DataTypeHint;
24+
import org.apache.flink.table.annotation.ProcedureHint;
25+
import org.apache.flink.table.procedure.ProcedureContext;
26+
27+
import java.util.List;
28+
29+
import static org.apache.fluss.flink.procedure.RemoveServerTagProcedure.validateAndGetServerTag;
30+
import static org.apache.fluss.flink.procedure.RemoveServerTagProcedure.validateAndGetTabletServers;
31+
32+
/** Procedure to add server tag. */
33+
public class AddServerTagProcedure extends ProcedureBase {
34+
35+
@ProcedureHint(
36+
argument = {
37+
@ArgumentHint(name = "tabletServers", type = @DataTypeHint("STRING")),
38+
@ArgumentHint(name = "serverTag", type = @DataTypeHint("STRING"))
39+
})
40+
public String[] call(ProcedureContext context, String tabletServers, String serverTag)
41+
throws Exception {
42+
List<Integer> servers = validateAndGetTabletServers(tabletServers);
43+
ServerTag tag = validateAndGetServerTag(serverTag);
44+
admin.addServerTag(servers, tag).get();
45+
return new String[] {"success"};
46+
}
47+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ private enum ProcedureEnum {
7171
DROP_ACL("sys.drop_acl", DropAclProcedure.class),
7272
List_ACL("sys.list_acl", ListAclProcedure.class),
7373
SET_CLUSTER_CONFIG("sys.set_cluster_config", SetClusterConfigProcedure.class),
74-
GET_CLUSTER_CONFIG("sys.get_cluster_config", GetClusterConfigProcedure.class);
74+
GET_CLUSTER_CONFIG("sys.get_cluster_config", GetClusterConfigProcedure.class),
75+
ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class),
76+
REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class);
7577

7678
private final String path;
7779
private final Class<? extends ProcedureBase> procedureClass;
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.cluster.rebalance.ServerTag;
21+
22+
import org.apache.flink.table.annotation.ArgumentHint;
23+
import org.apache.flink.table.annotation.DataTypeHint;
24+
import org.apache.flink.table.annotation.ProcedureHint;
25+
import org.apache.flink.table.procedure.ProcedureContext;
26+
27+
import java.util.ArrayList;
28+
import java.util.Arrays;
29+
import java.util.List;
30+
31+
/** Procedure to remove server tags. */
32+
public class RemoveServerTagProcedure extends ProcedureBase {
33+
34+
@ProcedureHint(
35+
argument = {
36+
@ArgumentHint(name = "tabletServers", type = @DataTypeHint("STRING")),
37+
@ArgumentHint(name = "serverTag", type = @DataTypeHint("STRING"))
38+
})
39+
public String[] call(ProcedureContext context, String tabletServers, String serverTag)
40+
throws Exception {
41+
List<Integer> servers = validateAndGetTabletServers(tabletServers);
42+
ServerTag tag = validateAndGetServerTag(serverTag);
43+
admin.removeServerTag(servers, tag).get();
44+
return new String[] {"success"};
45+
}
46+
47+
public static List<Integer> validateAndGetTabletServers(String tabletServers) {
48+
if (tabletServers == null || tabletServers.trim().isEmpty()) {
49+
throw new IllegalArgumentException(
50+
"tabletServers cannot be null or empty. You can specify one tabletServer as 1 or "
51+
+ "specify multi tabletServers as 1;2 (split by ';')");
52+
}
53+
54+
tabletServers = tabletServers.trim();
55+
String[] splitServers = tabletServers.split(";");
56+
if (splitServers.length == 0) {
57+
throw new IllegalArgumentException(
58+
"tabletServers cannot be empty. You can specify one tabletServer as 1 or "
59+
+ "specify multi tabletServers as 1;2 (split by ';')");
60+
}
61+
List<Integer> servers = new ArrayList<>();
62+
for (String server : splitServers) {
63+
servers.add(Integer.parseInt(server));
64+
}
65+
return servers;
66+
}
67+
68+
public static ServerTag validateAndGetServerTag(String serverTag) {
69+
if (serverTag == null || serverTag.trim().isEmpty()) {
70+
throw new IllegalArgumentException(
71+
"serverTag cannot be null or empty. Please specify a valid serverTag. serverTag is one of "
72+
+ Arrays.asList(ServerTag.values()));
73+
}
74+
serverTag = serverTag.trim().toUpperCase();
75+
return ServerTag.valueOf(serverTag);
76+
}
77+
}

0 commit comments

Comments
 (0)