Skip to content

Commit 97d75f5

Browse files
committed
add sliding window to contorl water line
1 parent 02d752a commit 97d75f5

File tree

3 files changed

+126
-30
lines changed

3 files changed

+126
-30
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package org.tron.common.utils;
2+
3+
import java.util.Arrays;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
/**
7+
* Created by olivier on 2018/06/01
8+
*/
9+
class SlotBaseCounter {
10+
private int slotSize;
11+
private AtomicInteger[] slotCounter;
12+
13+
public SlotBaseCounter(int slotSize) {
14+
slotSize = slotSize < 1 ? 1 : slotSize;
15+
this.slotSize = slotSize;
16+
this.slotCounter = new AtomicInteger[slotSize];
17+
for (int i = 0; i < this.slotSize; i++) {
18+
slotCounter[i] = new AtomicInteger(0);
19+
}
20+
}
21+
22+
public void increaseSlot(int slotSize) {
23+
slotCounter[slotSize].incrementAndGet();
24+
}
25+
26+
public void wipeSlot(int slotSize) {
27+
slotCounter[slotSize].set(0);
28+
}
29+
30+
public int totalCount() {
31+
return Arrays.stream(slotCounter).mapToInt(slotCounter -> slotCounter.get()).sum();
32+
}
33+
34+
@Override
35+
public String toString() {
36+
return Arrays.toString(slotCounter);
37+
}
38+
}
39+
40+
/**
41+
* Created by olivier on 2018/06/01
42+
*/
43+
public class SlidingWindowCounter {
44+
private volatile SlotBaseCounter slotBaseCounter;
45+
private volatile int windowSize;
46+
private volatile int head;
47+
48+
public SlidingWindowCounter(int windowSize) {
49+
resizeWindow(windowSize);
50+
}
51+
52+
public synchronized void resizeWindow(int windowSize) {
53+
this.windowSize = windowSize;
54+
this.slotBaseCounter = new SlotBaseCounter(windowSize);
55+
this.head = 0;
56+
}
57+
58+
public void increase() {
59+
slotBaseCounter.increaseSlot(head);
60+
}
61+
62+
public int totalAndAdvance() {
63+
int total = totalCount();
64+
advance();
65+
return total;
66+
}
67+
68+
public void advance() {
69+
int tail = (head + 1) % windowSize;
70+
slotBaseCounter.wipeSlot(tail);
71+
head = tail;
72+
}
73+
74+
public int totalCount() {
75+
return slotBaseCounter.totalCount();
76+
}
77+
78+
@Override
79+
public String toString() {
80+
return "total = " + totalCount() + " head = " + head + " >> " + slotBaseCounter;
81+
}
82+
}

src/main/java/org/tron/core/net/node/NodeDelegateImpl.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.tron.core.config.Parameter.ChainConstant.BLOCK_PRODUCED_INTERVAL;
44
import static org.tron.core.config.Parameter.ChainConstant.BLOCK_SIZE;
5+
import static org.tron.core.config.Parameter.NodeConstant.MAX_TRANSACTION_PENDING;
56

