Skip to content

Commit 5262ae8

Browse files
author
zengqiao
committed
采样调整
1 parent 7f25167 commit 5262ae8

File tree

3 files changed

+8
-7
lines changed

3 files changed

+8
-7
lines changed

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,13 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
142142
// 创建kafka-consumer
143143
kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()));
144144

145+
kafkaConsumer.assign(endOffsetsMapResult.getData().keySet());
146+
for (Map.Entry<TopicPartition, Long> entry: endOffsetsMapResult.getData().entrySet()) {
147+
kafkaConsumer.seek(entry.getKey(), Math.max(0, entry.getValue() - dto.getMaxRecords()));
148+
}
149+
145150
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
146151
while (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS <= dto.getPullTimeoutUnitMs() && voList.size() < dto.getMaxRecords()) {
147-
for (Map.Entry<TopicPartition, Long> entry: endOffsetsMapResult.getData().entrySet()) {
148-
kafkaConsumer.assign(Arrays.asList(entry.getKey()));
149-
kafkaConsumer.seek(entry.getKey(), Math.max(0, entry.getValue() - dto.getMaxRecords()));
150-
151152
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS));
152153
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
153154
if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) {
@@ -165,7 +166,6 @@ public Result<List<TopicRecordVO>> getTopicMessages(Long clusterPhyId, String to
165166
|| voList.size() > dto.getMaxRecords()) {
166167
break;
167168
}
168-
}
169169
}
170170

171171
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/record/RecordHeaderKS.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@
1010
public class RecordHeaderKS {
1111
private String key;
1212

13-
private byte[] value;
13+
private String value;
1414
}

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicVOConverter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.apache.kafka.clients.consumer.ConsumerRecord;
1919
import org.apache.kafka.common.header.Header;
2020

21+
import java.nio.charset.StandardCharsets;
2122
import java.util.*;
2223

2324
public class TopicVOConverter {
@@ -51,7 +52,7 @@ public static TopicRecordVO convert2TopicRecordVO(String topicName, ConsumerReco
5152
vo.setValue(consumerRecord.value());
5253
vo.setHeaderList(new ArrayList<>());
5354
for (Header header : consumerRecord.headers().toArray()) {
54-
vo.getHeaderList().add(new RecordHeaderKS(header.key(), header.value()));
55+
vo.getHeaderList().add(new RecordHeaderKS(header.key(), new String(header.value(), StandardCharsets.UTF_8)));
5556
}
5657
return vo;
5758
}

0 commit comments

Comments
 (0)