Skip to content

Commit fa7ad64

Browse files
committed
Topic消息查询支持Timestamp排序,支持查询最新消息或最早消息 #534
1 parent 47065c8 commit fa7ad64

File tree

4 files changed

+56
-2
lines changed

4 files changed

+56
-2
lines changed

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
3737
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
3838
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems;
39+
import org.apache.commons.lang3.ObjectUtils;
3940
import org.apache.kafka.clients.admin.OffsetSpec;
4041
import org.apache.kafka.clients.consumer.ConsumerConfig;
4142
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -161,7 +162,11 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
161162
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
162163
kafkaConsumer.assign(partitionList);
163164
for (TopicPartition partition : partitionList) {
164-
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
165+
if (Constant.EARLIEST.equals(dto.getFilterOffsetReset())) {
166+
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
167+
} else {
168+
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
169+
}
165170
}
166171

167172
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
@@ -185,6 +190,15 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
185190
}
186191
}
187192

193+
// 排序
194+
if (ObjectUtils.isNotEmpty(voList)) {
195+
if (Constant.ASC.equals(dto.getSortType())) {
196+
voList.sort((o1, o2) -> (int) (o1.getTimestampUnitMs() - o2.getTimestampUnitMs()));
197+
} else if (Constant.DESC.equals(dto.getSortType())) {
198+
voList.sort((o1, o2) -> (int) (o2.getTimestampUnitMs() - o1.getTimestampUnitMs()));
199+
}
200+
}
201+
188202
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
189203
} catch (Exception e) {
190204
log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e);

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/topic/TopicRecordDTO.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,11 @@ public class TopicRecordDTO extends BaseDTO {
3434

3535
@ApiModelProperty(value = "预览超时时间", example = "10000")
3636
private Long pullTimeoutUnitMs = 8000L;
37+
38+
@ApiModelProperty(value = "排序", example = "desc")
39+
private String sortType;
40+
41+
@ApiModelProperty(value = "offset", example = "latest")
42+
private String filterOffsetReset;
43+
3744
}

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,17 @@ private Constant() {}
6464
public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F;
6565

6666
public static final Integer DEFAULT_RETRY_TIME = 3;
67+
68+
/**
69+
* 排序
70+
*/
71+
public static final String ASC = "asc";
72+
public static final String DESC = "desc";
73+
74+
/**
75+
* 消费策略
76+
*/
77+
public static final String LATEST = "latest";
78+
public static final String EARLIEST = "earliest";
79+
6780
}

km-console/packages/layout-clusters-fe/src/pages/TopicDetail/Messages.tsx

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const defaultParams: any = {
1010
maxRecords: 100,
1111
pullTimeoutUnitMs: 5000,
1212
// filterPartitionId: 1,
13+
filterOffsetReset: 'latest'
1314
};
1415
const defaultpaPagination = {
1516
current: 1,
@@ -29,6 +30,12 @@ const TopicMessages = (props: any) => {
2930
const [pagination, setPagination] = useState<any>(defaultpaPagination);
3031
const [form] = Form.useForm();
3132

33+
// 获取消息开始位置
34+
const offsetResetList = [
35+
{ 'label': 'latest', value: 'latest' },
36+
{ 'label': 'earliest', value: 'earliest' }
37+
];
38+
3239
// 默认排序
3340
const defaultSorter = {
3441
sortField: 'timestampUnitMs',
@@ -88,7 +95,10 @@ const TopicMessages = (props: any) => {
8895
};
8996

9097
const onTableChange = (pagination: any, filters: any, sorter: any) => {
98+
defaultSorter.sortField = sorter.field || '';
99+
defaultSorter.sortType = sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : '';
91100
setPagination(pagination);
101+
genData();
92102
// const asc = sorter?.order && sorter?.order === 'ascend' ? true : false;
93103
// const sortColumn = sorter.field && toLine(sorter.field);
94104
// genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, asc, sortColumn, queryTerm: searchResult, ...allParams });
@@ -119,6 +129,15 @@ const TopicMessages = (props: any) => {
119129
</div>
120130
<div className="messages-query">
121131
<Form form={form} layout="inline" onFinish={onFinish}>
132+
<Form.Item name="filterOffsetReset">
133+
<Select
134+
options={offsetResetList}
135+
size="small"
136+
style={{ width: '120px' }}
137+
className={'detail-table-select'}
138+
placeholder="请选择offset"
139+
/>
140+
</Form.Item>
122141
<Form.Item name="filterPartitionId">
123142
<Select
124143
options={partitionIdList}
@@ -158,7 +177,7 @@ const TopicMessages = (props: any) => {
158177
showQueryForm={false}
159178
tableProps={{
160179
showHeader: false,
161-
rowKey: 'path',
180+
rowKey: 'offset',
162181
loading: loading,
163182
columns: getTopicMessagesColmns(),
164183
dataSource: data,
@@ -169,6 +188,7 @@ const TopicMessages = (props: any) => {
169188
bordered: false,
170189
onChange: onTableChange,
171190
scroll: { x: 'max-content' },
191+
sortDirections: ['descend', 'ascend', 'default']
172192
},
173193
}}
174194
/>

0 commit comments

Comments
 (0)