67
import com.google.common.primitives.Longs;
78
import java.util.ArrayList;
@@ -103,10 +104,10 @@ public synchronized LinkedList<Sha256Hash> handleBlock(BlockCapsule block, boole
103104
@Override
104105
public void handleTransaction(TransactionCapsule trx) throws BadTransactionException {
105106
logger.info("handle transaction");
106-
// if (dbManager.getPendingTransactions().size() > MAX_TRANSACTION_PENDING) {
107-
// logger.warn("The pending txs list is full");
108-
// return;
109-
// }
107+
if (dbManager.getPendingTransactions().size() > MAX_TRANSACTION_PENDING * 2) {
108+
logger.warn("The pending txs list is full");
109+
return;
110+
}
110111

111112
if (dbManager.getTransactionIdCache().getIfPresent(trx.getTransactionId()) != null) {
112113
logger.warn("This transaction has been processed");

src/main/java/org/tron/core/net/node/NodeImpl.java

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Queue;
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.concurrent.ConcurrentLinkedQueue;
28-
import java.util.concurrent.ExecutionException;
2928
import java.util.concurrent.ExecutorService;
3029
import java.util.concurrent.Executors;
3130
import java.util.concurrent.ScheduledExecutorService;
@@ -45,6 +44,7 @@
4544
import org.tron.common.overlay.server.SyncPool;
4645
import org.tron.common.utils.ExecutorLoop;
4746
import org.tron.common.utils.Sha256Hash;
47+
import org.tron.common.utils.SlidingWindowCounter;
4848
import org.tron.common.utils.Time;
4949
import org.tron.core.capsule.BlockCapsule;
5050
import org.tron.core.capsule.BlockCapsule.BlockId;
@@ -90,9 +90,14 @@ public class NodeImpl extends PeerConnectionDelegate implements Node {
9090
.maximumSize(10).expireAfterWrite(60, TimeUnit.SECONDS)
9191
.recordStats().build();
9292

93-
private Cache<Long, Long> fetchWaterLine = CacheBuilder.newBuilder()
94-
.expireAfterWrite(BLOCK_PRODUCED_INTERVAL / 1000 * MSG_CACHE_DURATION_IN_BLOCKS, TimeUnit.SECONDS)
95-
.recordStats().build();
93+
// private Cache<Long, Long> fetchWaterLine = CacheBuilder.newBuilder()
94+
// .expireAfterWrite(BLOCK_PRODUCED_INTERVAL / 1000 * MSG_CACHE_DURATION_IN_BLOCKS, TimeUnit.SECONDS)
95+
// .recordStats().build();
96+
97+
private SlidingWindowCounter fetchWaterLine =
98+
new SlidingWindowCounter(BLOCK_PRODUCED_INTERVAL / 100 * MSG_CACHE_DURATION_IN_BLOCKS);
99+
100+
96101

97102
private int maxTrxsSize = 1_000_000;
98103

@@ -287,6 +292,9 @@ public Thread newThread(Runnable r) {
287292
private ScheduledExecutorService handleSyncBlockExecutor = Executors
288293
.newSingleThreadScheduledExecutor();
289294

295+
private ScheduledExecutorService fetchWaterLineExecutor = Executors
296+
.newSingleThreadScheduledExecutor();
297+
290298
private volatile boolean isHandleSyncBlockActive = false;
291299

292300
private AtomicLong fetchSequenceCounter = new AtomicLong(0L);
@@ -473,6 +481,15 @@ private void activeTronPump() {
473481
logger.error("Unhandled exception", t);
474482
}
475483
}, 10, 1, TimeUnit.SECONDS);
484+
485+
//fetchWaterLine:
486+
fetchWaterLineExecutor.scheduleWithFixedDelay(() -> {
487+
try {
488+
fetchWaterLine.advance();
489+
} catch (Throwable t) {
490+
logger.error("Unhandled exception", t);
491+
}
492+
}, 1000, 100, TimeUnit.MILLISECONDS);
476493
}
477494

478495
private void consumerAdvObjToFetch() {
@@ -695,15 +712,16 @@ private void onHandleInventoryMessage(PeerConnection peer, InventoryMessage msg)
695712
if (msg.getInventoryType().equals(InventoryType.TRX)
696713
&& (peer.isAdvInvFull()
697714
|| isFlooded())) {
698-
logger.info("A peer is flooding us, stop handle inv, the peer is:" + peer);
715+
logger.warn("A peer is flooding us, stop handle inv, the peer is: " + peer);
699716
return;
700717
}
701718

702719
peer.getAdvObjSpreadToUs().put(id, System.currentTimeMillis());
703720
if (!requested[0]) {
704721
if (!badAdvObj.containsKey(id)) {
705722
if (!advObjToFetch.contains(id)) {
706-
addWaterLine();
723+
fetchWaterLine.increase();
724+
logger.info("water line:" + fetchWaterLine.totalCount());
707725
this.advObjToFetch.put(id, new PriorItem(new Item(id, msg.getInventoryType()),
708726
fetchSequenceCounter.incrementAndGet()));
709727
} else {
@@ -714,32 +732,27 @@ private void onHandleInventoryMessage(PeerConnection peer, InventoryMessage msg)
714732
}
715733
}
716734
}
717-
logger.info("Peer AdvObjSpreadToUs size: peer:" + peer.getNode().getHost() + "::" + peer.getAdvObjSpreadToUs().size());
718-
logger.info("this advObjToFetch size:" + this.advObjToFetch.size());
719-
logger.info("this advObjToRequest size" + this.advObjWeRequested.size());
720735
}
721736

722737
private boolean isFlooded() {
723-
try {
724-
long value = fetchWaterLine.get(Time.getCurrentMillis() / 1000, () -> 0L);
725-
fetchWaterLine.put(Time.getCurrentMillis() / 1000, value);
726-
} catch (ExecutionException e) {
727-
e.printStackTrace();
728-
}
729-
730-
return fetchWaterLine.asMap().values().stream().mapToLong(Long::longValue).sum()
738+
// try {
739+
// long value = fetchWaterLine.get(Time.getCurrentMillis() / 1000, () -> 0L);
740+
// fetchWaterLine.put(Time.getCurrentMillis() / 1000, value);
741+
// } catch (ExecutionException e) {
742+
// e.printStackTrace();
743+
// }
744+
return fetchWaterLine.totalCount()
731745
> BLOCK_PRODUCED_INTERVAL * NET_MAX_TRX_PER_SECOND * MSG_CACHE_DURATION_IN_BLOCKS / 1000;
732746
}
733747

734-
private void addWaterLine() {
735-
try {
736-
long value = fetchWaterLine.get(Time.getCurrentMillis() / 1000, () -> 0L);
737-
fetchWaterLine.put(Time.getCurrentMillis() / 1000, ++value);
738-
} catch (ExecutionException e) {
739-
e.printStackTrace();
740-
}
741-
logger.info("water line:" + fetchWaterLine.asMap().values().stream().mapToLong(Long::longValue).sum());
742-
}
748+
// private void addWaterLine() {
749+
// try {
750+
// long value = fetchWaterLine.get(Time.getCurrentMillis() / 1000, () -> 0L);
751+
// fetchWaterLine.put(Time.getCurrentMillis() / 1000, ++value);
752+
// } catch (ExecutionException e) {
753+
// e.printStackTrace();
754+
// }
755+
// }
743756

744757
@Override
745758
public void syncFrom(Sha256Hash myHeadBlockHash) {

0 commit comments

Comments
 (0)