Skip to content

Commit 9a5f66d

Browse files
committed
调整kafkademo
1 parent d61f32c commit 9a5f66d

File tree

13 files changed

+205
-75
lines changed

13 files changed

+205
-75
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ logs
3939

4040

4141
/run/src/main/resources/*.yml
42+
/kafkademo/src/main/resources/*.yml
4243

4344
dependency-reduced-pom.xml
4445

kafkademo/pom.xml

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,23 @@
1212
<artifactId>kafkademo</artifactId>
1313

1414
<dependencies>
15+
<dependency>
16+
<groupId>org.projectlombok</groupId>
17+
<artifactId>lombok</artifactId>
18+
<version>1.18.30</version>
19+
</dependency>
20+
<dependency>
21+
<groupId>org.slf4j</groupId>
22+
<artifactId>slf4j-api</artifactId>
23+
<version>2.0.12</version>
24+
</dependency>
25+
<dependency>
26+
<groupId>ch.qos.logback</groupId>
27+
<artifactId>logback-classic</artifactId>
28+
<version>1.4.11</version>
29+
</dependency>
30+
31+
1532
<dependency>
1633
<groupId>org.wowtools.hppt</groupId>
1734
<artifactId>run</artifactId>
@@ -23,5 +40,57 @@
2340
<version>3.4.0</version>
2441
</dependency>
2542
</dependencies>
26-
43+
<build>
44+
<resources>
45+
<resource>
46+
<directory>src/main/resources</directory>
47+
</resource>
48+
</resources>
49+
<plugins>
50+
<plugin>
51+
<groupId>org.apache.maven.plugins</groupId>
52+
<artifactId>maven-compiler-plugin</artifactId>
53+
<configuration>
54+
<source>${java.version}</source>
55+
<target>${java.version}</target>
56+
<encoding>UTF-8</encoding>
57+
<compilerArgument>-Xlint:unchecked</compilerArgument>
58+
</configuration>
59+
</plugin>
60+
<plugin>
61+
<groupId>org.apache.maven.plugins</groupId>
62+
<artifactId>maven-shade-plugin</artifactId>
63+
<executions>
64+
<execution>
65+
<phase>package</phase>
66+
<goals>
67+
<goal>shade</goal>
68+
</goals>
69+
<configuration>
70+
<finalName>kafkademo</finalName>
71+
<filters>
72+
<filter>
73+
<artifact>*:*</artifact>
74+
<excludes>
75+
<exclude>*.yml</exclude>
76+
<exclude>META-INF/*.SF</exclude>
77+
<exclude>META-INF/*.DSA</exclude>
78+
<exclude>META-INF/*.RSA</exclude>
79+
</excludes>
80+
</filter>
81+
</filters>
82+
<transformers>
83+
<transformer
84+
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
85+
<mainClass>org.wowtools.hppt.kafkademo.ServerDemo</mainClass>
86+
</transformer>
87+
<transformer
88+
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
89+
</transformers>
90+
</configuration>
91+
</execution>
92+
</executions>
93+
</plugin>
94+
</plugins>
95+
</build>
2796
</project>

kafkademo/src/main/java/org/wowtools/hppt/kafkademo/ClientDemo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ public static void main(String[] args) throws Exception{
4545
cfg.clientUser = "user1";
4646
cfg.clientPassword = "12345";
4747
ScConfig.Forward forward = new ScConfig.Forward();
48-
forward.localPort = 10022;
49-
forward.remoteHost = "wsl";
48+
forward.localPort = 22022;
49+
forward.remoteHost = "127.0.0.1";
5050
forward.remotePort = 22;
5151
cfg.forwards = new ArrayList<>();
5252
cfg.forwards.add(forward);
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.wowtools.hppt.kafkademo;
2+
3+
import java.util.Map;
4+
5+
/**
6+
* @author liuyu
7+
* @date 2025/7/7
8+
*/
9+
public class Config {
10+
//客户端发数据的topic
11+
public String clientSendTopic = "client-send-01";
12+
//服务端发数据的topic
13+
public String serverSendTopic = "server-send-01";
14+
15+
public Map<String,Object> properties;
16+
}

kafkademo/src/main/java/org/wowtools/hppt/kafkademo/KafkaUtil.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package org.wowtools.hppt.kafkademo;
22

3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
6+
import lombok.extern.slf4j.Slf4j;
37
import org.apache.kafka.clients.consumer.ConsumerRecord;
48
import org.apache.kafka.clients.consumer.ConsumerRecords;
59
import org.apache.kafka.clients.consumer.KafkaConsumer;
610
import org.apache.kafka.clients.producer.KafkaProducer;
711
import org.apache.kafka.clients.producer.Producer;
812
import org.apache.kafka.clients.producer.ProducerRecord;
13+
import org.wowtools.hppt.common.util.ResourcesReader;
914

