Skip to content

Commit 0bf85c9

Browse files
authored
Merge pull request #555 from superspeedone/dev
Dev
2 parents 630e582 + 405e6e0 commit 0bf85c9

File tree

7 files changed

+87
-13
lines changed

7 files changed

+87
-13
lines changed

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/TopicStateManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.xiaojukeji.know.streaming.km.biz.topic;
22

3+
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
34
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
45
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
56
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO;

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,26 @@
2222
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
2323
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
2424
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
25-
import com.xiaojukeji.know.streaming.km.common.converter.PartitionConverter;
2625
import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter;
26+
import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum;
27+
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
2728
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
2829
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
2930
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
31+
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
3032
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
3133
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
3234
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
3335
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
34-
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
3536
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
37+
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
3638
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
3739
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
3840
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems;
41+
import org.apache.commons.lang3.ObjectUtils;
42+
import org.apache.commons.lang3.StringUtils;
3943
import org.apache.kafka.clients.admin.OffsetSpec;
40-
import org.apache.kafka.clients.consumer.ConsumerConfig;
41-
import org.apache.kafka.clients.consumer.ConsumerRecord;
42-
import org.apache.kafka.clients.consumer.ConsumerRecords;
43-
import org.apache.kafka.clients.consumer.KafkaConsumer;
44+
import org.apache.kafka.clients.consumer.*;
4445
import org.apache.kafka.common.TopicPartition;
4546
import org.apache.kafka.common.config.TopicConfig;
4647
import org.springframework.beans.factory.annotation.Autowired;
@@ -160,8 +161,31 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
160161
}
161162
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
162163
kafkaConsumer.assign(partitionList);
164+
165+
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
166+
// 获取指定时间每个分区的offset(按指定开始时间查询消息时)
167+
if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
168+
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
169+
partitionList.forEach(topicPartition -> {
170+
timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs());
171+
});
172+
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
173+
}
174+
163175
for (TopicPartition partition : partitionList) {
164-
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
176+
if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
177+
// 重置到最旧
178+
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
179+
} else if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
180+
// 重置到指定时间
181+
kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
182+
} else if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
183+
// 重置到指定位置
184+
185+
} else {
186+
// 默认,重置到最新
187+
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
188+
}
165189
}
166190

167191
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
@@ -185,6 +209,15 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
185209
}
186210
}
187211

