Skip to content

Commit c6bcc0e

Browse files
authored
Merge pull request #297 from didi/dev
split op util controller to topic controller and leader controller, and add authority controller, quota controller
2 parents c2ad2d7 + 7719339 commit c6bcc0e

File tree

15 files changed

+344
-98
lines changed

15 files changed

+344
-98
lines changed

kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/TopicQuota.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.xiaojukeji.kafka.manager.common.entity.ao.gateway;
22

3+
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO;
4+
35
/**
46
* @author zhongyuankai
57
* @date 2020/4/27
@@ -65,4 +67,15 @@ public String toString() {
6567
", consumeQuota=" + consumeQuota +
6668
'}';
6769
}
70+
71+
public static TopicQuota buildFrom(TopicQuotaDTO dto) {
72+
TopicQuota topicQuota = new TopicQuota();
73+
topicQuota.setAppId(dto.getAppId());
74+
topicQuota.setClusterId(dto.getClusterId());
75+
topicQuota.setTopicName(dto.getTopicName());
76+
topicQuota.setProduceQuota(dto.getProduceQuota());
77+
topicQuota.setConsumeQuota(dto.getConsumeQuota());
78+
return topicQuota;
79+
}
80+
6881
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.xiaojukeji.kafka.manager.common.entity.dto.gateway;
2+
3+
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
4+
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
5+
import io.swagger.annotations.ApiModel;
6+
import io.swagger.annotations.ApiModelProperty;
7+
8+
@ApiModel(description = "配额调整")
9+
public class TopicQuotaDTO extends ClusterTopicDTO {
10+
@ApiModelProperty(value = "appId")
11+
private String appId;
12+
13+
@ApiModelProperty(value = "发送数据速率B/s")
14+
private Long produceQuota;
15+
16+
@ApiModelProperty(value = "消费数据速率B/s")
17+
private Long consumeQuota;
18+
19+
public String getAppId() {
20+
return appId;
21+
}
22+
23+
public void setAppId(String appId) {
24+
this.appId = appId;
25+
}
26+
27+
public Long getProduceQuota() {
28+
return produceQuota;
29+
}
30+
31+
public void setProduceQuota(Long produceQuota) {
32+
this.produceQuota = produceQuota;
33+
}
34+
35+
public Long getConsumeQuota() {
36+
return consumeQuota;
37+
}
38+
39+
public void setConsumeQuota(Long consumeQuota) {
40+
this.consumeQuota = consumeQuota;
41+
}
42+
43+
@Override
44+
public boolean paramLegal() {
45+
return !ValidateUtils.isNullOrLessThanZero(clusterId) && !ValidateUtils.isBlank(topicName) && !ValidateUtils.isBlank(appId);
46+
}
47+
}

kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
1111
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO;
1212
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicStatisticsDO;
13+
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
1314

1415
import java.util.Date;
1516
import java.util.List;
@@ -122,5 +123,12 @@ public interface TopicManagerService {
122123
List<TopicStatisticsDO> getTopicStatistic(Long clusterId, String topicName, Date startTime, Date endTime);
123124

124125
TopicBusinessInfo getTopicBusinessInfo(Long physicalClusterId, String topicName);
126+
127+
/**
128+
* topic权限调整
129+
* @param authorityDO topic权限
130+
* @return
131+
*/
132+
ResultStatus addAuthority(AuthorityDO authorityDO);
125133
}
126134

kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,5 @@ List<String> fetchTopicData(KafkaConsumer kafkaConsumer,
105105
List<TopicBrokerDTO> getTopicBrokerList(Long clusterId, String topicName);
106106

107107
Result<TopicOffsetChangedEnum> checkTopicOffsetChanged(Long physicalClusterId, String topicName, Long latestTime);
108+
108109
}

kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/QuotaService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.xiaojukeji.kafka.manager.service.service.gateway;
22

3+
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
34
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
45

56
/**
@@ -34,4 +35,11 @@ public interface QuotaService {
3435
TopicQuota getQuotaFromZk(Long clusterId, String topicName, String appId);
3536

3637
Boolean modifyProduceQuota(Long clusterId, String topicName, String appId, Long produceQuota);
38+
39+
/**
40+
* topic配额调整
41+
* @param topicQuota topic配额
42+
* @return
43+
*/
44+
ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota);
3745
}

kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/QuotaServiceImpl.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package com.xiaojukeji.kafka.manager.service.service.gateway.impl;
22

3+
import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum;
4+
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
35
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
6+
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
47
import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
58
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
69
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData;
710
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
11+
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
812
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
13+
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
914
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
1015
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
1116
import com.xiaojukeji.kafka.manager.service.strategy.AbstractAllocateQuotaStrategy;
@@ -28,6 +33,12 @@ public class QuotaServiceImpl implements QuotaService {
2833
@Autowired
2934
private AbstractAllocateQuotaStrategy allocateQuotaStrategy;
3035

36+
@Autowired
37+
private LogicalClusterMetadataManager logicalClusterMetadataManager;
38+
39+
@Autowired
40+
private AuthorityService authorityService;
41+
3142
@Override
3243
public int addTopicQuota(TopicQuota topicQuotaDO) {
3344
return KafkaZookeeperUtils.setTopicQuota(
@@ -78,4 +89,34 @@ public Boolean modifyProduceQuota(Long clusterId, String topicName, String appId
7889
}
7990
return Boolean.TRUE;
8091
}
92+
93+
@Override
94+
public ResultStatus addTopicQuotaByAuthority(TopicQuota topicQuota) {
95+
// 获取物理集群id
96+
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(topicQuota.getClusterId());
97+
if (ValidateUtils.isNull(physicalClusterId)) {
98+
return ResultStatus.CLUSTER_NOT_EXIST;
99+
}
100+
// 权限判断(access 0:无权限, 1:读, 2:写, 3:读写,4:可管理)
101+
AuthorityDO authority = authorityService.getAuthority(physicalClusterId,
102+
topicQuota.getTopicName(), topicQuota.getAppId());
103+
if (ValidateUtils.isNull(authority) || authority.getAccess() == TopicAuthorityEnum.DENY.getCode()) {
104+
return ResultStatus.USER_WITHOUT_AUTHORITY;
105+
}
106+
if (authority.getAccess() == TopicAuthorityEnum.READ.getCode()) {
107+
// 可以消费
108+
topicQuota.setProduceQuota(null);
109+
}
110+
if (authority.getAccess() == TopicAuthorityEnum.WRITE.getCode()) {
111+
// 可以生产
112+
topicQuota.setConsumeQuota(null);
113+
}
114+
// 设置物理集群id
115+
topicQuota.setClusterId(physicalClusterId);
116+
// 添加配额
117+
if (addTopicQuota(topicQuota) > 0) {
118+
return ResultStatus.SUCCESS;
119+
}
120+
return ResultStatus.ZOOKEEPER_WRITE_FAILED;
121+
}
81122
}

kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
2121
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
2222
import com.xiaojukeji.kafka.manager.common.utils.NumberUtils;
23+
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
2324
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
2425
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
2526
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.config.TopicQuotaData;
@@ -618,6 +619,38 @@ public TopicBusinessInfo getTopicBusinessInfo(Long clusterId, String topicName)
618619
return topicBusinessInfo;
619620
}
620621

