Skip to content

Commit 203859b

Browse files
authored
Merge branch 'didi:dev' into dev
2 parents 9a25c22 + 0a03f41 commit 203859b

File tree

28 files changed

+281
-79
lines changed

28 files changed

+281
-79
lines changed

bin/startup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ error_exit ()
99
[ ! -e "$JAVA_HOME/bin/java" ] && unset JAVA_HOME
1010

1111
if [ -z "$JAVA_HOME" ]; then
12-
if $darwin; then
12+
if [ "Darwin" = "$(uname -s)" ]; then
1313

1414
if [ -x '/usr/libexec/java_home' ] ; then
1515
export JAVA_HOME=`/usr/libexec/java_home`

docs/install_guide/版本升级手册.md

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ thread-pool:
4040

4141
```
4242

43-
44-
**SQL变更**
43+
**SQL 变更**
4544

4645
```sql
4746
-- 多集群管理权限2022-09-06新增
@@ -80,12 +79,11 @@ ALTER TABLE `logi_security_oplog`
8079

8180
### 6.2.2、升级至 `v3.0.0-beta.1`版本
8281

83-
84-
**SQL变更**
82+
**SQL 变更**
8583

8684
1、在`ks_km_broker`表增加了一个监听信息字段。
87-
2、为`logi_security_oplog`表operation_methods字段设置默认值''。
88-
因此需要执行下面的sql对数据库表进行更新
85+
2、为`logi_security_oplog`表 operation_methods 字段设置默认值''。
86+
因此需要执行下面的 sql 对数据库表进行更新
8987

9088
```sql
9189
ALTER TABLE `ks_km_broker`
@@ -98,7 +96,6 @@ ALTER COLUMN `operation_methods` set default '';
9896

9997
---
10098

101-
10299
### 6.2.3、`2.x`版本 升级至 `v3.0.0-beta.0`版本
103100

104101
**升级步骤:**
@@ -123,14 +120,14 @@ ALTER COLUMN `operation_methods` set default '';
123120
UPDATE ks_km_topic
124121
INNER JOIN
125122
(SELECT
126-
topic.cluster_id AS cluster_id,
127-
topic.topic_name AS topic_name,
128-
topic.description AS description
123+
topic.cluster_id AS cluster_id,
124+
topic.topic_name AS topic_name,
125+
topic.description AS description
129126
FROM topic WHERE description != ''
130127
) AS t
131128

132-
ON ks_km_topic.cluster_phy_id = t.cluster_id
133-
AND ks_km_topic.topic_name = t.topic_name
134-
AND ks_km_topic.id > 0
135-
SET ks_km_topic.description = t.description;
129+
ON ks_km_topic.cluster_phy_id = t.cluster_id
130+
AND ks_km_topic.topic_name = t.topic_name
131+
AND ks_km_topic.id > 0
132+
SET ks_km_topic.description = t.description;
136133
```

docs/user_guide/用户使用手册.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
下面是用户第一次使用我们产品的典型体验路径:
1313

14-
![text](http://img-ys011.didistatic.com/static/dc2img/do1_YehqxqmsVaqU5gf3XphI)
14+
![text](http://img-ys011.didistatic.com/static/dc2img/do1_qgqPsAY46sZeBaPUCwXY)
1515

1616
## 5.3、常用功能
1717

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
2020
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
2121
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
22-
import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum;
22+
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
2323
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
2424
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
2525
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
@@ -199,12 +199,12 @@ private Result<Void> checkFieldLegal(GroupOffsetResetDTO dto) {
199199
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getTopicNotExist(dto.getClusterId(), dto.getTopicName()));
200200
}
201201

202-
if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()
202+
if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()
203203
&& ValidateUtils.isEmptyList(dto.getOffsetList())) {
204204
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "参数错误,指定offset重置需传offset信息");
205205
}
206206

207-
if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()
207+
if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()
208208
&& ValidateUtils.isNull(dto.getTimestamp())) {
209209
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "参数错误,指定时间重置需传时间信息");
210210
}
@@ -213,7 +213,7 @@ private Result<Void> checkFieldLegal(GroupOffsetResetDTO dto) {
213213
}
214214

