Skip to content

Commit 0e49002

Browse files
committed
Topic消息查询支持Timestamp排序,接口支持按指定日期查询
1 parent 8e50d14 commit 0e49002

File tree

6 files changed

+50
-21
lines changed

6 files changed

+50
-21
lines changed

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
public interface TopicStateManager {
1717
TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, String searchBrokerHost) throws NotExistException;
1818

19-
Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException;
19+
Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException;
2020

2121
Result<TopicStateVO> getTopicState(Long clusterPhyId, String topicName);
2222

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import com.didiglobal.logi.log.ILog;
44
import com.didiglobal.logi.log.LogFactory;
55
import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager;
6-
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
76
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
87
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
98
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
@@ -40,10 +39,7 @@
4039
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems;
4140
import org.apache.commons.lang3.ObjectUtils;
4241
import org.apache.kafka.clients.admin.OffsetSpec;
43-
import org.apache.kafka.clients.consumer.ConsumerConfig;
44-
import org.apache.kafka.clients.consumer.ConsumerRecord;
45-
import org.apache.kafka.clients.consumer.ConsumerRecords;
46-
import org.apache.kafka.clients.consumer.KafkaConsumer;
42+
import org.apache.kafka.clients.consumer.*;
4743
import org.apache.kafka.common.TopicPartition;
4844
import org.apache.kafka.common.config.TopicConfig;
4945
import org.springframework.beans.factory.annotation.Autowired;
@@ -123,7 +119,7 @@ public TopicBrokerAllVO getTopicBrokerAll(Long clusterPhyId, String topicName, S
123119
}
124120

125121
@Override
126-
public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto, PaginationSortDTO sortDto) throws AdminOperateException {
122+
public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String topicName, TopicRecordDTO dto) throws AdminOperateException {
127123
long startTime = System.currentTimeMillis();
128124

129125
// 获取集群
@@ -163,10 +159,29 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
163159
}
164160
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
165161
kafkaConsumer.assign(partitionList);
162+
163+
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
164+
// 获取指定时间每个分区的offset(按指定开始时间查询消息时)
165+
if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
166+
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
167+
partitionList.forEach(topicPartition -> {
168+
timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs());
169+
});
170+
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
171+
}
172+
166173
for (TopicPartition partition : partitionList) {
167174
if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
175+
// 重置到最旧
168176
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
177+
} else if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
178+
// 重置到指定时间
179+
kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
180+
} else if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
181+
// 重置到指定位置
182+
169183
} else {
184+
// 默认,重置到最新
170185
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
171186
}
172187
}
@@ -194,7 +209,7 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
194209

195210
// 排序
196211
if (ObjectUtils.isNotEmpty(voList)) {
197-
PaginationUtil.pageBySort(voList, sortDto.getSortField(), sortDto.getSortField());
212+
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType());
198213
}
199214

