Skip to content

Commit 228ae9f

Browse files
passerbyYSQ姚世权
andauthored
重构 IM 部分的代码设计 (#18)
* 优化代码设计 * 重构IM部分的代码 * fix compile * fix * fix-2 * fix-3 * 升版 --------- Co-authored-by: 姚世权 <yaoshiquan@zwcad.com>
1 parent 9ec1b33 commit 228ae9f

37 files changed

+1070
-847
lines changed

pom.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,17 @@
1111
</parent>
1212
<groupId>top.ysqorz</groupId>
1313
<artifactId>forum</artifactId>
14-
<version>1.1.0</version>
14+
<version>1.1.1</version>
1515
<!-- 打包方式为war包 -->
1616
<packaging>jar</packaging>
1717
<name>forum</name>
1818
<description>旨在打造一个功能齐全的论坛。It is not a small project!</description>
1919

2020
<properties>
21+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
22+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
23+
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
24+
<argLine>-Dfile.encoding=UTF-8</argLine>
2125
<java.version>1.8</java.version>
2226
<netty.version>4.1.50.Final</netty.version>
2327
<!-- 版本过高,连接时会出现两次报错,然后才连接成功 -->

src/main/java/top/ysqorz/forum/controller/IMController.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package top.ysqorz.forum.controller;
22

3-
import org.springframework.stereotype.Component;
43
import org.springframework.stereotype.Controller;
54
import org.springframework.validation.annotation.Validated;
65
import org.springframework.web.bind.annotation.*;
@@ -9,9 +8,8 @@
98
import top.ysqorz.forum.common.StatusCode;
109
import top.ysqorz.forum.im.entity.MsgModel;
1110
import top.ysqorz.forum.im.entity.MsgType;
12-
import top.ysqorz.forum.im.handler.MsgCenter;
11+
import top.ysqorz.forum.im.handler.MsgCenterImpl;
1312
import top.ysqorz.forum.service.IMService;
14-
import top.ysqorz.forum.shiro.ShiroUtils;
1513
import top.ysqorz.forum.utils.JsonUtils;
1614

1715
import javax.annotation.Resource;
@@ -44,7 +42,7 @@ public StatusCode sendMsg(@NotBlank String msgJson, @NotBlank String channelId)
4442
if (MsgType.isFunctionalType(MsgType.valueOf(msg.getMsgType()))) { // 如果非法type会抛出异常
4543
return StatusCode.NOT_SUPPORT_FUNC_TYPE;
4644
}
47-
MsgCenter.getInstance().remoteDispatch(msg, channelId, ShiroUtils.getToken());
45+
imService.handleMsg(msg, channelId);
4846
return StatusCode.SUCCESS;
4947
}
5048

