Skip to content

Commit d085653

Browse files
committed
StarRocksCluster不允许修改ip和端口
1 parent bcea8a3 commit d085653

File tree

2 files changed

+33
-27
lines changed

2 files changed

+33
-27
lines changed

dss-framework/dss-framework-workspace-server/src/main/java/com/webank/wedatasphere/dss/framework/workspace/dao/impl/DSSWorkspaceStarRocksClusterMapper.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
<update id="updateStarRocksCluster">
2727
UPDATE dss_workspace_starrocks_cluster
28-
SET cluster_ip=#{clusterIp}, http_port=#{httpPort}, tcp_port=#{tcpPort}, is_default_cluster=#{isDefaultCluster}, update_user=#{updateUser}, update_time=#{updateTime}
28+
SET is_default_cluster=#{isDefaultCluster}, update_user=#{updateUser}, update_time=#{updateTime}
2929
WHERE cluster_name=#{clusterName} and workspace_id=#{workspaceId}
3030
</update>
3131

dss-framework/dss-framework-workspace-server/src/main/java/com/webank/wedatasphere/dss/framework/workspace/service/impl/DSSWorkspaceServiceImpl.java

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.commons.collections4.CollectionUtils;
6161
import org.apache.commons.collections4.MapUtils;
6262
import org.apache.commons.lang.ArrayUtils;
63+
import org.apache.commons.lang.ObjectUtils;
6364
import org.apache.commons.lang3.StringUtils;
6465
import org.apache.linkis.common.exception.ErrorException;
6566
import org.slf4j.Logger;
@@ -890,6 +891,14 @@ public List<DSSWorkspaceStarRocksCluster> updateStarRocksCluster(List<UpdateWork
890891
List<DSSWorkspaceStarRocksCluster> starRocksClusters = new ArrayList<>();
891892
Set<String> clusterNames = new HashSet<>();
892893
int count = 0;
894+
895+
List<DSSWorkspaceStarRocksCluster> itemsByWorkspaceId = dssWorkspaceStarRocksClusterMapper.getItemsByWorkspaceId(workspaceId);
896+
Map<String, DSSWorkspaceStarRocksCluster> existsClusters = new HashMap<>();
897+
if(CollectionUtils.isNotEmpty(itemsByWorkspaceId)){
898+
existsClusters = itemsByWorkspaceId.stream()
899+
.collect(Collectors.toMap(DSSWorkspaceStarRocksCluster::getClusterName, cluster -> cluster));
900+
}
901+
893902
for (UpdateWorkspaceStarRocksClusterRequest r : request) {
894903

895904
if (!clusterNames.add(r.getClusterName())) {
@@ -902,10 +911,31 @@ public List<DSSWorkspaceStarRocksCluster> updateStarRocksCluster(List<UpdateWork
902911
}
903912

904913
if (StringUtils.length(r.getClusterName()) > 256) {
905-
LOGGER.warn("工作空间{}StarRocks集群名称{}超过128字符", r.getWorkspaceName(), r.getClusterName());
914+
LOGGER.warn("工作空间{}StarRocks集群名称{}超过256字符", r.getWorkspaceName(), r.getClusterName());
906915
throw new DSSErrorException(90054, String.format("StarRocks cluster %s length cannot exceed 256 in workspace %s", r.getClusterName(), r.getWorkspaceName()));
907916
}
908917

918+
// 已经存在的集群 不允许更改IP 和端口
919+
if(existsClusters.containsKey(r.getClusterName())){
920+
DSSWorkspaceStarRocksCluster dssWorkspaceStarRocksCluster = existsClusters.get(r.getClusterName());
921+
922+
if(ObjectUtils.notEqual(dssWorkspaceStarRocksCluster.getClusterIp(),r.getClusterIp())){
923+
LOGGER.warn("{} workspace, {} StarRocks cluster ip cannot be updated, ip is [{},{}]",r.getWorkspaceName(),r.getClusterName(),
924+
r.getClusterIp(),dssWorkspaceStarRocksCluster.getClusterIp());
925+
throw new DSSErrorException(90054, String.format("%s workspace, %s StarRocks cluster ip cannot be updated", r.getWorkspaceName(),r.getClusterName()));
926+
}
927+
928+
if(ObjectUtils.notEqual(dssWorkspaceStarRocksCluster.getHttpPort(),r.getHttpPort())
929+
|| ObjectUtils.notEqual(dssWorkspaceStarRocksCluster.getTcpPort(),r.getTcpPort())
930+
){
931+
LOGGER.warn("{} workspace, {} StarRocks cluster port cannot be updated",r.getWorkspaceName(),r.getClusterName());
932+
LOGGER.warn("tcp port is [{},{}],http port is [{},{}]",r.getTcpPort(),dssWorkspaceStarRocksCluster.getTcpPort(),
933+
r.getHttpPort(),dssWorkspaceStarRocksCluster.getHttpPort());
934+
throw new DSSErrorException(90054, String.format("%s workspace, %s StarRocks cluster port cannot be updated", r.getWorkspaceName(),r.getClusterName()));
935+
}
936+
}
937+
938+
909939
if (Integer.parseInt(r.getHttpPort()) < 0 || Integer.parseInt(r.getHttpPort()) > 65535 || Integer.parseInt(r.getTcpPort()) < 0 || Integer.parseInt(r.getTcpPort()) > 65535) {
910940
LOGGER.warn("工作空间{}StarRocks集群端口必须在0-65535之间", r.getWorkspaceName());
911941
throw new DSSErrorException(90054, String.format("%s workspace StarRocks cluster port must be between 0 and 65535", r.getWorkspaceName()));
@@ -928,11 +958,6 @@ public List<DSSWorkspaceStarRocksCluster> updateStarRocksCluster(List<UpdateWork
928958
List<StarRocksNodeInfo> starRocksNodeInfos = dssFlowService.queryStarRocksNodeList(workspaceId);
929959
Set<String> executeClusterSet = starRocksNodeInfos.stream().map(StarRocksNodeInfo::getNodeUiValue).collect(Collectors.toSet());
930960

931-
List<DSSWorkspaceStarRocksCluster> itemsByWorkspaceId = dssWorkspaceStarRocksClusterMapper.getItemsByWorkspaceId(workspaceId);
932-
933-
Map<String, DSSWorkspaceStarRocksCluster> existsClusters = itemsByWorkspaceId.stream()
934-
.collect(Collectors.toMap(DSSWorkspaceStarRocksCluster::getClusterName, cluster -> cluster));
935-
936961
Map<String, UpdateWorkspaceStarRocksClusterRequest> requestCluster = request.stream().
937962
collect(Collectors.toMap(UpdateWorkspaceStarRocksClusterRequest::getClusterName, r -> r));
938963

@@ -946,26 +971,7 @@ public List<DSSWorkspaceStarRocksCluster> updateStarRocksCluster(List<UpdateWork
946971
dssWorkspaceStarRocksClusterMapper.deleteItemById(item.getId());
947972
}
948973
} else {
949-
//如果属性有更新且该集群有被引用那么就需要更新引用该集群的starrocks节点属性信息
950-
if ( executeClusterSet.contains(requestOne.getClusterName()) && (!Objects.equals(item.getClusterIp(), requestOne.getClusterIp())
951-
|| !Objects.equals(item.getTcpPort(), requestOne.getTcpPort())) ) {
952-
LOGGER.info("集群{}的ip或者tcp端口信息有变更,需要修改使用该集群的节点信息", requestOne.getClusterName());
953-
List<String> nodeKeys = starRocksNodeInfos.stream().filter(s -> s.getNodeUiValue().equals(requestOne.getClusterName())).map(StarRocksNodeInfo::getNodeKey).collect(Collectors.toList());
954-
List<StarRocksNodeInfo> updateNodes = starRocksNodeInfos.stream().filter(s -> nodeKeys.contains(s.getNodeKey())).collect(Collectors.toList());
955-
DSSWorkspaceStarRocksCluster oneByClusterName = dssWorkspaceStarRocksClusterMapper.getOneByClusterName(requestOne.getClusterName());
956-
oneByClusterName.setClusterIp(requestOne.getClusterIp());
957-
oneByClusterName.setTcpPort(requestOne.getTcpPort());
958-
try {
959-
BatchEditFlowRequest editFlowRequest = getEditFlowRequest(updateNodes, oneByClusterName);
960-
dssFlowService.batchEditFlow(editFlowRequest, ticketId, workspace, userName);
961-
} catch (Exception e) {
962-
LOGGER.error("集群{}有被工作流中starrocks的节点引用,在批量修改starrocks节点使用的集群信息失败,不允许修改集群信息", requestOne.getClusterName());
963-
throw new DSSErrorException(90054, String.format("集群 %s 有被工作流中starrocks的节点引用,在批量修改starrocks节点使用的集群信息失败,不允许修改集群信息", requestOne.getClusterName()) );
964-
}
965-
}
966-
item.setClusterIp(requestOne.getClusterIp());
967-
item.setHttpPort(requestOne.getHttpPort());
968-
item.setTcpPort(requestOne.getTcpPort());
974+
969975
item.setDefaultCluster(requestOne.getDefaultCluster());
970976
item.setUpdateUser(requestOne.getUsername());
971977
item.setUpdateTime(new Date());

0 commit comments

Comments
 (0)