200215
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
44
import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO;
5+
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
56
import io.swagger.annotations.ApiModel;
67
import io.swagger.annotations.ApiModelProperty;
78
import lombok.Data;
@@ -15,7 +16,7 @@
1516
@Data
1617
@JsonIgnoreProperties(ignoreUnknown = true)
1718
@ApiModel(description = "Topic记录")
18-
public class TopicRecordDTO extends BaseDTO {
19+
public class TopicRecordDTO extends PaginationSortDTO {
1920
@NotNull(message = "truncate不允许为空")
2021
@ApiModelProperty(value = "是否截断", example = "true")
2122
private Boolean truncate;
@@ -38,4 +39,10 @@ public class TopicRecordDTO extends BaseDTO {
3839
@ApiModelProperty(value = "offset", example = "")
3940
private Integer filterOffsetReset = 0;
4041

42+
@ApiModelProperty(value = "开始日期时间戳", example = "")
43+
private Long startTimestampUnitMs;
44+
45+
@ApiModelProperty(value = "结束日期时间戳", example = "")
46+
private Long utilTimestampUnitMs;
47+
4148
}

km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ const TopicMessages = (props: any) => {
3232

3333
// 获取消息开始位置
3434
const offsetResetList = [
35-
{ 'label': 'latest', value: '0' },
36-
{ 'label': 'earliest', value: '1' }
35+
{ 'label': 'latest', value: 0 },
36+
{ 'label': 'earliest', value: 1 }
3737
];
3838

3939
// 默认排序
@@ -42,6 +42,8 @@ const TopicMessages = (props: any) => {
4242
sortType: 'desc',
4343
};
4444

45+
const [sorter, setSorter] = useState<any>(defaultSorter);
46+
4547
// 请求接口获取数据
4648
const genData = async () => {
4749
if (urlParams?.clusterId === undefined || hashData?.topicName === undefined) return;
@@ -56,7 +58,7 @@ const TopicMessages = (props: any) => {
5658
});
5759
setPartitionIdList(newPartitionIdList || []);
5860
});
59-
request(Api.getTopicMessagesList(hashData?.topicName, urlParams?.clusterId), { data: { ...params, ...defaultSorter }, method: 'POST' })
61+
request(Api.getTopicMessagesList(hashData?.topicName, urlParams?.clusterId), { data: { ...params, ...sorter }, method: 'POST' })
6062
.then((res: any) => {
6163
// setPagination({
6264
// current: res.pagination?.pageNo,
@@ -94,19 +96,24 @@ const TopicMessages = (props: any) => {
9496
history.push(`/cluster/${urlParams?.clusterId}/testing/consumer`);
9597
};
9698

97-
const onTableChange = (pagination: any, filters: any, sorter: any) => {
98-
defaultSorter.sortField = sorter.field || '';
99-
defaultSorter.sortType = sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : '';
99+
const onTableChange = (pagination: any, filters: any, sorter: any, extra: any) => {
100100
setPagination(pagination);
101-
genData();
101+
// 只有排序事件时,触发重新请求后端数据
102+
if(extra.action === 'sort') {
103+
setSorter({
104+
sortField: sorter.field || '',
105+
sortType: sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : ''
106+
});
107+
genData();
108+
}
102109
// const asc = sorter?.order && sorter?.order === 'ascend' ? true : false;
103110
// const sortColumn = sorter.field && toLine(sorter.field);
104111
// genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, asc, sortColumn, queryTerm: searchResult, ...allParams });
105112
};
106113

107114
useEffect(() => {
108115
props.positionType === 'Messages' && genData();
109-
}, [props, params]);
116+
}, [props, params, sorter]);
110117

111118
return (
112119
<>

km-console/packages/layout-clusters-fe/src/pages/TopicDetail/config.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ export const getTopicMessagesColmns = () => {
8585
title: 'Timestamp',
8686
dataIndex: 'timestampUnitMs',
8787
key: 'timestampUnitMs',
88-
render: (t: number) => (t ? moment(t).format(timeFormat) : '-'),
88+
sorter: true,
89+
render: (t: number) => (t ? moment(t).format(timeFormat) + '.' + moment(t).millisecond() : '-'),
8990
},
9091
{
9192
title: 'Key',

km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,8 @@ public Result<List<TopicPartitionVO>> getTopicPartitions(@PathVariable Long clus
9292
@ResponseBody
9393
public Result<List<TopicRecordVO>> getTopicMessages(@PathVariable Long clusterPhyId,
9494
@PathVariable String topicName,
95-
@Validated @RequestBody TopicRecordDTO dto,
96-
@Validated PaginationSortDTO sortDto) throws Exception {
97-
return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto, sortDto);
95+
@Validated @RequestBody TopicRecordDTO dto) throws Exception {
96+
return topicStateManager.getTopicMessages(clusterPhyId, topicName, dto);
9897
}
9998

10099
@ApiOperation(value = "Topic-ACL信息", notes = "")

0 commit comments

Comments
 (0)