Skip to content

Commit 1b926ed

Browse files
author
Evan Hu
committed
编码解码器
1 parent f6fff7a commit 1b926ed

File tree

9 files changed

+47
-187
lines changed

9 files changed

+47
-187
lines changed

src/main/java/info/xiaomo/gengine/network/MsgPack.java

Lines changed: 0 additions & 21 deletions
This file was deleted.

src/main/java/info/xiaomo/gengine/network/NetworkServiceImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,11 @@ static class SocketHandler extends ChannelInitializer<Channel> {
171171
@Override
172172
protected void initChannel(Channel ch) {
173173
ChannelPipeline pip = ch.pipeline();
174-
// pip.addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
174+
int maxLength = 1048576;
175+
int lengthFieldLength = 4;
176+
int ignoreLength = 0;
177+
int offset = 0;
178+
pip.addLast(new LengthFieldBasedFrameDecoder(maxLength, offset, lengthFieldLength, ignoreLength, lengthFieldLength));
175179
pip.addLast(new DefaultProtobufDecoder(builder.getMessagePool()));
176180
pip.addLast(new LengthFieldPrepender(4));
177181
pip.addLast(new DefaultProtobufEncoder(builder.getMessagePool()));

src/main/java/info/xiaomo/gengine/network/client/Client.java

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
package info.xiaomo.gengine.network.client;
22

33
import com.google.protobuf.AbstractMessage;
4-
import com.google.protobuf.Descriptors;
4+
import com.google.protobuf.Message;
55
import java.io.IOException;
66
import java.util.List;
77
import java.util.Map;
88
import java.util.concurrent.*;
9-
import info.xiaomo.gengine.network.MsgPack;
109
import info.xiaomo.gengine.network.client.listener.ChannelConnectListener;
1110
import info.xiaomo.gengine.network.client.listener.ChannelDisconnectedListener;
12-
import info.xiaomo.gengine.network.handler.MessageDecoder;
13-
import info.xiaomo.gengine.network.handler.MessageEncoder;
11+
import info.xiaomo.gengine.network.handler.DefaultProtobufDecoder;
12+
import info.xiaomo.gengine.network.handler.DefaultProtobufEncoder;
1413
import io.netty.bootstrap.Bootstrap;
1514
import io.netty.channel.*;
1615
import io.netty.channel.nio.NioEventLoopGroup;
1716
import io.netty.channel.socket.SocketChannel;
1817
import io.netty.channel.socket.nio.NioSocketChannel;
18+
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
19+
import io.netty.handler.codec.LengthFieldPrepender;
1920
import io.netty.handler.timeout.IdleStateHandler;
2021
import io.netty.util.AttributeKey;
2122
import lombok.Data;
@@ -31,20 +32,32 @@
3132
public class Client {
3233

3334
protected static final Logger LOGGER = LoggerFactory.getLogger(Client.class);
35+
3436
public static ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
37+
3538
public final AttributeKey<Integer> ATTR_INDEX = AttributeKey.valueOf("INDEX");
39+
3640
private final Object seq_lock = new Object();
41+
3742
protected ClientBuilder builder;
43+
3844
protected Channel channel;
45+
3946
protected Bootstrap bootstrap;
47+
4048
protected EventLoopGroup group;
41-
private Map<Short, ClientFuture<MsgPack>> futureMap = new ConcurrentHashMap<>();
49+
50+
private Map<Short, ClientFuture<Message>> futureMap = new ConcurrentHashMap<>();
51+
4252
protected boolean stopped = false;
53+
4354
/** 是否已经连接(调用connect)方法 */
4455
protected boolean connected = false;
4556

4657
private boolean needReconnect;
58+
4759
private short sequence = 0;
60+
4861
/** 重连延迟 */
4962
private int reconnectDelay = 2;
5063

@@ -71,15 +84,16 @@ public Client(final ClientBuilder builder) {
7184
bootstrap.handler(
7285
new ChannelInitializer<SocketChannel>() {
7386
@Override
74-
protected void initChannel(SocketChannel ch) throws Exception {
87+
protected void initChannel(SocketChannel ch) {
7588
ChannelPipeline pip = ch.pipeline();
7689
if (idleCheck) {
7790
pip.addLast(
7891
"Idle", new IdleStateHandler(builder.getMaxIdleTime(), 0, 0));
7992
}
80-
pip.addLast(
81-
"NettyMessageDecoder", new MessageDecoder(builder.getUpLimit()));
82-
pip.addLast("NettyMessageEncoder", new MessageEncoder());
93+
pip.addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
94+
pip.addLast(new DefaultProtobufDecoder(builder.getMsgPool()));
95+
pip.addLast(new LengthFieldPrepender(4));
96+
pip.addLast(new DefaultProtobufEncoder(builder.getMsgPool()));
8397
pip.addLast(
8498
"NettyMessageExecutor",
8599
new ClientMessageExecutor(
@@ -109,18 +123,6 @@ protected void initChannel(SocketChannel ch) throws Exception {
109123
}
110124
}
111125

112-
public static int getMessageID(com.google.protobuf.Message msg) {
113-
for (Map.Entry<Descriptors.FieldDescriptor, Object> fieldDescriptorObjectEntry :
114-
msg.getAllFields().entrySet()) {
115-
if (fieldDescriptorObjectEntry.getKey().getName().equals("msgId")) {
116-
return ((Descriptors.EnumValueDescriptor) fieldDescriptorObjectEntry.getValue())
117-
.getNumber();
118-
}
119-
}
120-
LOGGER.error("【{}】中未设置msgId, 内容:{}", msg.getClass().getSimpleName(), msg);
121-
return 0;
122-
}
123-
124126
/**
125127
* 发送消息列表
126128
*
@@ -134,7 +136,6 @@ public boolean sendMsg(List<AbstractMessage> list) {
134136
for (AbstractMessage message : list) {
135137
channel.writeAndFlush(message);
136138
}
137-
138139
return true;
139140
}
140141
} catch (Exception e) {
@@ -149,15 +150,13 @@ public boolean sendMsg(List<AbstractMessage> list) {
149150
* @param message
150151
* @return
151152
*/
152-
public boolean sendMsg(AbstractMessage message) {
153+
public void sendMsg(Message message) {
153154
Channel channel = getChannel(Thread.currentThread().getId());
154155
if (channel != null && channel.isActive()) {
155-
int msgId = getMessageID(message);
156-
MsgPack packet = new MsgPack(MsgPack.HEAD_TCP, msgId, message.toByteArray());
157-
channel.writeAndFlush(packet);
158-
return true;
156+
channel.writeAndFlush(message);
157+
} else {
158+
LOGGER.error("channel为空或者处于未激活状态");
159159
}
160-
return false;
161160
}
162161

163162
/**

src/main/java/info/xiaomo/gengine/network/client/ClientMessageExecutor.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,20 @@
11
package info.xiaomo.gengine.network.client;
22

3+
import com.google.protobuf.Message;
34
import java.util.Map;
45
import info.xiaomo.gengine.network.IMessagePool;
56
import info.xiaomo.gengine.network.INetworkConsumer;
67
import info.xiaomo.gengine.network.INetworkEventListener;
7-
import info.xiaomo.gengine.network.MsgPack;
88
import info.xiaomo.gengine.network.handler.MessageExecutor;
99
import io.netty.channel.ChannelHandlerContext;
1010
import io.netty.handler.timeout.IdleState;
1111
import io.netty.handler.timeout.IdleStateEvent;
1212
import lombok.extern.slf4j.Slf4j;
13-
import org.slf4j.Logger;
14-
import org.slf4j.LoggerFactory;
1513

1614
@Slf4j
1715
public class ClientMessageExecutor extends MessageExecutor {
1816

19-
protected Map<Short, ClientFuture<MsgPack>> futureMap;
17+
protected Map<Short, ClientFuture<Message>> futureMap;
2018

2119
protected IMessagePool pool;
2220

@@ -26,25 +24,24 @@ public ClientMessageExecutor(
2624
INetworkConsumer consumer,
2725
INetworkEventListener listener,
2826
IMessagePool pool,
29-
Map<Short, ClientFuture<MsgPack>> futureMap,
27+
Map<Short, ClientFuture<Message>> futureMap,
3028
boolean idleCheck) {
3129
super(consumer, listener, pool);
3230
this.futureMap = futureMap;
3331
this.idleCheck = idleCheck;
3432
}
3533

3634
@Override
37-
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
38-
39-
MsgPack m = (MsgPack) msg;
40-
ClientFuture<MsgPack> f = futureMap.get(m.getSequence());
41-
if (f != null) {
42-
if (!f.isCancelled()) {
43-
f.result(m);
44-
}
45-
} else {
46-
super.channelRead(ctx, msg);
47-
}
35+
public void channelRead0(ChannelHandlerContext ctx, Message msg) {
36+
37+
// ClientFuture<MsgPack> f = futureMap.get(m.getSequence());
38+
// if (f != null) {
39+
// if (!f.isCancelled()) {
40+
// f.result(m);
41+
// }
42+
// } else {
43+
// super.channelRead(ctx, msg);
44+
// }
4845
}
4946

5047
@Override

src/main/java/info/xiaomo/gengine/network/handler/DefaultProtobufEncoder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ public DefaultProtobufEncoder(IMessagePool pool) {
1818

1919
@Override
2020
protected void encode(
21-
ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf)
22-
throws Exception {
21+
ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) {
2322
int msgId = pool.getMessageId(message);
2423
if (msgId == 0) {
2524
log.error("消息未注册:{}", message.getClass().getSimpleName());

src/main/java/info/xiaomo/gengine/network/handler/MessageDecoder.java

Lines changed: 0 additions & 40 deletions
This file was deleted.

src/main/java/info/xiaomo/gengine/network/handler/MessageEncoder.java

Lines changed: 0 additions & 31 deletions
This file was deleted.

src/main/java/info/xiaomo/gengine/network/handler/MessageExecutor.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package info.xiaomo.gengine.network.handler;
22

3-
import com.google.protobuf.AbstractMessage;
43
import com.google.protobuf.Message;
5-
import java.lang.reflect.Method;
64
import info.xiaomo.gengine.network.IMessagePool;
75
import info.xiaomo.gengine.network.INetworkConsumer;
86
import info.xiaomo.gengine.network.INetworkEventListener;
9-
import info.xiaomo.gengine.utils.ClassUtil;
107
import io.netty.channel.ChannelHandlerContext;
118
import io.netty.channel.SimpleChannelInboundHandler;
129
import lombok.extern.slf4j.Slf4j;
@@ -32,7 +29,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
3229
}
3330

3431
@Override
35-
public void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {
32+
public void channelRead0(ChannelHandlerContext ctx, Message message) {
3633
int msgId = pool.getMessageId(message);
3734

3835
if (msgId == 0) {

src/main/java/info/xiaomo/gengine/utils/ClassUtil.java

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import java.util.*;
1313
import java.util.jar.JarEntry;
1414
import java.util.jar.JarFile;
15-
import info.xiaomo.gengine.network.MsgPack;
1615

1716
/**
1817
* class加载工具类
@@ -270,49 +269,6 @@ private static List<Class<?>> getClasses(File dir, String pk) throws ClassNotFou
270269
return classes;
271270
}
272271

273-
/**
274-
* 迭代组装协议
275-
*
276-
* @param packageName
277-
* @param clazz
278-
* @param delimiter
279-
* @return
280-
* @throws ClassNotFoundException
281-
*/
282-
public static Map<Integer, Class<?>> getClasses(
283-
String packageName, Class<?> clazz, String delimiter) throws ClassNotFoundException {
284-
Map<Integer, Class<?>> map = new HashMap<>();
285-
String path = packageName.replace('.', '/');
286-
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
287-
URL url = classloader.getResource(path);
288-
for (Class<?> c : getClasses(new File(url.getFile()), packageName)) {
289-
if (MsgPack.class.isAssignableFrom(c) && !MsgPack.class.equals(c)) {
290-
if (c.getSimpleName().contains(delimiter)) {
291-
int protocol =
292-
Integer.parseInt(
293-
c.getSimpleName()
294-
.substring(
295-
c.getSimpleName().indexOf(delimiter)
296-
+ delimiter.length()));
297-
map.put(protocol, c);
298-
}
299-
}
300-
}
301-
return map;
302-
}
303-
304-
public static Method findProtobufMsg(Class clazz) {
305-
return findMethod(clazz,"getDefaultInstance");
306-
}
307-
308-
public static Method findMethod(Class clazz, String methodName) {
309-
Method[] methods = clazz.getDeclaredMethods();
310-
for (Method method : methods) {
311-
if (method.getName().equals(methodName)) return method;
312-
}
313-
return null;
314-
}
315-
316272
/**********************************************************************
317273
* 加载时判断指定包
318274
* @param filePath

0 commit comments

Comments
 (0)