
Commit 86701c9

committed
Some code for the plugin-based refactoring
1 parent 9a5f66d commit 86701c9

File tree

19 files changed: +453 -12 lines changed


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ logs


 /run/src/main/resources/*.yml
+/run/src/main/resources/addons/
 /kafkademo/src/main/resources/*.yml

 dependency-reduced-pom.xml

addons-kafka/pom.xml

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.wowtools.hppt</groupId>
        <artifactId>hppt</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>addons-kafka</artifactId>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.12</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.4.11</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.wowtools.hppt</groupId>
            <artifactId>run</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>
    </dependencies>
    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                    <compilerArgument>-Xlint:unchecked</compilerArgument>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>addons-kafka</finalName>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>*.yml</exclude>
                                        <exclude>addons/*.*</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
package org.wowtools.hppt.addons.kafka;

import java.util.Map;

/**
 * @author liuyu
 * @date 2025/7/7
 */
public class Config {
    //topic the client sends data on
    public String clientSendTopic = "client-send-01";
    //topic the server sends data on
    public String serverSendTopic = "server-send-01";

    public String tag;

    public Map<String, Object> properties;
}
Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
package org.wowtools.hppt.addons.kafka;

import org.wowtools.hppt.run.sc.common.ClientSessionService;
import org.wowtools.hppt.run.sc.pojo.ScConfig;

import java.util.ArrayList;

/**
 * Client side, deployed on machine A
 * @author liuyu
 * @date 2024/6/15
 */
public class KafkaClientSessionService extends ClientSessionService {
    //TODO when transferring large payloads such as files, the ordering of bytes consumed from kafka must be handled
    public KafkaClientSessionService(ScConfig config) throws Exception {
        super(config);
    }

    private KafkaUtil.BytesFunction sendToServer;
    private KafkaUtil.BytesFunction clientConsumer;

    @Override
    public void connectToServer(ScConfig config, Cb cb) throws Exception {
        //build the kafka producer and consumer tools at initialization time
        sendToServer = KafkaUtil.buildProducer(KafkaUtil.config.clientSendTopic);

        clientConsumer = (bytes) -> {
            //data consumed for the client is handed over via the receiveServerBytes method
            try {
                receiveServerBytes(bytes);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        KafkaUtil.buildConsumer("client", KafkaUtil.config.serverSendTopic, clientConsumer);
        cb.end(null);//call end to notify the framework that the connection is established
    }

    @Override
    public void sendBytesToServer(byte[] bytes) {
        sendToServer.f(bytes);
    }

    public static void main(String[] args) throws Exception {
        ScConfig cfg = new ScConfig();
        cfg.clientUser = "user1";
        cfg.clientPassword = "12345";
        ScConfig.Forward forward = new ScConfig.Forward();
        forward.localPort = 22022;
        forward.remoteHost = "127.0.0.1";
        forward.remotePort = 22;
        cfg.forwards = new ArrayList<>();
        cfg.forwards.add(forward);
        new KafkaClientSessionService(cfg).sync();
    }

}
Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
package org.wowtools.hppt.addons.kafka;

/**
 * @author liuyu
 * @date 2024/6/15
 */
public class KafkaCtx {
}
Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
package org.wowtools.hppt.addons.kafka;

import org.wowtools.hppt.run.ss.common.ServerSessionService;
import org.wowtools.hppt.run.ss.pojo.SsConfig;

import java.util.ArrayList;

/**
 * Server side, deployed on machine B
 *
 * @author liuyu
 * @date 2024/6/15
 */
public class KafkaServerSessionService extends ServerSessionService<KafkaCtx> {
    //TODO when transferring large payloads such as files, the ordering of bytes consumed from kafka must be handled
    /*
     * Note: the CTX type parameter of the Server class identifies each client uniquely, so to support
     * multiple clients connecting at the same time, start the refactoring from KafkaCtx.
     * This class simply demonstrates the single-client case.
     * */
    private final KafkaCtx singleCtx = new KafkaCtx();

    public KafkaServerSessionService(SsConfig ssConfig) {
        super(ssConfig);
    }


    private KafkaUtil.BytesFunction sendToClient;
    private KafkaUtil.BytesFunction clientConsumer;

    @Override
    protected void init(SsConfig ssConfig) throws Exception {
        //build the kafka producer and consumer tools at initialization time
        sendToClient = KafkaUtil.buildProducer(KafkaUtil.config.serverSendTopic);

        clientConsumer = (bytes) -> {
            //data consumed from the client is handed over via the receiveClientBytes method
            receiveClientBytes(singleCtx, bytes);
        };
        KafkaUtil.buildConsumer("server", KafkaUtil.config.clientSendTopic, clientConsumer);
    }

    @Override
    protected void sendBytesToClient(KafkaCtx kafkaCtx, byte[] bytes) {
        sendToClient.f(bytes);
    }

    @Override
    protected void closeCtx(KafkaCtx kafkaCtx) throws Exception {
        //nothing needs to be done for a single client; with multiple clients the resources held in KafkaCtx may need to be released
    }

    @Override
    protected void onExit() throws Exception {
        //TODO close the kafka producer and consumer
    }

    public static void main(String[] args) throws Exception {
        SsConfig cfg = new SsConfig();
        SsConfig.Client client = new SsConfig.Client();
        client.user = "user1";
        client.password = "12345";
        cfg.clients = new ArrayList<>(1);
        cfg.clients.add(client);
        KafkaServerSessionService server = new KafkaServerSessionService(cfg);
        server.syncStart(cfg);
    }
}
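
The note above points at KafkaCtx as the place to start for multi-client support. A minimal sketch of that direction, assuming each client can be distinguished by an identifier carried on its Kafka records (the clientId field and the equals/hashCode contract below are illustrative assumptions, not part of this commit):

package org.wowtools.hppt.addons.kafka;

import java.util.Objects;

/**
 * Hypothetical multi-client context: one instance per connected client,
 * keyed by an identifier the client would attach to its Kafka records.
 */
public class KafkaCtx {
    public final String clientId;

    public KafkaCtx(String clientId) {
        this.clientId = clientId;
    }

    @Override
    public boolean equals(Object o) {
        // two contexts are the same client when their identifiers match
        return o instanceof KafkaCtx other && clientId.equals(other.clientId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(clientId);
    }
}

With a context like this, the server could keep a Map<String, KafkaCtx> and route received bytes by client identifier instead of always using the single singleCtx instance.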
Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
package org.wowtools.hppt.addons.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.wowtools.hppt.common.util.ResourcesReader;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka utility class
 *
 * @author liuyu
 * @date 2024/6/15
 */
@Slf4j
public class KafkaUtil {
    public static final Config config;

    static {
        try {
            config = new ObjectMapper(new YAMLFactory())
                    .readValue(ResourcesReader.readStr(Config.class, "config-kafka.yml"), Config.class);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }


    //basic kafka connection configuration
    private static Properties buildProperties() {
        Properties props = new Properties();
        props.putAll(config.properties);
        return props;
    }

    @FunctionalInterface
    public interface BytesFunction {
        void f(byte[] bytes);
    }

    /**
     * Build a tool that sends byte data to the specified topic
     *
     * @param topic topic
     * @return BytesFunction; call its f(byte[] bytes) method to send data
     */
    public static BytesFunction buildProducer(String topic) {
        Producer<String, byte[]> producer = new KafkaProducer<>(buildProperties());
        return (bytes -> {
            ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, bytes);
            producer.send(record);
        });
    }

    /**
     * Consume kafka data
     *
     * @param groupId consumer group
     * @param topic   topic
     * @param cb      callback invoked when bytes are consumed
     */
    public static void buildConsumer(String groupId, String topic, BytesFunction cb) {
        Properties props = buildProperties();
        props.put("group.id", groupId + "-" + config.tag);
        props.put("auto.offset.reset", "latest");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        // subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));
        {
            // one synchronous test poll before handing the consumer over to the polling thread
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, byte[]> record : records) {
                log.info("test consumption {}", record.toString());
                byte[] value = record.value();
                cb.f(value);
            }
        }
        // consume messages
        Thread.startVirtualThread(() -> {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, byte[]> record : records) {
                    byte[] value = record.value();
                    cb.f(value);
                }
            }
        });
    }
}
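
Both session services carry a TODO about the ordering of bytes consumed from Kafka during large transfers. One common Kafka approach, sketched here as an assumption rather than as this project's chosen fix, is to send every record of a session with a fixed key so that all of its bytes land in a single partition and are consumed in order (buildOrderedProducer and sessionKey are hypothetical names, not part of this commit):

    // Sketch: an order-preserving variant of buildProducer inside KafkaUtil,
    // assuming a fixed per-session key. Records sharing a key hash to the same
    // partition, and Kafka preserves ordering within a partition.
    public static BytesFunction buildOrderedProducer(String topic, String sessionKey) {
        Producer<String, byte[]> producer = new KafkaProducer<>(buildProperties());
        return bytes -> {
            // the constant key routes every record of this session to a single partition
            ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, sessionKey, bytes);
            producer.send(record);
        };
    }

This only guarantees ordering within one partition, so the consuming side still has to process records from that partition sequentially, as the single virtual thread in buildConsumer already does.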
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
clientSendTopic: "client-send-02"
serverSendTopic: "server-send-02"
tag: tt
properties:
  #kafka configuration
  "key.serializer": "org.apache.kafka.common.serialization.StringSerializer"
  "value.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer"
  "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
  "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  "bootstrap.servers": "wsl:9092"

kafkademo/src/main/java/org/wowtools/hppt/kafkademo/ClientDemo.java

Lines changed: 2 additions & 2 deletions
@@ -21,7 +21,7 @@ public ClientDemo(ScConfig config) throws Exception {
     @Override
     public void connectToServer(ScConfig config, Cb cb) throws Exception {
         //build the kafka producer and consumer tools at initialization time
-        sendToServer = KafkaUtil.buildProducer(KafkaUtil.ClientSendTopic);
+        sendToServer = KafkaUtil.buildProducer(KafkaUtil.config.clientSendTopic);

         clientConsumer = (bytes) -> {
             //data consumed for the client is handed over via the receiveServerBytes method
@@ -31,7 +31,7 @@ public void connectToServer(ScConfig config, Cb cb) throws Exception {
                 throw new RuntimeException(e);
             }
         };
-        KafkaUtil.buildConsumer("client", KafkaUtil.ServerSendTopic, clientConsumer);
+        KafkaUtil.buildConsumer("client", KafkaUtil.config.serverSendTopic, clientConsumer);
         cb.end(null);//call end to notify the framework that the connection is established
     }