@@ -53,7 +51,7 @@ public StatusCode sendMsg(@NotBlank String msgJson, @NotBlank String channelId)
5351
*/
5452
@PostMapping("/push")
5553
public StatusCode pushMsg(@RequestBody MsgModel msg, String channelId) { // source channel
56-
MsgCenter.getInstance().push(msg, channelId);
54+
MsgCenterImpl.getInstance().push(msg, channelId);
5755
return StatusCode.SUCCESS;
5856
}
5957

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package top.ysqorz.forum.im;
2+
3+
import top.ysqorz.forum.im.entity.ChannelMap;
4+
import top.ysqorz.forum.im.handler.BindEventCallback;
5+
import top.ysqorz.forum.im.handler.BindMsgHandler;
6+
7+
/**
8+
* @author passerbyYSQ
9+
* @create 2025-02-01 22:24
10+
*/
11+
public interface MsgCenter extends MsgOperator, BindEventCallback {
12+
/**
13+
* 添加消息处理器
14+
*/
15+
MsgCenter addHandler(MsgHandler<?> handler);
16+
17+
/**
18+
* 获取对应类型的所有通道
19+
*/
20+
ChannelMap getChannelMap(String channelType);
21+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package top.ysqorz.forum.im;
2+
3+
import io.netty.channel.Channel;
4+
import top.ysqorz.forum.im.entity.ChannelMap;
5+
import top.ysqorz.forum.im.entity.ChannelType;
6+
import top.ysqorz.forum.im.entity.MsgModel;
7+
import top.ysqorz.forum.im.entity.MsgType;
8+
9+
import java.util.concurrent.Executor;
10+
11+
12+
/**
13+
* @author passerbyYSQ
14+
* @create 2025-02-01 22:32
15+
*/
16+
public interface MsgHandler<DataType> extends MsgOperator {
17+
/**
18+
* 当前处理器是否支持处理此消息
19+
*/
20+
boolean support(MsgModel msg, Channel channel);
21+
22+
/**
23+
* 消息的POJO类
24+
*/
25+
Class<DataType> getDataClass();
26+
27+
/**
28+
* 当前消息处理器支持的消息类型
29+
*/
30+
MsgType getMsgType();
31+
32+
/**
33+
* 当前消息处理器支持的通道类型
34+
*/
35+
ChannelType getChannelType();
36+
37+
/**
38+
* 设置异步线程池
39+
*/
40+
void setExecutor(Executor executor);
41+
42+
void setChannelMap(ChannelMap channelMap);
43+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package top.ysqorz.forum.im;
2+
3+
import java.util.Iterator;
4+
5+
/**
6+
* ...
7+
*
8+
* @author yaoshiquan
9+
* @date 2025/1/27
10+
*/
11+
public interface MsgHandlerPipeline extends Iterable<MsgHandler<?>>, Iterator<MsgHandler<?>> {
12+
default MsgHandlerPipeline addHandler(MsgHandler<?> handler) {
13+
return addHandlerAtTail(handler);
14+
}
15+
16+
MsgHandlerPipeline addHandlerAtHead(MsgHandler<?> handler);
17+
18+
MsgHandlerPipeline addHandlerAtTail(MsgHandler<?> handler);
19+
20+
MsgHandlerPipeline addHandlerAtIndex(int index, MsgHandler<?> handler);
21+
22+
MsgHandler<?> getHeadHandler();
23+
24+
MsgHandler<?> getTailHandler();
25+
26+
MsgHandler<?> getHandler(int index);
27+
28+
void removeHandler(MsgHandler<?> handler);
29+
30+
void removeHandler(int index);
31+
32+
boolean exist(MsgHandler<?> handler);
33+
34+
boolean isEmpty();
35+
36+
int size();
37+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package top.ysqorz.forum.im;
2+
3+
import io.netty.channel.Channel;
4+
import top.ysqorz.forum.im.entity.MsgModel;
5+
6+
/**
7+
* @author passerbyYSQ
8+
* @create 2025-02-01 22:54
9+
*/
10+
public interface MsgOperator {
11+
/**
12+
* 处理消息
13+
*/
14+
boolean handle(MsgModel msg, Channel channel);
15+
16+
/**
17+
* 在当前服务节点,推送消息到客户端
18+
*/
19+
boolean push(MsgModel msg, String channelId);
20+
}

src/main/java/top/ysqorz/forum/im/entity/AsyncInsertTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
* @create 2022-01-16 1:03
88
*/
99
public class AsyncInsertTask<T> implements Runnable { // T:PO的类型
10-
private BaseMapper<T> mapper;
11-
private T data;
12-
private int retryCount;
10+
private final BaseMapper<T> mapper;
11+
private final T data;
12+
private final int retryCount;
1313

1414
public AsyncInsertTask(BaseMapper<T> mapper, T data) {
1515
this(mapper, data, 3);

src/main/java/top/ysqorz/forum/im/entity/ChannelMap.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.util.HashSet;
99
import java.util.Map;
10+
import java.util.Objects;
1011
import java.util.Set;
1112
import java.util.concurrent.ConcurrentHashMap;
1213
import java.util.concurrent.atomic.AtomicInteger;
@@ -33,10 +34,10 @@ public void bind(String token, String groupId, Channel channel) {
3334
channel.attr(IMUtils.GROUP_ID_KEY).set(groupId);
3435
channel.attr(IMUtils.CHANNEL_TYPE_KEY).set(channelType.name());
3536
// 双重检测锁
36-
Set<Channel> channels = channelMap.get(groupId);
37-
if (channels == null) {
37+
Set<Channel> channels;
38+
if (Objects.isNull(channels = channelMap.get(groupId))) {
3839
synchronized (this) {
39-
if (channels == null) {
40+
if (Objects.isNull(channels = channelMap.get(groupId))) {
4041
channels = new HashSet<>();
4142
channelMap.put(groupId, channels);
4243
}
@@ -76,6 +77,9 @@ public void pushToGroup(MsgType msgType, Object data, String sourceChannelId, St
7677

7778
public boolean isBound(Channel channel) {
7879
String token = IMUtils.getTokenFromChannel(channel);
80+
if (channel instanceof FakeChannel && Objects.nonNull(token)) {
81+
return true;
82+
}
7983
String channelType = IMUtils.getChannelTypeFromChannel(channel);
8084
String groupId = IMUtils.getGroupIdFromChannel(channel);
8185
if (token == null || channelType == null || groupId == null) {

0 commit comments

Comments
 (0)