Skip to content

Commit c1f3956

Browse files
authored
Merge pull request #6393 from fyyhtx/rate_limit
feat(net): add rate limiting logic for P2P messages
2 parents 6c29cb2 + 46b1c09 commit c1f3956

File tree

12 files changed

+164
-3
lines changed

12 files changed

+164
-3
lines changed

common/src/main/java/org/tron/common/parameter/CommonParameter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,15 @@ public class CommonParameter {
429429
@Getter
430430
public int rateLimiterGlobalApiQps;
431431
@Getter
432+
@Setter
433+
public double rateLimiterSyncBlockChain;
434+
@Getter
435+
@Setter
436+
public double rateLimiterFetchInvData;
437+
@Getter
438+
@Setter
439+
public double rateLimiterDisconnect;
440+
@Getter
432441
public DbBackupConfig dbBackupConfig;
433442
@Getter
434443
public RocksDbSettings rocksDBCustomSettings;

common/src/main/java/org/tron/core/Constant.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,9 @@ public class Constant {
321321

322322
public static final String RATE_LIMITER_HTTP = "rate.limiter.http";
323323
public static final String RATE_LIMITER_RPC = "rate.limiter.rpc";
324+
public static final String RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN = "rate.limiter.p2p.syncBlockChain";
325+
public static final String RATE_LIMITER_P2P_FETCH_INV_DATA = "rate.limiter.p2p.fetchInvData";
326+
public static final String RATE_LIMITER_P2P_DISCONNECT = "rate.limiter.p2p.disconnect";
324327

325328
public static final String SEED_NODE_IP_LIST = "seed.node.ip.list";
326329
public static final String NODE_METRICS_ENABLE = "node.metricsEnable";

common/src/main/java/org/tron/core/exception/P2pException.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public enum TypeEnum {
5252
PROTOBUF_ERROR(14, "protobuf inconsistent"),
5353
BLOCK_SIGN_ERROR(15, "block sign error"),
5454
BLOCK_MERKLE_ERROR(16, "block merkle error"),
55+
RATE_LIMIT_EXCEEDED(17, "rate limit exceeded"),
5556

5657
DEFAULT(100, "default exception");
5758

framework/src/main/java/org/tron/core/config/args/Args.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,9 @@ public static void clearParam() {
238238
PARAMETER.rateLimiterGlobalQps = 50000;
239239
PARAMETER.rateLimiterGlobalIpQps = 10000;
240240
PARAMETER.rateLimiterGlobalApiQps = 1000;
241+
PARAMETER.rateLimiterSyncBlockChain = 3.0;
242+
PARAMETER.rateLimiterFetchInvData = 3.0;
243+
PARAMETER.rateLimiterDisconnect = 1.0;
241244
PARAMETER.p2pDisable = false;
242245
PARAMETER.dynamicConfigEnable = false;
243246
PARAMETER.dynamicConfigCheckInterval = 600;
@@ -1049,6 +1052,18 @@ public static void setParam(final Config config) {
10491052

10501053
PARAMETER.rateLimiterInitialization = getRateLimiterFromConfig(config);
10511054

1055+
PARAMETER.rateLimiterSyncBlockChain =
1056+
config.hasPath(Constant.RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN) ? config
1057+
.getDouble(Constant.RATE_LIMITER_P2P_SYNC_BLOCK_CHAIN) : 3.0;
1058+
1059+
PARAMETER.rateLimiterFetchInvData =
1060+
config.hasPath(Constant.RATE_LIMITER_P2P_FETCH_INV_DATA) ? config
1061+
.getDouble(Constant.RATE_LIMITER_P2P_FETCH_INV_DATA) : 3.0;
1062+
1063+
PARAMETER.rateLimiterDisconnect =
1064+
config.hasPath(Constant.RATE_LIMITER_P2P_DISCONNECT) ? config
1065+
.getDouble(Constant.RATE_LIMITER_P2P_DISCONNECT) : 1.0;
1066+
10521067
PARAMETER.changedDelegation =
10531068
config.hasPath(Constant.COMMITTEE_CHANGED_DELEGATION) ? config
10541069
.getInt(Constant.COMMITTEE_CHANGED_DELEGATION) : 0;

framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,11 @@ private void processMessage(PeerConnection peer, byte[] data) {
178178
handshakeService.processHelloMessage(peer, (HelloMessage) msg);
179179
break;
180180
case P2P_DISCONNECT:
181-
peer.getChannel().close();
182-
peer.getNodeStatistics()
183-
.nodeDisconnectedRemote(((DisconnectMessage)msg).getReason());
181+
if (peer.getP2pRateLimiter().tryAcquire(type.asByte())) {
182+
peer.getChannel().close();
183+
peer.getNodeStatistics()
184+
.nodeDisconnectedRemote(((DisconnectMessage)msg).getReason());
185+
}
184186
break;
185187
case SYNC_BLOCK_CHAIN:
186188
syncBlockChainMsgHandler.processMessage(peer, msg);
@@ -260,6 +262,7 @@ private void processException(PeerConnection peer, TronMessage msg, Exception ex
260262
code = Protocol.ReasonCode.NO_SUCH_MESSAGE;
261263
break;
262264
case BAD_MESSAGE:
265+
case RATE_LIMIT_EXCEEDED:
263266
code = Protocol.ReasonCode.BAD_PROTOCOL;
264267
break;
265268
case SYNC_FAILED:
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.tron.core.net;
2+
3+
import com.google.common.cache.Cache;
4+
import com.google.common.cache.CacheBuilder;
5+
import com.google.common.util.concurrent.RateLimiter;
6+
7+
public class P2pRateLimiter {
8+
private final Cache<Byte, RateLimiter> rateLimiters = CacheBuilder.newBuilder()
9+
.maximumSize(32).build();
10+
11+
public void register(Byte type, double rate) {
12+
RateLimiter rateLimiter = RateLimiter.create(Double.POSITIVE_INFINITY);
13+
rateLimiter.setRate(rate);
14+
rateLimiters.put(type, rateLimiter);
15+
}
16+
17+
public void acquire(Byte type) {
18+
RateLimiter rateLimiter = rateLimiters.getIfPresent(type);
19+
if (rateLimiter == null) {
20+
return;
21+
}
22+
rateLimiter.acquire();
23+
}
24+
25+
public boolean tryAcquire(Byte type) {
26+
RateLimiter rateLimiter = rateLimiters.getIfPresent(type);
27+
if (rateLimiter == null) {
28+
return true;
29+
}
30+
return rateLimiter.tryAcquire();
31+
}
32+
}

framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,14 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr
156156
if (!peer.isNeedSyncFromUs()) {
157157
throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync");
158158
}
159+
if (!peer.getP2pRateLimiter().tryAcquire(fetchInvDataMsg.getType().asByte())) {
160+
throw new P2pException(TypeEnum.RATE_LIMIT_EXCEEDED, fetchInvDataMsg.getType()
161+
+ " message exceeds the rate limit");
162+
}
163+
if (fetchInvDataMsg.getHashList().size() > NetConstants.MAX_BLOCK_FETCH_PER_PEER) {
164+
throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too many blocks, size:"
165+
+ fetchInvDataMsg.getHashList().size());
166+
}
159167
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
160168
long blockNum = new BlockId(hash).getNum();
161169
long minBlockNum =

framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
5858
}
5959

6060
private boolean check(PeerConnection peer, SyncBlockChainMessage msg) throws P2pException {
61+
if (peer.getRemainNum() > 0
62+
&& !peer.getP2pRateLimiter().tryAcquire(msg.getType().asByte())) {
63+
// Discard messages that exceed the rate limit
64+
logger.warn("{} message from peer {} exceeds the rate limit",
65+
msg.getType(), peer.getInetSocketAddress());
66+
return false;
67+
}
68+
6169
List<BlockId> blockIds = msg.getBlockIds();
6270
if (CollectionUtils.isEmpty(blockIds)) {
6371
throw new P2pException(TypeEnum.BAD_MESSAGE, "SyncBlockChain blockIds is empty");

framework/src/main/java/org/tron/core/net/peer/PeerConnection.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package org.tron.core.net.peer;
22

3+
import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA;
4+
import static org.tron.core.net.message.MessageTypes.P2P_DISCONNECT;
5+
import static org.tron.core.net.message.MessageTypes.SYNC_BLOCK_CHAIN;
6+
37
import com.google.common.cache.Cache;
48
import com.google.common.cache.CacheBuilder;
59
import com.google.protobuf.ByteString;
@@ -32,6 +36,7 @@
3236
import org.tron.core.config.args.Args;
3337
import org.tron.core.metrics.MetricsKey;
3438
import org.tron.core.metrics.MetricsUtil;
39+
import org.tron.core.net.P2pRateLimiter;
3540
import org.tron.core.net.TronNetDelegate;
3641
import org.tron.core.net.message.adv.InventoryMessage;
3742
import org.tron.core.net.message.adv.TransactionsMessage;
@@ -156,6 +161,8 @@ public class PeerConnection {
156161
@Setter
157162
@Getter
158163
private volatile boolean needSyncFromUs = true;
164+
@Getter
165+
private P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();
159166

160167
public void setChannel(Channel channel) {
161168
this.channel = channel;
@@ -164,6 +171,12 @@ public void setChannel(Channel channel) {
164171
}
165172
this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress());
166173
lastInteractiveTime = System.currentTimeMillis();
174+
p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(),
175+
Args.getInstance().getRateLimiterSyncBlockChain());
176+
p2pRateLimiter.register(FETCH_INV_DATA.asByte(),
177+
Args.getInstance().getRateLimiterFetchInvData());
178+
p2pRateLimiter.register(P2P_DISCONNECT.asByte(),
179+
Args.getInstance().getRateLimiterDisconnect());
167180
}
168181

169182
public void setBlockBothHave(BlockId blockId) {
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.tron.core.net;
2+
3+
import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA;
4+
import static org.tron.core.net.message.MessageTypes.SYNC_BLOCK_CHAIN;
5+
6+
import org.junit.Assert;
7+
import org.junit.Test;
8+
9+
public class P2pRateLimiterTest {
10+
@Test
11+
public void test() {
12+
P2pRateLimiter limiter = new P2pRateLimiter();
13+
limiter.register(SYNC_BLOCK_CHAIN.asByte(), 2);
14+
limiter.acquire(SYNC_BLOCK_CHAIN.asByte());
15+
boolean ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte());
16+
Assert.assertTrue(ret);
17+
limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte());
18+
ret = limiter.tryAcquire(SYNC_BLOCK_CHAIN.asByte());
19+
Assert.assertFalse(ret);
20+
ret = limiter.tryAcquire(FETCH_INV_DATA.asByte());
21+
Assert.assertTrue(ret);
22+
}
23+
}

0 commit comments

Comments
 (0)