622+
@Override
623+
public ResultStatus addAuthority(AuthorityDO authorityDO) {
624+
// 查询该用户拥有的应用
625+
List<AppDO> appDOs = appService.getByPrincipal(SpringTool.getUserName());
626+
if (ValidateUtils.isEmptyList(appDOs)) {
627+
// 该用户无应用,需要先申请应用
628+
return ResultStatus.APP_NOT_EXIST;
629+
}
630+
List<Long> appIds = appDOs.stream().map(AppDO::getId).collect(Collectors.toList());
631+
if (!appIds.contains(authorityDO.getAppId())) {
632+
// 入参中的appId,该用户未拥有
633+
return ResultStatus.APP_NOT_EXIST;
634+
}
635+
// 获取物理集群id
636+
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(authorityDO.getClusterId());
637+
if (ValidateUtils.isNull(physicalClusterId)) {
638+
// 集群不存在
639+
return ResultStatus.CLUSTER_NOT_EXIST;
640+
}
641+
TopicDO topic = getByTopicName(physicalClusterId, authorityDO.getTopicName());
642+
if (ValidateUtils.isNull(topic)) {
643+
// topic不存在
644+
return ResultStatus.TOPIC_NOT_EXIST;
645+
}
646+
// 设置物理集群id
647+
authorityDO.setClusterId(physicalClusterId);
648+
if (authorityService.addAuthority(authorityDO) > 0) {
649+
return ResultStatus.SUCCESS;
650+
}
651+
return ResultStatus.MYSQL_ERROR;
652+
}
653+
621654
private RdTopicBasic convert2RdTopicBasic(ClusterDO clusterDO,
622655
String topicName,
623656
TopicDO topicDO,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.xiaojukeji.kafka.manager.openapi.common.dto;
2+
3+
import com.xiaojukeji.kafka.manager.common.entity.dto.ClusterTopicDTO;
4+
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
5+
import io.swagger.annotations.ApiModel;
6+
import io.swagger.annotations.ApiModelProperty;
7+
8+
@ApiModel(description = "权限调整")
9+
public class TopicAuthorityDTO extends ClusterTopicDTO {
10+
@ApiModelProperty(value = "appId")
11+
private String appId;
12+
13+
@ApiModelProperty(value = "0:无权限, 1:读, 2:写, 3:读写, 4:可管理")
14+
private Integer access;
15+
16+
public String getAppId() {
17+
return appId;
18+
}
19+
20+
public void setAppId(String appId) {
21+
this.appId = appId;
22+
}
23+
24+
public Integer getAccess() {
25+
return access;
26+
}
27+
28+
public void setAccess(Integer access) {
29+
this.access = access;
30+
}
31+
32+
@Override
33+
public boolean paramLegal() {
34+
return !ValidateUtils.isNullOrLessThanZero(clusterId)
35+
&& !ValidateUtils.isBlank(topicName)
36+
&& !ValidateUtils.isBlank(appId)
37+
&& !ValidateUtils.isNullOrLessThanZero(access);
38+
}
39+
}

kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ public Result<List<TopicMyAppVO>> getTopicMyApps(@PathVariable Long clusterId,
337337
}
338338

339339
return new Result<>(TopicModelConverter.convert2TopicMineAppVOList(
340-
topicManagerService.getTopicMineApps(physicalClusterId, topicName, SpringTool.getUserName()))
340+
topicManagerService.getTopicMineApps(physicalClusterId, topicName, SpringTool.getUserName()))
341341
);
342342
}
343343

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
2+
3+
import com.xiaojukeji.kafka.manager.common.entity.Result;
4+
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
5+
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
6+
import com.xiaojukeji.kafka.manager.openapi.common.dto.TopicAuthorityDTO;
7+
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
8+
import com.xiaojukeji.kafka.manager.web.converters.AuthorityConverter;
9+
import io.swagger.annotations.Api;
10+
import io.swagger.annotations.ApiOperation;
11+
import org.springframework.beans.factory.annotation.Autowired;
12+
import org.springframework.web.bind.annotation.*;
13+
14+
/**
15+
* Authority操作相关接口
16+
* @author zengqiao
17+
* @date 21/5/18
18+
*/
19+
@Api(tags = "OP-Authority操作相关接口(REST)")
20+
@RestController
21+
public class OpAuthorityController {
22+
@Autowired
23+
private TopicManagerService topicManagerService;
24+
25+
@ApiOperation(value = "权限调整",notes = "权限调整")
26+
@PostMapping(value = "topic-authorities")
27+
@ResponseBody
28+
public Result addAuthority(@RequestBody TopicAuthorityDTO dto) {
29+
//非空校验
30+
if (ValidateUtils.isNull(dto) || !dto.paramLegal()) {
31+
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
32+
}
33+
return Result.buildFrom(topicManagerService.addAuthority(AuthorityConverter.convert2AuthorityDO(dto)));
34+
}
35+
}

0 commit comments

Comments
 (0)