Skip to content

Commit d0202f3

Browse files
authored
Merge pull request #332 from tronprotocol/p2p_retry
mdf message queue support retry
2 parents 754af3d + 00dcaf0 commit d0202f3

File tree

5 files changed

+42
-28
lines changed

5 files changed

+42
-28
lines changed

src/main/java/org/tron/common/overlay/server/ChannelManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class ChannelManager {
5151

5252
private Args args = Args.getInstance();
5353

54-
private int maxActivePeers = args.getNodeMaxActiveNodes() > 0 ? args.getNodeMaxActiveNodes() : 3000;;
54+
private int maxActivePeers = args.getNodeMaxActiveNodes() > 0 ? args.getNodeMaxActiveNodes() : 30;
5555

5656
private PeerServer peerServer;
5757

src/main/java/org/tron/common/overlay/server/MessageQueue.java

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,15 @@
1919

2020
import io.netty.channel.ChannelFutureListener;
2121
import io.netty.channel.ChannelHandlerContext;
22-
import java.util.Queue;
23-
import java.util.concurrent.ConcurrentLinkedQueue;
24-
import java.util.concurrent.Executors;
25-
import java.util.concurrent.ScheduledExecutorService;
26-
import java.util.concurrent.ScheduledFuture;
27-
import java.util.concurrent.ThreadFactory;
28-
import java.util.concurrent.TimeUnit;
29-
import java.util.concurrent.atomic.AtomicInteger;
3022
import org.slf4j.Logger;
3123
import org.slf4j.LoggerFactory;
3224
import org.springframework.context.annotation.Scope;
3325
import org.springframework.stereotype.Component;
34-
import org.tron.common.overlay.message.DisconnectMessage;
35-
import org.tron.common.overlay.message.Message;
36-
import org.tron.common.overlay.message.PingMessage;
37-
import org.tron.common.overlay.message.ReasonCode;
38-
import org.tron.common.overlay.message.StaticMessages;
26+
import org.tron.common.overlay.message.*;
27+
28+
import java.util.Queue;
29+
import java.util.concurrent.*;
30+
import java.util.concurrent.atomic.AtomicInteger;
3931

4032
/**
4133
* This class contains the logic for sending messages in a queue
@@ -121,6 +113,8 @@ private void disconnect(DisconnectMessage msg) {
121113

122114
public void receivedMessage(Message msg) throws InterruptedException {
123115

116+
logger.info("rcv from peer[{}], size:{} data:{}", ctx.channel().remoteAddress(), msg.getSendData().readableBytes(), msg.toString());
117+
124118
if (requestQueue.peek() != null) {
125119
MessageRoundtrip messageRoundtrip = requestQueue.peek();
126120
Message waitingMessage = messageRoundtrip.getMsg();
@@ -135,7 +129,7 @@ public void receivedMessage(Message msg) throws InterruptedException {
135129
}
136130

137131
private void removeAnsweredMessage(MessageRoundtrip messageRoundtrip) {
138-
if (messageRoundtrip != null && messageRoundtrip.isAnswered())
132+
if (messageRoundtrip != null && messageRoundtrip.isAnswered())
139133
requestQueue.remove();
140134
}
141135

@@ -149,22 +143,32 @@ private void nudgeQueue() {
149143

150144
private void sendToWire(MessageRoundtrip messageRoundtrip) {
151145

152-
if (messageRoundtrip != null && messageRoundtrip.getRetryTimes() == 0) {
153-
// TODO: retry logic || messageRoundtrip.hasToRetry()){
146+
if (messageRoundtrip == null){
147+
return;
148+
}
149+
150+
if (messageRoundtrip.getRetryTimes() > 0 && !messageRoundtrip.hasToRetry()){
151+
return;
152+
}
154153

155-
Message msg = messageRoundtrip.getMsg();
154+
if (messageRoundtrip.getRetryTimes() > 0){
155+
logger.info("send msg timeout. close channel {}.", ctx.channel().remoteAddress());
156+
ctx.close();
157+
return;
158+
}
156159

157-
//TODO#p2p#peerDel : let node know
158-
logger.info(msg.toString());
160+
Message msg = messageRoundtrip.getMsg();
159161

160-
ctx.writeAndFlush(msg.getSendData())
161-
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
162+
ctx.writeAndFlush(msg.getSendData())
163+
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
162164

163-
if (msg.getAnswerMessage() != null) {
164-
messageRoundtrip.incRetryTimes();
165-
messageRoundtrip.saveTime();
166-
}
165+
if (msg.getAnswerMessage() != null) {
166+
messageRoundtrip.incRetryTimes();
167+
messageRoundtrip.saveTime();
167168
}
169+
170+
logger.info("send to peer[{}] retry[{}], length:{} data:{}", ctx.channel().remoteAddress(),
171+
messageRoundtrip.getRetryTimes(), msg.getSendData().readableBytes(), msg.toString());
168172
}
169173

170174
public void close() {

src/main/java/org/tron/common/overlay/server/MessageRoundtrip.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ public void saveTime() {
5959
lastTimestamp = System.currentTimeMillis();
6060
}
6161

62+
public long getTime() {
63+
return lastTimestamp;
64+
}
65+
6266
public boolean hasToRetry() {
6367
return 20000 < System.currentTimeMillis() - lastTimestamp;
6468
}

src/main/java/org/tron/core/net/message/SyncBlockChainMessage.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package org.tron.core.net.message;
22

3-
import java.util.List;
43
import org.tron.core.capsule.BlockCapsule.BlockId;
54
import org.tron.protos.Protocol.BlockInventory.Type;
65

6+
import java.util.List;
7+
78
public class SyncBlockChainMessage extends BlockInventoryMessage {
89

910
public SyncBlockChainMessage(byte[] packed) {
@@ -20,4 +21,9 @@ public SyncBlockChainMessage(List<BlockId> blockIds) {
2021
public MessageTypes getType() {
2122
return MessageTypes.fromByte(this.type);
2223
}
24+
25+
@Override
26+
public Class<?> getAnswerMessage() {
27+
return ChainInventoryMessage.class;
28+
}
2329
}

src/main/resources/config.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ node {
2828
maxActiveNodes = 30
2929

3030
p2p {
31-
version = 1 # 1: testnet; 101: debug
31+
version = 31 # 1: testnet; 101: debug
3232
}
3333

3434
}

0 commit comments

Comments
 (0)