Skip to content

Commit b21ca8e

Browse files
authored
Merge pull request #941 from tronprotocol/addTxWaterLine
Add tx water line
2 parents ac1e8af + 5c257a1 commit b21ca8e

File tree

6 files changed

+113
-10
lines changed

6 files changed

+113
-10
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package org.tron.common.utils;
2+
3+
import java.util.Arrays;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
/**
7+
* Created by olivier on 2018/06/01
8+
*/
9+
class SlotBaseCounter {
10+
private int slotSize;
11+
private AtomicInteger[] slotCounter;
12+
13+
public SlotBaseCounter(int slotSize) {
14+
slotSize = slotSize < 1 ? 1 : slotSize;
15+
this.slotSize = slotSize;
16+
this.slotCounter = new AtomicInteger[slotSize];
17+
for (int i = 0; i < this.slotSize; i++) {
18+
slotCounter[i] = new AtomicInteger(0);
19+
}
20+
}
21+
22+
public void increaseSlot(int slotSize) {
23+
slotCounter[slotSize].incrementAndGet();
24+
}
25+
26+
public void wipeSlot(int slotSize) {
27+
slotCounter[slotSize].set(0);
28+
}
29+
30+
public int totalCount() {
31+
return Arrays.stream(slotCounter).mapToInt(slotCounter -> slotCounter.get()).sum();
32+
}
33+
34+
@Override
35+
public String toString() {
36+
return Arrays.toString(slotCounter);
37+
}
38+
}
39+
40+
/**
41+
* Created by olivier on 2018/06/01
42+
*/
43+
public class SlidingWindowCounter {
44+
private volatile SlotBaseCounter slotBaseCounter;
45+
private volatile int windowSize;
46+
private volatile int head;
47+
48+
public SlidingWindowCounter(int windowSize) {
49+
resizeWindow(windowSize);
50+
}
51+
52+
public synchronized void resizeWindow(int windowSize) {
53+
this.windowSize = windowSize;
54+
this.slotBaseCounter = new SlotBaseCounter(windowSize);
55+
this.head = 0;
56+
}
57+
58+
public void increase() {
59+
slotBaseCounter.increaseSlot(head);
60+
}
61+
62+
public int totalAndAdvance() {
63+
int total = totalCount();
64+
advance();
65+
return total;
66+
}
67+
68+
public void advance() {
69+
int tail = (head + 1) % windowSize;
70+
slotBaseCounter.wipeSlot(tail);
71+
head = tail;
72+
}
73+
74+
public int totalCount() {
75+
return slotBaseCounter.totalCount();
76+
}
77+
78+
@Override
79+
public String toString() {
80+
return "total = " + totalCount() + " head = " + head + " >> " + slotBaseCounter;
81+
}
82+
}