212+
// 排序
213+
if (ObjectUtils.isNotEmpty(voList)) {
214+
// 默认按时间倒序排序
215+
if (StringUtils.isBlank(dto.getSortType())) {
216+
dto.setSortType(SortTypeEnum.DESC.getSortType());
217+
}
218+
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType());
219+
}
220+
188221
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
189222
} catch (Exception e) {
190223
log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e);

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
44
import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO;
5+
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
56
import io.swagger.annotations.ApiModel;
67
import io.swagger.annotations.ApiModelProperty;
78
import lombok.Data;
@@ -15,7 +16,7 @@
1516
@Data
1617
@JsonIgnoreProperties(ignoreUnknown = true)
1718
@ApiModel(description = "Topic记录")
18-
public class TopicRecordDTO extends BaseDTO {
19+
public class TopicRecordDTO extends PaginationSortDTO {
1920
@NotNull(message = "truncate不允许为空")
2021
@ApiModelProperty(value = "是否截断", example = "true")
2122
private Boolean truncate;
@@ -34,4 +35,14 @@ public class TopicRecordDTO extends BaseDTO {
3435

3536
@ApiModelProperty(value = "预览超时时间", example = "10000")
3637
private Long pullTimeoutUnitMs = 8000L;
38+
39+
@ApiModelProperty(value = "offset", example = "")
40+
private Integer filterOffsetReset = 0;
41+
42+
@ApiModelProperty(value = "开始日期时间戳", example = "")
43+
private Long startTimestampUnitMs;
44+
45+
@ApiModelProperty(value = "结束日期时间戳", example = "")
46+
private Long utilTimestampUnitMs;
47+
3748
}

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,5 @@ private Constant() {}
6464
public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F;
6565

6666
public static final Integer DEFAULT_RETRY_TIME = 3;
67+
6768
}

km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const defaultParams: any = {
1010
maxRecords: 100,
1111
pullTimeoutUnitMs: 5000,
1212
// filterPartitionId: 1,
13+
filterOffsetReset: 0
1314
};
1415
const defaultpaPagination = {
1516
current: 1,
@@ -29,12 +30,20 @@ const TopicMessages = (props: any) => {
2930
const [pagination, setPagination] = useState<any>(defaultpaPagination);
3031
const [form] = Form.useForm();
3132

33+
// 获取消息开始位置
34+
const offsetResetList = [
35+
{ 'label': 'latest', value: 0 },
36+
{ 'label': 'earliest', value: 1 }
37+
];
38+
3239
// 默认排序
3340
const defaultSorter = {
3441
sortField: 'timestampUnitMs',
3542
sortType: 'desc',
3643
};
3744

45+
const [sorter, setSorter] = useState<any>(defaultSorter);
46+
3847
// 请求接口获取数据
3948
const genData = async () => {
4049
if (urlParams?.clusterId === undefined || hashData?.topicName === undefined) return;
@@ -49,7 +58,7 @@ const TopicMessages = (props: any) => {
4958
});
5059
setPartitionIdList(newPartitionIdList || []);
5160
});
52-
request(Api.getTopicMessagesList(hashData?.topicName, urlParams?.clusterId), { data: { ...params, ...defaultSorter }, method: 'POST' })
61+
request(Api.getTopicMessagesList(hashData?.topicName, urlParams?.clusterId), { data: { ...params, ...sorter }, method: 'POST' })
5362
.then((res: any) => {
5463
// setPagination({
5564
// current: res.pagination?.pageNo,
@@ -87,16 +96,23 @@ const TopicMessages = (props: any) => {
8796
history.push(`/cluster/${urlParams?.clusterId}/testing/consumer`);
8897
};
8998

90-
const onTableChange = (pagination: any, filters: any, sorter: any) => {
99+
const onTableChange = (pagination: any, filters: any, sorter: any, extra: any) => {
91100
setPagination(pagination);
101+
// 只有排序事件时,触发重新请求后端数据
102+
if(extra.action === 'sort') {
103+
setSorter({
104+
sortField: sorter.field || '',
105+
sortType: sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : ''
106+
});
107+
}
92108
// const asc = sorter?.order && sorter?.order === 'ascend' ? true : false;
93109
// const sortColumn = sorter.field && toLine(sorter.field);
94110
// genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, asc, sortColumn, queryTerm: searchResult, ...allParams });
95111
};
96112

97113
useEffect(() => {
98114
props.positionType === 'Messages' && genData();
99-
}, [props, params]);
115+
}, [props, params, sorter]);
100116

101117
return (
102118
<>
@@ -119,6 +135,15 @@ const TopicMessages = (props: any) => {
119135
</div>
120136
<div className="messages-query">
121137
<Form form={form} layout="inline" onFinish={onFinish}>
138+
<Form.Item name="filterOffsetReset">
139+
<Select
140+
options={offsetResetList}
141+
size="small"
142+
style={{ width: '120px' }}
143+
className={'detail-table-select'}
144+
placeholder="请选择offset"
145+
/>
146+
</Form.Item>
122147
<Form.Item name="filterPartitionId">
123148
<Select
124149
options={partitionIdList}
@@ -158,7 +183,7 @@ const TopicMessages = (props: any) => {
158183
showQueryForm={false}
159184
tableProps={{
160185
showHeader: false,
161-
rowKey: 'path',
186+
rowKey: 'offset',
162187
loading: loading,
163188
columns: getTopicMessagesColmns(),
164189
dataSource: data,
@@ -169,6 +194,7 @@ const TopicMessages = (props: any) => {
169194
bordered: false,
170195
onChange: onTableChange,
171196
scroll: { x: 'max-content' },
197+
sortDirections: ['descend', 'ascend', 'default']
172198
},
173199
}}
174200
/>

km-console/packages/layout-clusters-fe/src/pages/TopicDetail/config.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ export const getTopicMessagesColmns = () => {
8585
title: 'Timestamp',
8686
dataIndex: 'timestampUnitMs',
8787
key: 'timestampUnitMs',
88-
render: (t: number) => (t ? moment(t).format(timeFormat) : '-'),
88+
sorter: true,
89+
render: (t: number) => (t ? moment(t).format(timeFormat) + '.' + moment(t).millisecond() : '-'),
8990
},
9091
{
9192
title: 'Key',

km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicStateController.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager;
44
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
55
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
6+
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
67
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
78
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
89
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;

0 commit comments

Comments
 (0)