1015
import java.time.Duration;
1116
import java.util.Collections;
@@ -17,7 +22,19 @@
1722
* @author liuyu
1823
* @date 2024/6/15
1924
*/
25+
@Slf4j
2026
public class KafkaUtil {
27+
private static final Config config;
28+
29+
static {
30+
try {
31+
config = new ObjectMapper(new YAMLFactory())
32+
.readValue(ResourcesReader.readStr(Config.class, "config.yml"), Config.class);
33+
} catch (JsonProcessingException e) {
34+
throw new RuntimeException(e);
35+
}
36+
}
37+
2138

2239
//客户端发数据的topic
2340
public static final String ClientSendTopic = "client-send";
@@ -28,11 +45,7 @@ public class KafkaUtil {
2845
//基本的kafka连接配置
2946
private static Properties buildProperties() {
3047
Properties props = new Properties();
31-
props.put("bootstrap.servers", "wsl:9092"); // 部署在电脑C上的Kafka服务器地址
32-
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
33-
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
34-
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
35-
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
48+
props.putAll(config.properties);
3649
return props;
3750
}
3851

@@ -64,14 +77,23 @@ public static BytesFunction buildProducer(String topic) {
6477
*/
6578
public static void buildConsumer(String groupId, String topic, BytesFunction cb) {
6679
Properties props = buildProperties();
67-
props.put("group.id", groupId+System.currentTimeMillis());
80+
props.put("group.id", groupId);
81+
props.put("auto.offset.reset", "latest");
6882
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
6983
// 订阅主题
7084
consumer.subscribe(Collections.singletonList(topic));
85+
{
86+
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
87+
for (ConsumerRecord<String, byte[]> record : records) {
88+
log.info("测试消费 {}", record.toString());
89+
byte[] value = record.value();
90+
cb.f(value);
91+
}
92+
}
7193
// 消费消息
72-
Thread.startVirtualThread(()->{
94+
Thread.startVirtualThread(() -> {
7395
while (true) {
74-
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(10));
96+
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
7597
for (ConsumerRecord<String, byte[]> record : records) {
7698
byte[] value = record.value();
7799
cb.f(value);
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#调试配置文件,请保证服务端和客户端配置一致,否则可能引起异常
2+
3+
4+
# netty内存泄露检查级别 0 DISABLED 1 SIMPLE 2 ADVANCED 3 PARANOID
5+
NettyResourceLeakDetectorLevel = 3
6+
# 是否开启消息流水号 1为开启 用于调试消息后发先至等问题,非调试时流水号为空
7+
OpenSerialNumber = 0
8+
9+
# 是否开启缓冲池监控 1为开启 用于调试各个生产者、消费者的缓冲池使用情况
10+
OpenBufferPoolDetector = 0
11+
# 缓冲池高水位线,缓冲池中元素个数超过此值且继续向其中添加要素则触发日志
12+
BufferPoolWaterline = 20

kafkademo/src/main/resources/log4j2.xml

Lines changed: 0 additions & 28 deletions
This file was deleted.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<configuration>
2+
<!-- Define a pattern for log output -->
3+
<property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n" />
4+
5+
<!-- Define the log file name, default to 'application' -->
6+
<property name="LOG_FILE_NAME" value="${logFileName:-application}" />
7+
8+
<!-- Define the rolling policy for daily log files -->
9+
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
10+
<file>logs/${LOG_FILE_NAME}.log</file>
11+
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
12+
<!-- Daily rollover -->
13+
<fileNamePattern>logs/${LOG_FILE_NAME}.%d{yyyy-MM-dd}.log</fileNamePattern>
14+
<!-- Keep 30 days' worth of history -->
15+
<maxHistory>30</maxHistory>
16+
</rollingPolicy>
17+
<encoder>
18+
<pattern>${LOG_PATTERN}</pattern>
19+
</encoder>
20+
</appender>
21+
22+
<!-- Console appender configuration -->
23+
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
24+
<encoder>
25+
<pattern>${LOG_PATTERN}</pattern>
26+
</encoder>
27+
</appender>
28+
29+
<!-- Root logger configuration -->
30+
<root level="INFO">
31+
<appender-ref ref="FILE" />
32+
<appender-ref ref="CONSOLE" />
33+
</root>
34+
35+
<!-- Package-specific logger configuration -->
36+
<logger name="org.eclipse.jetty" level="WARN" />
37+
</configuration>

run/pom.xml

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -102,33 +102,7 @@
102102
</execution>
103103
</executions>
104104
</plugin>
105-
<!-- 打native包 -->
106-
<plugin>
107-
<groupId>org.graalvm.buildtools</groupId>
108-
<artifactId>native-maven-plugin</artifactId>
109-
<version>${native.maven.plugin.version}</version>
110-
<configuration>
111-
<buildArgs>
112-
<arg>-H:+AddAllCharsets</arg>
113-
</buildArgs>
114-
<mainClass>org.wowtools.hppt.run.Run</mainClass>
115-
<imageName>hppt</imageName>
116-
</configuration>
117-
</plugin>
118105
</plugins>
119106
</build>
120107

121-
<pluginRepositories>
122-
<pluginRepository>
123-
<id>graalvm-native-build-tools-snapshots</id>
124-
<name>GraalVM native-build-tools Snapshots</name>
125-
<url>https://raw.githubusercontent.com/graalvm/native-build-tools/snapshots</url>
126-
<releases>
127-
<enabled>false</enabled>
128-
</releases>
129-
<snapshots>
130-
<enabled>true</enabled>
131-
</snapshots>
132-
</pluginRepository>
133-
</pluginRepositories>
134108
</project>

run/src/main/java/org/wowtools/hppt/common/client/ClientSessionManager.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,23 @@ protected void initChannel(SocketChannel ch) {
6262
});
6363
}
6464

65-
public boolean bindPort(int port) {
65+
/**
66+
* 绑定端口, 成功返回true
67+
*
68+
* @param localHost 本机绑定哪个ip,多网卡有冲突时填写,一般填null即可
69+
* @param port 端口
70+
* @return 绑定成功返回true
71+
*/
72+
public boolean bindPort(String localHost, int port) {
6673
synchronized (channels) {
6774
try {
68-
Channel channel = serverBootstrap.bind(port).sync().channel();
69-
channel.closeFuture();
75+
Channel channel;
76+
if (null == localHost) {
77+
channel = serverBootstrap.bind(port).sync().channel();
78+
} else {
79+
channel = serverBootstrap.bind(localHost,port).sync().channel();
80+
}
81+
channel.newSucceededFuture().sync();
7082
channels.add(channel);
7183
log.debug("bindPort {} success", port);
7284
return true;

0 commit comments

Comments
 (0)