1313import java .util .Queue ;
1414import java .util .Set ;
1515import java .util .concurrent .ConcurrentHashMap ;
16+ import java .util .concurrent .Executors ;
1617import java .util .concurrent .LinkedBlockingQueue ;
18+ import java .util .concurrent .ScheduledExecutorService ;
19+ import java .util .concurrent .TimeUnit ;
1720import javafx .util .Pair ;
1821import lombok .extern .slf4j .Slf4j ;
1922import org .springframework .beans .factory .annotation .Autowired ;
2023import org .springframework .stereotype .Component ;
2124import org .tron .common .overlay .discover .NodeHandler ;
2225import org .tron .common .overlay .message .Message ;
26+ import org .tron .common .overlay .message .ReasonCode ;
2327import org .tron .common .overlay .server .Channel .TronState ;
24- import org .tron .common .overlay .server .ChannelManager ;
2528import org .tron .common .overlay .server .SyncPool ;
2629import org .tron .common .utils .ExecutorLoop ;
2730import org .tron .common .utils .Sha256Hash ;
2831import org .tron .common .utils .Time ;
2932import org .tron .core .capsule .BlockCapsule ;
3033import org .tron .core .capsule .BlockCapsule .BlockId ;
34+ import org .tron .core .config .Parameter .BlockConstant ;
35+ import org .tron .core .config .Parameter .NetConstants ;
3136import org .tron .core .config .Parameter .NodeConstant ;
3237import org .tron .core .exception .BadBlockException ;
3338import org .tron .core .exception .BadTransactionException ;
@@ -56,9 +61,6 @@ public class NodeImpl extends PeerConnectionDelegate implements Node {
5661 @ Autowired
5762 private SyncPool pool ;
5863
59- @ Autowired
60- private ChannelManager channelManager ;
61-
6264 class InvToSend {
6365
6466 private HashMap <PeerConnection , HashMap <InventoryType , LinkedList <Sha256Hash >>> send
@@ -112,6 +114,8 @@ public void sendFetch() {
112114
113115 private volatile boolean isHandleSyncBlockActive ;
114116
117+ private ScheduledExecutorService disconnectInactiveExecutor = Executors .newSingleThreadScheduledExecutor ();
118+
115119 //broadcast
116120 private ConcurrentHashMap <Sha256Hash , InventoryType > advObjToSpread = new ConcurrentHashMap <>();
117121
@@ -225,6 +229,7 @@ public void close() throws InterruptedException {
225229 advertiseLoopThread .join ();
226230 advObjFetchLoopThread .join ();
227231 handleSyncBlockLoop .join ();
232+ disconnectInactiveExecutor .shutdown ();
228233 }
229234
230235 @ Override
@@ -379,8 +384,46 @@ private void activeTronPump() {
379384 advertiseLoopThread .start ();
380385 advObjFetchLoopThread .start ();
381386 handleSyncBlockLoop .start ();
387+
388+ //terminate inactive loop
389+ disconnectInactiveExecutor .scheduleWithFixedDelay (() -> {
390+ disconnectInactive ();
391+ }, 30000 , BlockConstant .BLOCK_INTERVAL / 2 , TimeUnit .MILLISECONDS );
392+
393+ }
394+
395+ private void disconnectInactive () {
396+ getActivePeer ().forEach (peer -> {
397+
398+ final boolean [] isDisconnected = {false };
399+
400+ peer .getAdvObjWeRequested ().values ().stream ()
401+ .filter (time -> time < Time .getCurrentMillis () - NetConstants .ADV_TIME_OUT )
402+ .findFirst ().ifPresent (time -> isDisconnected [0 ] = true );
403+
404+ if (!isDisconnected [0 ]) {
405+ peer .getSyncBlockRequested ().values ().stream ()
406+ .filter (time -> time < Time .getCurrentMillis () - NetConstants .SYNC_TIME_OUT )
407+ .findFirst ().ifPresent (time -> isDisconnected [0 ] = true );
408+ }
409+
410+ //TODO:optimize here
411+ if (!isDisconnected [0 ]) {
412+ if (del .getHeadBlockId ().getNum () - peer .getHeadBlockWeBothHave ().getNum () > NetConstants .HEAD_NUM_MAX_DELTA
413+ && peer .getConnectTime () < Time .getCurrentMillis () - NetConstants .HEAD_NUM_CHECK_TIME ) {
414+ isDisconnected [0 ] = true ;
415+ }
416+ }
417+
418+
419+ if (isDisconnected [0 ]) {
420+ disconnectPeer (peer , ReasonCode .RESET );
421+ }
422+ });
382423 }
383424
425+
426+
384427 private void onHandleInventoryMessage (PeerConnection peer , InventoryMessage msg ) {
385428 //logger.info("on handle advertise inventory message");
386429 peer .cleanInvGarbage ();
@@ -613,7 +656,7 @@ private void onHandleFetchDataMessage(PeerConnection peer, FetchInvDataMessage f
613656 }
614657
615658 private void banTraitorPeer (PeerConnection peer ) {
616- onDisconnectPeer (peer );
659+ disconnectPeer (peer , ReasonCode . BAD_PROTOCOL ); //TODO: ban it
617660 }
618661
619662 private void onHandleChainInventoryMessage (PeerConnection peer , ChainInventoryMessage msg ) {
@@ -848,7 +891,7 @@ private void syncNextBatchChainIds(PeerConnection peer) {
848891 peer .sendMessage (new SyncBlockChainMessage ((LinkedList <BlockId >) chainSummary ));
849892 } catch (Exception e ) { //TODO: use tron excpetion here
850893 logger .debug (e .getMessage (), e );
851- onDisconnectPeer (peer );
894+ disconnectPeer (peer , ReasonCode . BAD_PROTOCOL ); //TODO: unlink?
852895 }
853896
854897 }
@@ -858,6 +901,7 @@ public void onConnectPeer(PeerConnection peer) {
858901 //TODO:when use new p2p framework, remove this
859902 logger .info ("start sync with::" + peer );
860903 peer .setTronState (TronState .START_TO_SYNC );
904+ peer .setConnectTime (Time .getCurrentMillis ());
861905 startSyncWithPeer (peer );
862906// if (mapPeer.containsKey(peer.getAddress())) {
863907// return;
@@ -873,7 +917,11 @@ public void onConnectPeer(PeerConnection peer) {
873917 @ Override
874918 public void onDisconnectPeer (PeerConnection peer ) {
875919 //TODO:when use new p2p framework, remove this
876- //mapPeer.remove(peer.getAddress());
920+ //peer.disconnect(reason);
921+ }
922+
923+ private void disconnectPeer (PeerConnection peer , ReasonCode reason ) {
924+ peer .disconnect (reason );
877925 }
878926}
879927
0 commit comments