215215
private Result<Map<TopicPartition, Long>> getPartitionOffset(GroupOffsetResetDTO dto) {
216-
if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()) {
216+
if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()) {
217217
return Result.buildSuc(dto.getOffsetList().stream().collect(Collectors.toMap(
218218
elem -> new TopicPartition(dto.getTopicName(), elem.getPartitionId()),
219219
PartitionOffsetDTO::getOffset,
@@ -222,9 +222,9 @@ private Result<Map<TopicPartition, Long>> getPartitionOffset(GroupOffsetResetDTO
222222
}
223223

224224
OffsetSpec offsetSpec = null;
225-
if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()) {
225+
if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()) {
226226
offsetSpec = OffsetSpec.forTimestamp(dto.getTimestamp());
227-
} else if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getResetType()) {
227+
} else if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getResetType()) {
228228
offsetSpec = OffsetSpec.earliest();
229229
} else {
230230
offsetSpec = OffsetSpec.latest();

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.xiaojukeji.know.streaming.km.biz.topic;
22

3+
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
34
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
45
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
56
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO;

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,26 @@
2222
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
2323
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
2424
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
25-
import com.xiaojukeji.know.streaming.km.common.converter.PartitionConverter;
2625
import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter;
26+
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
27+
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
2728
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
2829
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
2930
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
31+
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
3032
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
3133
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
3234
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
3335
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
34-
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
3536
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
37+
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
3638
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
3739
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
3840
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems;
41+
import org.apache.commons.lang3.ObjectUtils;
42+
import org.apache.commons.lang3.StringUtils;
3943
import org.apache.kafka.clients.admin.OffsetSpec;
40-
import org.apache.kafka.clients.consumer.ConsumerConfig;
41-
import org.apache.kafka.clients.consumer.ConsumerRecord;
42-
import org.apache.kafka.clients.consumer.ConsumerRecords;
43-
import org.apache.kafka.clients.consumer.KafkaConsumer;
44+
import org.apache.kafka.clients.consumer.*;
4445
import org.apache.kafka.common.TopicPartition;
4546
import org.apache.kafka.common.config.TopicConfig;
4647
import org.springframework.beans.factory.annotation.Autowired;
@@ -160,8 +161,31 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
160161
}
161162
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
162163
kafkaConsumer.assign(partitionList);
164+
165+
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
166+
// 获取指定时间每个分区的offset(按指定开始时间查询消息时)
167+
if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
168+
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
169+
partitionList.forEach(topicPartition -> {
170+
timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs());
171+
});
172+
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
173+
}
174+
163175
for (TopicPartition partition : partitionList) {
164-
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
176+
if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
177+
// 重置到最旧
178+
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
179+
} else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
180+
// 重置到指定时间
181+
kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
182+
} else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
183+
// 重置到指定位置
184+
185+
} else {
186+
// 默认,重置到最新
187+
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
188+
}
165189
}
166190

167191
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
@@ -185,6 +209,15 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
185209
}
186210
}
187211

