Skip to content

Commit efe61c6

Browse files
committed
kafka发的消息固定到分区0以避免顺序错误
1 parent 86701c9 commit efe61c6

File tree

2 files changed

+18
-3
lines changed

2 files changed

+18
-3
lines changed

addons-kafka/src/main/java/org/wowtools/hppt/addons/kafka/KafkaUtil.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@
1010
import org.apache.kafka.clients.producer.KafkaProducer;
1111
import org.apache.kafka.clients.producer.Producer;
1212
import org.apache.kafka.clients.producer.ProducerRecord;
13+
import org.apache.kafka.common.TopicPartition;
1314
import org.wowtools.hppt.common.util.ResourcesReader;
1415

1516
import java.time.Duration;
17+
import java.util.Arrays;
1618
import java.util.Collections;
19+
import java.util.List;
1720
import java.util.Properties;
1821

1922
/**
@@ -57,7 +60,7 @@ public interface BytesFunction {
5760
public static BytesFunction buildProducer(String topic) {
5861
Producer<String, byte[]> producer = new KafkaProducer<>(buildProperties());
5962
return (bytes -> {
60-
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, bytes);
63+
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic,0,"x", bytes);
6164
producer.send(record);
6265
});
6366
}
@@ -75,7 +78,12 @@ public static void buildConsumer(String groupId, String topic, BytesFunction cb)
7578
props.put("auto.offset.reset", "latest");
7679
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
7780
// 订阅主题
78-
consumer.subscribe(Collections.singletonList(topic));
81+
TopicPartition partition = new TopicPartition(topic, 0);
82+
try {
83+
consumer.seekToEnd(List.of(partition));
84+
} catch (java.lang.IllegalStateException e) {
85+
consumer.assign(List.of(partition));
86+
}
7987
{
8088
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
8189
for (ConsumerRecord<String, byte[]> record : records) {

run/src/main/java/org/wowtools/hppt/run/sc/common/PortReceiver.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,14 @@ public PortReceiver(ScConfig config, ClientSessionService clientSessionService)
104104
@Override
105105
public void receiveServerBytes(byte[] bytes) throws Exception {
106106
if (noLogin) {
107-
bytes = GridAesCipherUtil.decrypt(bytes);
107+
try {
108+
bytes = GridAesCipherUtil.decrypt(bytes);
109+
} catch (Exception e) {
110+
if (log.isDebugEnabled()) {
111+
log.debug("无效的字节,舍弃, bytes {}", new String(bytes, StandardCharsets.UTF_8), e);
112+
}
113+
return;
114+
}
108115
String s = new String(bytes, StandardCharsets.UTF_8);
109116
String[] cmd = s.split(" ", 2);
110117
switch (cmd[0]) {

0 commit comments

Comments
 (0)