src/main/java/org/tron/core/config/Parameter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ interface NetConstants {
4545
long HEAD_NUM_MAX_DELTA = 1000L;
4646
long HEAD_NUM_CHECK_TIME = 60000L;
4747
int MAX_INVENTORY_SIZE_IN_MINUTES = 2;
48-
long NET_MAX_TRX_PER_SECOND = 1000L;
48+
long NET_MAX_TRX_PER_SECOND = 700L;
4949
long MAX_TRX_PER_PEER = 200L;
5050
int NET_MAX_INV_SIZE_IN_MINUTES = 2;
5151
int MSG_CACHE_DURATION_IN_BLOCKS = 5;

src/main/java/org/tron/core/db/Manager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -934,7 +934,7 @@ public synchronized BlockCapsule generateBlock(
934934
TransactionCapsule trx = (TransactionCapsule) iterator.next();
935935
if (DateTime.now().getMillis() - when
936936
> ChainConstant.BLOCK_PRODUCED_INTERVAL * 0.5 * ChainConstant.BLOCK_PRODUCED_TIME_OUT) {
937-
logger.debug("Processing transaction time exceeds the 50% producing time。");
937+
logger.warn("Processing transaction time exceeds the 50% producing time。");
938938
break;
939939
}
940940
// check the block size

src/main/java/org/tron/core/net/node/NodeDelegateImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public synchronized LinkedList<Sha256Hash> handleBlock(BlockCapsule block, boole
102102

103103
@Override
104104
public void handleTransaction(TransactionCapsule trx) throws BadTransactionException {
105-
logger.info("handle transaction");
105+
logger.debug("handle transaction");
106106
if (dbManager.getTransactionIdCache().getIfPresent(trx.getTransactionId()) != null) {
107107
logger.warn("This transaction has been processed");
108108
return;

src/main/java/org/tron/core/net/node/NodeImpl.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.tron.core.config.Parameter.ChainConstant.BLOCK_PRODUCED_INTERVAL;
44
import static org.tron.core.config.Parameter.NetConstants.MAX_TRX_PER_PEER;
55
import static org.tron.core.config.Parameter.NetConstants.MSG_CACHE_DURATION_IN_BLOCKS;
6+
import static org.tron.core.config.Parameter.NetConstants.NET_MAX_TRX_PER_SECOND;
67
import static org.tron.core.config.Parameter.NodeConstant.MAX_BLOCKS_ALREADY_FETCHED;
78
import static org.tron.core.config.Parameter.NodeConstant.MAX_BLOCKS_IN_PROCESS;
89
import static org.tron.core.config.Parameter.NodeConstant.MAX_BLOCKS_SYNC_FROM_ONE_PEER;
@@ -43,6 +44,7 @@
4344
import org.tron.common.overlay.server.SyncPool;
4445
import org.tron.common.utils.ExecutorLoop;
4546
import org.tron.common.utils.Sha256Hash;
47+
import org.tron.common.utils.SlidingWindowCounter;
4648
import org.tron.common.utils.Time;
4749
import org.tron.core.capsule.BlockCapsule;
4850
import org.tron.core.capsule.BlockCapsule.BlockId;
@@ -88,11 +90,13 @@ public class NodeImpl extends PeerConnectionDelegate implements Node {
8890
.maximumSize(10).expireAfterWrite(60, TimeUnit.SECONDS)
8991
.recordStats().build();
9092

93+
private SlidingWindowCounter fetchWaterLine =
94+
new SlidingWindowCounter(BLOCK_PRODUCED_INTERVAL * MSG_CACHE_DURATION_IN_BLOCKS / 100);
95+
9196
private int maxTrxsSize = 1_000_000;
9297

9398
private int maxTrxsCnt = 100;
9499

95-
96100
@Getter
97101
class PriorItem implements java.lang.Comparable<PriorItem> {
98102

@@ -281,6 +285,9 @@ public Thread newThread(Runnable r) {
281285
private ScheduledExecutorService handleSyncBlockExecutor = Executors
282286
.newSingleThreadScheduledExecutor();
283287

288+
private ScheduledExecutorService fetchWaterLineExecutor = Executors
289+
.newSingleThreadScheduledExecutor();
290+
284291
private volatile boolean isHandleSyncBlockActive = false;
285292

286293
private AtomicLong fetchSequenceCounter = new AtomicLong(0L);
@@ -467,6 +474,15 @@ private void activeTronPump() {
467474
logger.error("Unhandled exception", t);
468475
}
469476
}, 10, 1, TimeUnit.SECONDS);
477+
478+
//fetchWaterLine:
479+
fetchWaterLineExecutor.scheduleWithFixedDelay(() -> {
480+
try {
481+
fetchWaterLine.advance();
482+
} catch (Throwable t) {
483+
logger.error("Unhandled exception", t);
484+
}
485+
}, 1000, 100, TimeUnit.MILLISECONDS);
470486
}
471487

472488
private void consumerAdvObjToFetch() {
@@ -687,15 +703,18 @@ private void onHandleInventoryMessage(PeerConnection peer, InventoryMessage msg)
687703

688704
//avoid TRX flood attack here.
689705
if (msg.getInventoryType().equals(InventoryType.TRX)
690-
&& peer.isAdvInvFull()) {
691-
logger.info("A peer is flooding us, stop handle inv, the peer is:" + peer);
706+
&& (peer.isAdvInvFull()
707+
|| isFlooded())) {
708+
logger.warn("A peer is flooding us, stop handle inv, the peer is: " + peer);
692709
return;
693710
}
694711

695712
peer.getAdvObjSpreadToUs().put(id, System.currentTimeMillis());
696713
if (!requested[0]) {
697714
if (!badAdvObj.containsKey(id)) {
698715
if (!advObjToFetch.contains(id)) {
716+
fetchWaterLine.increase();
717+
logger.info("water line:" + fetchWaterLine.totalCount());
699718
this.advObjToFetch.put(id, new PriorItem(new Item(id, msg.getInventoryType()),
700719
fetchSequenceCounter.incrementAndGet()));
701720
} else {
@@ -708,6 +727,11 @@ private void onHandleInventoryMessage(PeerConnection peer, InventoryMessage msg)
708727
}
709728
}
710729

730+
private boolean isFlooded() {
731+
return fetchWaterLine.totalCount()
732+
> BLOCK_PRODUCED_INTERVAL * NET_MAX_TRX_PER_SECOND * MSG_CACHE_DURATION_IN_BLOCKS / 1000;
733+
}
734+
711735
@Override
712736
public void syncFrom(Sha256Hash myHeadBlockHash) {
713737
try {
@@ -721,7 +745,6 @@ public void syncFrom(Sha256Hash myHeadBlockHash) {
721745
logger.info("wait end");
722746
}
723747

724-
725748
private void onHandleBlockMessage(PeerConnection peer, BlockMessage blkMsg) {
726749
Map<Item, Long> advObjWeRequested = peer.getAdvObjWeRequested();
727750
Map<BlockId, Long> syncBlockRequested = peer.getSyncBlockRequested();

src/main/java/org/tron/core/net/peer/PeerConnection.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.tron.core.net.peer;
22

3-
import static org.tron.core.config.Parameter.ChainConstant.BLOCK_PRODUCED_INTERVAL;
43
import static org.tron.core.config.Parameter.NetConstants.MAX_INVENTORY_SIZE_IN_MINUTES;
54
import static org.tron.core.config.Parameter.NetConstants.NET_MAX_TRX_PER_SECOND;
65

@@ -169,8 +168,7 @@ private void removeIterator(Iterator<Entry<Sha256Hash, Long>> iterator, long old
169168
}
170169

171170
public boolean isAdvInvFull() {
172-
return advObjSpreadToUs.size() > MAX_INVENTORY_SIZE_IN_MINUTES * 60 * NET_MAX_TRX_PER_SECOND
173-
+ (MAX_INVENTORY_SIZE_IN_MINUTES + 1) * 60 / BLOCK_PRODUCED_INTERVAL;
171+
return advObjSpreadToUs.size() > MAX_INVENTORY_SIZE_IN_MINUTES * 60 * NET_MAX_TRX_PER_SECOND;
174172
}
175173

176174
public boolean isBanned() {

0 commit comments

Comments
 (0)