212+
// 排序
213+
if (ObjectUtils.isNotEmpty(voList)) {
214+
// 默认按时间倒序排序
215+
if (StringUtils.isBlank(dto.getSortType())) {
216+
dto.setSortType(SortTypeEnum.DESC.getSortType());
217+
}
218+
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType());
219+
}
220+
188221
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
189222
} catch (Exception e) {
190223
log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e);

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/version/impl/VersionControlManagerImpl.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77
import com.didiglobal.logi.security.common.dto.config.ConfigDTO;
88
import com.didiglobal.logi.security.service.ConfigService;
99
import com.xiaojukeji.know.streaming.km.biz.version.VersionControlManager;
10+
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDetailDTO;
1011
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.UserMetricConfigDTO;
1112
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.metric.UserMetricConfig;
1213
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
1314
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
1415
import com.xiaojukeji.know.streaming.km.common.bean.vo.config.metric.UserMetricConfigVO;
1516
import com.xiaojukeji.know.streaming.km.common.bean.vo.version.VersionItemVO;
17+
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
1618
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum;
1719
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
1820
import com.xiaojukeji.know.streaming.km.common.utils.VersionUtil;
@@ -159,6 +161,9 @@ public Result<List<UserMetricConfigVO>> listUserMetricItem(Long clusterId, Integ
159161

160162
UserMetricConfig umc = userMetricConfigMap.get(itemType + "@" + metric);
161163
userMetricConfigVO.setSet(null != umc && umc.isSet());
164+
if (umc != null) {
165+
userMetricConfigVO.setRank(umc.getRank());
166+
}
162167
userMetricConfigVO.setName(itemVO.getName());
163168
userMetricConfigVO.setType(itemVO.getType());
164169
userMetricConfigVO.setDesc(itemVO.getDesc());
@@ -178,13 +183,30 @@ public Result<List<UserMetricConfigVO>> listUserMetricItem(Long clusterId, Integ
178183
@Override
179184
public Result<Void> updateUserMetricItem(Long clusterId, Integer type, UserMetricConfigDTO dto, String operator) {
180185
Map<String, Boolean> metricsSetMap = dto.getMetricsSet();
181-
if(null == metricsSetMap || metricsSetMap.isEmpty()){
186+
187+
//转换metricDetailDTOList
188+
List<MetricDetailDTO> metricDetailDTOList = dto.getMetricDetailDTOList();
189+
Map<String, MetricDetailDTO> metricDetailMap = new HashMap<>();
190+
if (metricDetailDTOList != null && !metricDetailDTOList.isEmpty()) {
191+
metricDetailMap = metricDetailDTOList.stream().collect(Collectors.toMap(MetricDetailDTO::getMetric, Function.identity()));
192+
}
193+
194+
//转换metricsSetMap
195+
if (metricsSetMap != null && !metricsSetMap.isEmpty()) {
196+
for (Map.Entry<String, Boolean> metricAndShowEntry : metricsSetMap.entrySet()) {
197+
if (metricDetailMap.containsKey(metricAndShowEntry.getKey())) continue;
198+
metricDetailMap.put(metricAndShowEntry.getKey(), new MetricDetailDTO(metricAndShowEntry.getKey(), metricAndShowEntry.getValue(), null));
199+
}
200+
}
201+
202+
if (metricDetailMap.isEmpty()) {
182203
return Result.buildSuc();
183204
}
184205

206+
185207
Set<UserMetricConfig> userMetricConfigs = getUserMetricConfig(operator);
186208
for(Map.Entry<String, Boolean> metricAndShowEntry : metricsSetMap.entrySet()){
187-
UserMetricConfig userMetricConfig = new UserMetricConfig(type, metricAndShowEntry.getKey(), metricAndShowEntry.getValue());
209+
UserMetricConfig userMetricConfig = new UserMetricConfig(type, metricAndShowEntry.getKey(), metricAndShowEntry.getValue(), metricDetailMap.get(metricAndShowEntry.getKey()).getRank());
188210
userMetricConfigs.remove(userMetricConfig);
189211
userMetricConfigs.add(userMetricConfig);
190212
}
@@ -228,7 +250,15 @@ private Set<UserMetricConfig> getUserMetricConfig(String operator){
228250
return defaultMetrics;
229251
}
230252

231-
return JSON.parseObject(value, new TypeReference<Set<UserMetricConfig>>(){});
253+
Set<UserMetricConfig> userMetricConfigs = JSON.parseObject(value, new TypeReference<Set<UserMetricConfig>>() {});
254+
255+
//补充rank不存在情况
256+
for (UserMetricConfig userMetricConfig : userMetricConfigs) {
257+
if (userMetricConfig.getRank() == 0) {
258+
userMetricConfig.setRank(Constant.DEFAULT_METRIC_RANK);
259+
}
260+
}
261+
return userMetricConfigs;
232262
}
233263

234264
public static void main(String[] args){

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetResetDTO.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
44
import com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO;
55
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.ClusterTopicDTO;
6+
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
67
import io.swagger.annotations.ApiModelProperty;
78
import lombok.Data;
89

@@ -23,7 +24,7 @@ public class GroupOffsetResetDTO extends ClusterTopicDTO {
2324
private String groupName;
2425

2526
/**
26-
* @see com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum
27+
* @see OffsetTypeEnum
2728
*/
2829
@NotNull(message = "resetType不允许为空")
2930
@ApiModelProperty(value = "重置方式", example = "1")
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.xiaojukeji.know.streaming.km.common.bean.dto.metrices;
2+
3+
import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO;
4+
import io.swagger.annotations.ApiModel;
5+
import io.swagger.annotations.ApiModelProperty;
6+
import lombok.AllArgsConstructor;
7+
import lombok.Data;
8+
import lombok.NoArgsConstructor;
9+
10+
/**
11+
* @author didi
12+
*/
13+
@Data
14+
@NoArgsConstructor
15+
@AllArgsConstructor
16+
@ApiModel(description = "指标详细属性信息")
17+
public class MetricDetailDTO extends BaseDTO {
18+
19+
@ApiModelProperty("指标名称")
20+
private String metric;
21+
22+
@ApiModelProperty("指标是否显示")
23+
private Boolean set;
24+
25+
@ApiModelProperty("指标优先级")
26+
private Integer rank;
27+
28+
}

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/UserMetricConfigDTO.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import lombok.Data;
88
import lombok.NoArgsConstructor;
99

10+
import java.util.List;
1011
import java.util.Map;
1112

1213

@@ -17,4 +18,7 @@
1718
public class UserMetricConfigDTO extends BaseDTO {
1819
@ApiModelProperty("指标展示设置项,key:指标名;value:是否展现(true展现/false不展现)")
1920
private Map<String, Boolean> metricsSet;
21+
22+
@ApiModelProperty("指标自定义属性列表")
23+
private List<MetricDetailDTO> metricDetailDTOList;
2024
}

0 commit comments

Comments
 (0)