Skip to content

Commit 8e50d14

Browse files
committed
Topic消息查询支持Timestamp排序,支持查询最新消息或最早消息 #534
1 parent 0f35427 commit 8e50d14

File tree

6 files changed

+18
-32
lines changed

6 files changed

+18
-32
lines changed

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.xiaojukeji.know.streaming.km.biz.topic;
22

3+
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
34
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
45
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
56
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO;
@@ -15,7 +16,7 @@
1516
public interface TopicStateManager {
1617
TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException;
1718

18-
Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException;
19+
Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException;
1920

2021
Result<TopicStateVO> getTopicState(Long clusterPhyId, String topicName);
2122

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.didiglobal.logi.log.ILog;
44
import com.didiglobal.logi.log.LogFactory;
55
import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager;
6+
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
67
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
78
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
89
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
@@ -22,17 +23,18 @@
2223
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
2324
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
2425
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
25-
import com.xiaojukeji.know.streaming.km.common.converter.PartitionConverter;
2626
import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter;
27+
import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum;
2728
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
2829
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
2930
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
31+
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
3032
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
3133
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
3234
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
3335
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
34-
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
3536
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
37+
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
3638
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
3739
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
3840
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems;
@@ -121,7 +123,7 @@ public TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, S
121123
}
122124

123125
@Override
124-
public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException {
126+
public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException {
125127
long startTime = System.currentTimeMillis();
126128

127129
// 获取集群
@@ -162,7 +164,7 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
162164
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
163165
kafkaConsumer.assign(partitionList);
164166
for (TopicPartition partition : partitionList) {
165-
if (Constant.EARLIEST.equals(dto.getFilterOffsetReset())) {
167+
if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
166168
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
167169
} else {
168170
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
@@ -192,11 +194,7 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
192194

193195
// 排序
194196
if (ObjectUtils.isNotEmpty(voList)) {
195-
if (Constant.ASC.equals(dto.getSortType())) {
196-
voList.sort((o1, o2) -> (int) (o1.getTimestampUnitMs() - o2.getTimestampUnitMs()));
197-
} else if (Constant.DESC.equals(dto.getSortType())) {
198-
voList.sort((o1, o2) -> (int) (o2.getTimestampUnitMs() - o1.getTimestampUnitMs()));
199-
}
197+
PaginationUtil.pageBySort(voList, sortDto.getSortField(), sortDto.getSortField());
200198
}
201199

202200
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,7 @@ public class TopicRecordDTO extends BaseDTO {
3535
@ApiModelProperty(value = "预览超时时间", example = "10000")
3636
private Long pullTimeoutUnitMs = 8000L;
3737

38-
@ApiModelProperty(value = "排序", example = "desc")
39-
private String sortType;
40-
41-
@ApiModelProperty(value = "offset", example = "latest")
42-
private String filterOffsetReset;
38+
@ApiModelProperty(value = "offset", example = "")
39+
private Integer filterOffsetReset = 0;
4340

4441
}

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,4 @@ private Constant() {}
6565

6666
public static final Integer DEFAULT_RETRY_TIME = 3;
6767

68-
/**
69-
* 排序
70-
*/
71-
public static final String ASC = "asc";
72-
public static final String DESC = "desc";
73-
74-
/**
75-
* 消费策略
76-
*/
77-
public static final String LATEST = "latest";
78-
public static final String EARLIEST = "earliest";
79-
8068
}

km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ const defaultParams: any = {
1010
maxRecords: 100,
1111
pullTimeoutUnitMs: 5000,
1212
// filterPartitionId: 1,
13-
filterOffsetReset: 'latest'
13+
filterOffsetReset: 0
1414
};
1515
const defaultpaPagination = {
1616
current: 1,
@@ -32,8 +32,8 @@ const TopicMessages = (props: any) => {
3232

3333
// 获取消息开始位置
3434
const offsetResetList = [
35-
{ 'label': 'latest', value: 'latest' },
36-
{ 'label': 'earliest', value: 'earliest' }
35+
{ 'label': 'latest', value: '0' },
36+
{ 'label': 'earliest', value: '1' }
3737
];
3838

3939
// 默认排序

km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager;
44
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
55
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
6+
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
67
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
78
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
89
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
@@ -91,8 +92,9 @@ public Result<List<TopicPartitionVO>> getTopicPartitions(@PathVariable Long clus
9192
@ResponseBody
9293
public Result<List<TopicRecordVO>> getTopicMessages(@PathVariable Long clusterPhyId,
9394
@PathVariable String topicName,
94-
@Validated @RequestBody TopicRecordDTO dto) throws Exception {
95-
return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto);
95+
@Validated @RequestBody TopicRecordDTO dto,
96+
@Validated PaginationSortDTO sortDto) throws Exception {
97+
return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto, sortDto);
9698
}
9799

98100
@ApiOperation(value = "Topic-ACL信息", notes = "")

0 commit comments

Comments
 (0)