Skip to content

Commit cee68aa

Browse files
author
Evan Hu
committed
客户端服务端消息通讯
1 parent 570ffd2 commit cee68aa

File tree

12 files changed

+107
-130
lines changed

12 files changed

+107
-130
lines changed

src/main/java/info/xiaomo/gengine/ai/telegram/MessageDispatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ public void dispatchMessage(int delay, Telegraph sender, Telegraph receiver, int
549549
// float currentTime = GdxAI.getTimepiece().getTime();
550550
// GdxAI.getLogger().info(LOG_TAG, "Instant telegram dispatched at time: " +
551551
// currentTime + " by " + sender
552-
// + " for " + receiver + ". Message code is " + telegram);
552+
// + " for " + receiver + ". MsgPack code is " + telegram);
553553
// }
554554

555555
// Send the telegram to the recipient
@@ -566,7 +566,7 @@ public void dispatchMessage(int delay, Telegraph sender, Telegraph receiver, int
566566
if (!added) {
567567
POOL.put(telegram);
568568
LOGGER.debug("Delayed telegram from " + sender + " for " + receiver
569-
+ " rejected by the queue. Message code is " + msg);
569+
+ " rejected by the queue. MsgPack code is " + msg);
570570
}
571571
}
572572
}
@@ -602,7 +602,7 @@ public void update() {
602602
// if (debugEnabled) {
603603
// GdxAI.getLogger().info(LOG_TAG, "Queued telegram ready for dispatch: Sent to
604604
// " + telegram.receiver
605-
// + ". Message code is " + telegram.message);
605+
// + ". MsgPack code is " + telegram.message);
606606
// }
607607

608608
// Send the telegram to the recipient

src/main/java/info/xiaomo/gengine/network/INetworkConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public interface INetworkConsumer {
1313
* @param msg msg
1414
* @param channel channel
1515
*/
16-
void consume(Message msg, Channel channel);
16+
void consume(MsgPack msg, Channel channel);
1717

1818

1919
}

src/main/java/info/xiaomo/gengine/network/Message.java

Lines changed: 0 additions & 29 deletions
This file was deleted.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package info.xiaomo.gengine.network;
2+
3+
import lombok.Data;
4+
5+
@Data
6+
public class MsgPack {
7+
public static final byte HEAD_TCP = -128;
8+
9+
private final byte head;
10+
private final int msgId;
11+
private final byte[] bytes;
12+
private Object msg;
13+
14+
public MsgPack(byte head, int msgId, byte[] bytes) {
15+
this.head = head;
16+
this.msgId = msgId;
17+
this.bytes = bytes;
18+
}
19+
20+
public MsgPack(byte head, int msgId, byte[] bytes, Object msg) {
21+
this.head = head;
22+
this.msgId = msgId;
23+
this.bytes = bytes;
24+
this.msg = msg;
25+
}
26+
}

src/main/java/info/xiaomo/gengine/network/NetworkServiceImpl.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import io.netty.channel.*;
66
import io.netty.channel.nio.NioEventLoopGroup;
77
import io.netty.channel.socket.nio.NioServerSocketChannel;
8-
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
9-
import io.netty.handler.codec.LengthFieldPrepender;
108
import io.netty.handler.codec.http.HttpObjectAggregator;
119
import io.netty.handler.codec.http.HttpServerCodec;
1210
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
@@ -126,7 +124,7 @@ protected void initChannel(Channel ch) {
126124
// pip.addLast(new LengthFieldBasedFrameDecoder(maxLength, offset, lengthFieldLength, ignoreLength, lengthFieldLength));
127125
pip.addLast(new MessageDecoder(builder.getUpLimit()));
128126
// pip.addLast(new LengthFieldPrepender(4, true));
129-
pip.addLast(new MessageEncoder(builder.getDownLimit()));
127+
pip.addLast(new MessageEncoder());
130128
pip.addLast(new MessageExecutor(builder.getConsumer(), builder.getListener()));
131129
for (ChannelHandler handler : builder.getExtraHandlers()) {
132130
pip.addLast(handler);

src/main/java/info/xiaomo/gengine/network/client/Client.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.google.protobuf.AbstractMessage;
44
import com.google.protobuf.Descriptors;
5-
import info.xiaomo.gengine.network.Message;
5+
import info.xiaomo.gengine.network.MsgPack;
66
import info.xiaomo.gengine.network.handler.MessageDecoder;
77
import info.xiaomo.gengine.network.handler.MessageEncoder;
88
import io.netty.bootstrap.Bootstrap;
@@ -91,7 +91,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
9191
pip.addLast("Idle", new IdleStateHandler(builder.getMaxIdleTime(), 0, 0));
9292
}
9393
pip.addLast("NettyMessageDecoder", new MessageDecoder(builder.getUpLimit()));
94-
pip.addLast("NettyMessageEncoder", new MessageEncoder(builder.getDownLimit()));
94+
pip.addLast("NettyMessageEncoder", new MessageEncoder());
9595
pip.addLast("NettyMessageExecutor", new ClientMessageExecutor(
9696
builder.getConsumer(),
9797
builder.getEventListener(),
@@ -149,7 +149,7 @@ public boolean sendMsg(AbstractMessage message) {
149149
Channel channel = getChannel(Thread.currentThread().getId());
150150
if (channel != null && channel.isActive()) {
151151
int cmd = getMessageID(message);
152-
Message packet = new Message(Message.HEAD_TCP, cmd, message.toByteArray());
152+
MsgPack packet = new MsgPack(MsgPack.HEAD_TCP, cmd, message.toByteArray());
153153
channel.writeAndFlush(packet);
154154
return true;
155155
}

src/main/java/info/xiaomo/gengine/network/client/NetworkConsumerAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package info.xiaomo.gengine.network.client;
22

3+
import info.xiaomo.gengine.network.MsgPack;
34
import info.xiaomo.gengine.network.INetworkConsumer;
4-
import info.xiaomo.gengine.network.Message;
55
import io.netty.channel.Channel;
66

77
/**
@@ -13,7 +13,7 @@
1313
public class NetworkConsumerAdapter implements INetworkConsumer {
1414

1515
@Override
16-
public void consume(Message msg, Channel channel) {
16+
public void consume(MsgPack msg, Channel channel) {
1717
// Nothing to do
1818
}
1919
}
Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
package info.xiaomo.gengine.network.handler;
22

3-
import info.xiaomo.gengine.network.Message;
3+
import java.util.List;
4+
import info.xiaomo.gengine.network.MsgPack;
45
import io.netty.buffer.ByteBuf;
56
import io.netty.channel.ChannelHandlerContext;
67
import io.netty.handler.codec.ByteToMessageDecoder;
78
import lombok.extern.slf4j.Slf4j;
89

9-
import java.util.List;
10-
11-
/**
12-
* @author xiaomo
13-
*/
10+
/** @author xiaomo */
1411
@Slf4j
1512
public class MessageDecoder extends ByteToMessageDecoder {
1613

@@ -21,23 +18,23 @@ public MessageDecoder(int upLimit) {
2118
}
2219

2320
@Override
24-
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
21+
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
22+
throws Exception {
2523
if (in.readableBytes() < 7) {
2624
return;
2725
}
2826

2927
in.markReaderIndex();
3028
byte head = in.readByte();
3129
short length = in.readShort();
32-
if ((length <= 0) || (length > this.upLimit))
33-
throw new IllegalArgumentException();
30+
if ((length <= 0) || (length > this.upLimit)) throw new IllegalArgumentException();
3431
int cmd = in.readInt();
3532
if (in.readableBytes() < length - 4) {
3633
in.resetReaderIndex();
3734
return;
3835
}
3936
byte[] bytes = new byte[length - 4];
4037
in.readBytes(bytes);
41-
out.add(new Message(head, cmd, bytes));
38+
out.add(new MsgPack(head, cmd, bytes));
4239
}
4340
}
Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,30 @@
11
package info.xiaomo.gengine.network.handler;
22

3-
import info.xiaomo.gengine.network.Message;
3+
import info.xiaomo.gengine.network.MsgPack;
44
import io.netty.buffer.ByteBuf;
55
import io.netty.channel.ChannelHandlerContext;
66
import io.netty.handler.codec.MessageToByteEncoder;
77
import lombok.extern.slf4j.Slf4j;
88

99
/**
10-
* @author xiaomo
11-
* 我们的数据包(即一条游戏前后端通信的消息长度)可以定义如下:
12-
* 数据包 = 1字节标志位 + 2字节消息体长度 + 4字节协议号长度 + N消息体
13-
* 比如客户端请求登录的Protobuf协议如下:
10+
* @author xiaomo 我们的数据包(即一条游戏前后端通信的消息长度)可以定义如下: 数据包 = 1字节标志位 + 2字节消息体长度 + 4字节协议号长度 + N消息体
11+
* 比如客户端请求登录的Protobuf协议如下:
1412
*/
1513
@Slf4j
16-
public class MessageEncoder extends MessageToByteEncoder<Message> {
17-
18-
19-
private final int downLimit;
20-
21-
public MessageEncoder(int downLimit) {
22-
this.downLimit = downLimit;
23-
}
14+
public class MessageEncoder extends MessageToByteEncoder<MsgPack> {
2415

16+
public MessageEncoder() {}
2517

2618
@Override
27-
protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf buf) {
28-
if ((message.getBytes().length > this.downLimit) && (log.isWarnEnabled()))
29-
log.warn("packet size[" + message.getBytes().length + "] is over limit[" + this.downLimit + "]");
30-
31-
buf.writeByte(message.getHead());
32-
buf.writeShort(message.getBytes().length + 4);
33-
buf.writeInt(message.getMsgId());
34-
buf.writeBytes(message.getBytes());
19+
protected void encode(ChannelHandlerContext ctx, MsgPack msgPack, ByteBuf buf) {
20+
buf.writeByte(msgPack.getHead());
21+
buf.writeShort(msgPack.getBytes().length + 4);
22+
buf.writeInt(msgPack.getMsgId());
23+
buf.writeBytes(msgPack.getBytes());
3524
}
3625

3726
@Override
3827
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
3928
super.exceptionCaught(ctx, cause);
4029
}
41-
4230
}
Lines changed: 51 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,61 @@
11
package info.xiaomo.gengine.network.handler;
22

33
import com.google.protobuf.AbstractMessage;
4+
import com.google.protobuf.Message;
45
import java.lang.reflect.Method;
56
import info.xiaomo.gengine.network.INetworkConsumer;
67
import info.xiaomo.gengine.network.INetworkEventListener;
7-
import info.xiaomo.gengine.network.Message;
8+
import info.xiaomo.gengine.network.MsgPack;
89
import info.xiaomo.gengine.network.pool.MessageAndHandlerPool;
910
import info.xiaomo.gengine.utils.ClassUtil;
1011
import io.netty.channel.ChannelHandlerContext;
11-
import io.netty.channel.ChannelInboundHandlerAdapter;
12-
13-
/**
14-
* @author xiaomo
15-
*/
16-
public class MessageExecutor extends ChannelInboundHandlerAdapter {
17-
18-
private final INetworkConsumer consumer;
19-
20-
protected final INetworkEventListener listener;
21-
22-
23-
public MessageExecutor(INetworkConsumer consumer, INetworkEventListener listener) {
24-
this.consumer = consumer;
25-
this.listener = listener;
26-
}
27-
28-
29-
@Override
30-
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
31-
listener.onExceptionOccur(ctx, cause);
32-
}
33-
34-
@Override
35-
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
36-
if (msg instanceof Message) {
37-
Message packet = (Message) msg;
38-
System.out.println("\n<<<<<<<<<<<<收到服务端协议:" + packet.getMsgId() + "<<<<<<<<<<<<");
39-
40-
AbstractMessage clazz = MessageAndHandlerPool.messages.get(packet.getMsgId());
41-
42-
Method m = ClassUtil.findMethod(clazz.getClass(), "getDefaultInstance");
43-
if (m != null) {
44-
AbstractMessage message = (AbstractMessage) m.invoke(null);
45-
msg = message.newBuilderForType().mergeFrom(packet.getBytes()).build();
46-
consumer.consume((Message) msg, ctx.channel());
47-
}
48-
}
49-
}
50-
51-
@Override
52-
public void channelActive(ChannelHandlerContext ctx) {
53-
if (this.listener != null) {
54-
this.listener.onConnected(ctx);
55-
}
56-
}
57-
58-
@Override
59-
public void channelInactive(ChannelHandlerContext ctx) {
60-
if (this.listener != null) {
61-
this.listener.onDisconnected(ctx);
62-
}
63-
}
12+
import io.netty.channel.SimpleChannelInboundHandler;
13+
import lombok.extern.slf4j.Slf4j;
14+
15+
/** @author xiaomo */
16+
@Slf4j
17+
public class MessageExecutor extends SimpleChannelInboundHandler<MsgPack> {
18+
19+
private final INetworkConsumer consumer;
20+
21+
protected final INetworkEventListener listener;
22+
23+
public MessageExecutor(INetworkConsumer consumer, INetworkEventListener listener) {
24+
this.consumer = consumer;
25+
this.listener = listener;
26+
}
27+
28+
@Override
29+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
30+
listener.onExceptionOccur(ctx, cause);
31+
}
32+
33+
@Override
34+
public void channelRead0(ChannelHandlerContext ctx, MsgPack msgPack) throws Exception {
35+
AbstractMessage clazz = MessageAndHandlerPool.messages.get(msgPack.getMsgId());
36+
37+
Method m = ClassUtil.findMethod(clazz.getClass(), "getDefaultInstance");
38+
if (m != null) {
39+
AbstractMessage message = (AbstractMessage) m.invoke(null);
40+
Message msg = message.newBuilderForType().mergeFrom(msgPack.getBytes()).build();
41+
msgPack.setMsg(msg);
42+
consumer.consume(msgPack, ctx.channel());
43+
} else {
44+
log.error("找有找到消息体:{}", msgPack.getMsgId());
45+
}
46+
}
47+
48+
@Override
49+
public void channelActive(ChannelHandlerContext ctx) {
50+
if (this.listener != null) {
51+
this.listener.onConnected(ctx);
52+
}
53+
}
54+
55+
@Override
56+
public void channelInactive(ChannelHandlerContext ctx) {
57+
if (this.listener != null) {
58+
this.listener.onDisconnected(ctx);
59+
}
60+
}
6461
}

0 commit comments

Comments
 (0)