Skip to content

Commit 220f1c6

Browse files
author
zengqiao
committed
采样优化
1 parent 7a950c6 commit 220f1c6

File tree

1 file changed

+22
-5
lines changed

1 file changed

+22
-5
lines changed

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,12 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
129129
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
130130
}
131131

132-
// 获取分区offset
132+
// 获取分区beginOffset
133+
Result<Map<TopicPartition, Long>> beginOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.earliest(), null);
134+
if (beginOffsetsMapResult.failed()) {
135+
return Result.buildFromIgnoreData(beginOffsetsMapResult);
136+
}
137+
// 获取分区endOffset
133138
Result<Map<TopicPartition, Long>> endOffsetsMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, dto.getFilterPartitionId(), OffsetSpec.latest(), null);
134139
if (endOffsetsMapResult.failed()) {
135140
return Result.buildFromIgnoreData(endOffsetsMapResult);
@@ -142,13 +147,25 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
142147
// 创建kafka-consumer
143148
kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()));
144149

145-
kafkaConsumer.assign(endOffsetsMapResult.getData().keySet());
146-
for (Map.Entry<TopicPartition, Long> entry: endOffsetsMapResult.getData().entrySet()) {
147-
kafkaConsumer.seek(entry.getKey(), Math.max(0, entry.getValue() - dto.getMaxRecords()));
150+
List<TopicPartition> partitionList = new ArrayList<>();
151+
long maxMessage = 0;
152+
for (Map.Entry<TopicPartition, Long> entry : endOffsetsMapResult.getData().entrySet()) {
153+
long begin = beginOffsetsMapResult.getData().get(entry.getKey());
154+
long end = entry.getValue();
155+
if (begin == end){
156+
continue;
157+
}
158+
maxMessage += end - begin;
159+
partitionList.add(entry.getKey());
160+
}
161+
maxMessage = Math.min(maxMessage, dto.getMaxRecords());
162+
kafkaConsumer.assign(partitionList);
163+
for (TopicPartition partition : partitionList) {
164+
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
148165
}
149166

150167
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
151-
while (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS <= dto.getPullTimeoutUnitMs() && voList.size() < dto.getMaxRecords()) {
168+
while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) {
152169
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
153170
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
154171
if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {

0 commit comments

Comments
 (0)