Skip to content

Commit 710754d

Browse files
authored
Merge pull request #493 from tronprotocol/sync_cache
sync: add send cache & solve synchro information inconsistency
2 parents 34a75eb + eb9e9cd commit 710754d

File tree

3 files changed

+43
-20
lines changed

3 files changed

+43
-20
lines changed

src/main/java/org/tron/common/overlay/server/P2pHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
9595

9696
@Override
9797
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
98-
logger.info("exception caught, {} {}", ctx.channel().remoteAddress(), cause.getMessage());
98+
logger.error("exception caught, {}", ctx.channel().remoteAddress(), cause);
9999
ctx.close();
100+
closeChannel(ctx);
100101
}
101102

102103
public void closeChannel(ChannelHandlerContext ctx) {

src/main/java/org/tron/core/net/node/NodeImpl.java

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import static org.tron.core.config.Parameter.NodeConstant.MAX_BLOCKS_IN_PROCESS;
55
import static org.tron.core.config.Parameter.NodeConstant.MAX_BLOCKS_SYNC_FROM_ONE_PEER;
66

7+
import com.google.common.cache.Cache;
8+
import com.google.common.cache.CacheBuilder;
79
import com.google.common.collect.Iterables;
810
import io.netty.util.internal.ConcurrentSet;
911
import java.util.ArrayList;
@@ -70,6 +72,14 @@ public class NodeImpl extends PeerConnectionDelegate implements Node {
7072
@Lazy
7173
private SyncPool pool;
7274

75+
Cache<Sha256Hash, TransactionMessage> TrxCache = CacheBuilder.newBuilder()
76+
.maximumSize(10000).expireAfterWrite(60, TimeUnit.SECONDS)
77+
.recordStats().build();
78+
79+
Cache<Sha256Hash, BlockMessage> BlockCache = CacheBuilder.newBuilder()
80+
.maximumSize(10).expireAfterWrite(60, TimeUnit.SECONDS)
81+
.recordStats().build();
82+
7383
class InvToSend {
7484

7585
private HashMap<PeerConnection, HashMap<InventoryType, LinkedList<Sha256Hash>>> send
@@ -241,10 +251,10 @@ public void broadcast(Message msg) {
241251
if (msg instanceof BlockMessage) {
242252
logger.info("Ready to broadcast a block, Its hash is " + msg.getMessageId());
243253
freshBlockId.offer(((BlockMessage) msg).getBlockId());
244-
blockToAdvertise.add(((BlockMessage) msg).getBlockId());
254+
BlockCache.put(msg.getMessageId(), (BlockMessage) msg);
245255
type = InventoryType.BLOCK;
246256
} else if (msg instanceof TransactionMessage) {
247-
trxToAdvertise.add(msg.getMessageId());
257+
TrxCache.put(msg.getMessageId(), (TransactionMessage)msg);
248258
type = InventoryType.TRX;
249259
} else {
250260
return;
@@ -776,24 +786,36 @@ private void onHandleFetchDataMessage(PeerConnection peer, FetchInvDataMessage f
776786
MessageTypes type = fetchInvDataMsg.getInvMessageType();
777787

778788
//TODO:maybe can use message cache here
779-
final BlockCapsule[] blocks = {del.getGenesisBlock()};
789+
BlockCapsule block = null;
780790
//get data and send it one by one
781-
fetchInvDataMsg.getHashList()
782-
.forEach(hash -> {
783-
if (del.contain(hash, type)) {
784-
Message msg = del.getData(hash, type);
785-
if (type.equals(MessageTypes.BLOCK)) {
786-
blocks[0] = ((BlockMessage) msg).getBlockCapsule();
787-
}
788-
peer.sendMessage(msg);
789-
} else {
790-
peer.sendMessage(new ItemNotFound());
791-
}
792-
});
791+
for (Sha256Hash hash : fetchInvDataMsg.getHashList()){
792+
793+
Message msg;
794+
795+
if (type == MessageTypes.BLOCK){
796+
msg = BlockCache.getIfPresent(hash);
797+
}else {
798+
msg = TrxCache.getIfPresent(hash);
799+
}
800+
801+
if (msg == null){
802+
msg = del.getData(hash, type);
803+
}
804+
805+
if (msg != null) {
806+
if (type.equals(MessageTypes.BLOCK)) {
807+
block = ((BlockMessage) msg).getBlockCapsule();
808+
}
809+
peer.sendMessage(msg);
810+
} else {
811+
logger.error("fetch message {} {} failed.", type, hash);
812+
peer.sendMessage(new ItemNotFound());
813+
}
814+
}
793815

794-
if (blocks[0] != null) {
795-
peer.setHeadBlockWeBothHave(blocks[0].getBlockId());
796-
peer.setHeadBlockTimeWeBothHave(blocks[0].getTimeStamp());
816+
if (block != null) {
817+
peer.setHeadBlockWeBothHave(block.getBlockId());
818+
peer.setHeadBlockTimeWeBothHave(block.getTimeStamp());
797819
}
798820
}
799821

src/main/java/org/tron/core/net/peer/TronHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void channelRead0(final ChannelHandlerContext ctx, TronMessage msg) throw
6161

6262
@Override
6363
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
64-
logger.info("exception caught, {} {}", ctx.channel().remoteAddress(), cause.getMessage());
64+
logger.error("exception caught, {}", ctx.channel().remoteAddress(), cause);
6565
ctx.close();
6666
}
6767

0 commit comments

Comments
 (0)