|
41 | 41 | @Slf4j(topic = "net") |
42 | 42 | @Component |
43 | 43 | public class AdvService { |
44 | | - |
45 | 44 | private final int MAX_INV_TO_FETCH_CACHE_SIZE = 100_000; |
46 | 45 | private final int MAX_TRX_CACHE_SIZE = 50_000; |
47 | 46 | private final int MAX_BLOCK_CACHE_SIZE = 10; |
48 | 47 | private final int MAX_SPREAD_SIZE = 1_000; |
| 48 | + private final long TIMEOUT = MSG_CACHE_DURATION_IN_BLOCKS * BLOCK_PRODUCED_INTERVAL; |
49 | 49 |
|
50 | 50 | @Autowired |
51 | 51 | private TronNetDelegate tronNetDelegate; |
@@ -264,30 +264,30 @@ private void consumerInvToFetch() { |
264 | 264 | Collection<PeerConnection> peers = tronNetDelegate.getActivePeer().stream() |
265 | 265 | .filter(peer -> peer.isIdle()) |
266 | 266 | .collect(Collectors.toList()); |
267 | | - |
268 | 267 | InvSender invSender = new InvSender(); |
269 | | - long now = System.currentTimeMillis(); |
270 | 268 | synchronized (this) { |
271 | 269 | if (invToFetch.isEmpty() || peers.isEmpty()) { |
272 | 270 | return; |
273 | 271 | } |
| 272 | + long now = System.currentTimeMillis(); |
274 | 273 | invToFetch.forEach((item, time) -> { |
275 | | - if (time < now - MSG_CACHE_DURATION_IN_BLOCKS * BLOCK_PRODUCED_INTERVAL) { |
| 274 | + if (time < now - TIMEOUT) { |
276 | 275 | logger.info("This obj is too late to fetch, type: {} hash: {}", item.getType(), |
277 | 276 | item.getHash()); |
278 | 277 | invToFetch.remove(item); |
279 | 278 | invToFetchCache.invalidate(item); |
280 | 279 | return; |
281 | 280 | } |
282 | | - peers.stream().filter(peer -> peer.getAdvInvReceive().getIfPresent(item) != null |
283 | | - && invSender.getSize(peer) < MAX_TRX_FETCH_PER_PEER) |
284 | | - .sorted(Comparator.comparingInt(peer -> invSender.getSize(peer))) |
285 | | - .findFirst().ifPresent(peer -> { |
286 | | - if (peer.checkAndPutAdvInvRequest(item, now)) { |
287 | | - invSender.add(item, peer); |
288 | | - } |
289 | | - invToFetch.remove(item); |
290 | | - }); |
| 281 | + peers.stream().filter(peer -> { |
| 282 | + Long t = peer.getAdvInvReceive().getIfPresent(item); |
| 283 | + return t != null && now - t < TIMEOUT && invSender.getSize(peer) < MAX_TRX_FETCH_PER_PEER; |
| 284 | + }).sorted(Comparator.comparingInt(peer -> invSender.getSize(peer))) |
| 285 | + .findFirst().ifPresent(peer -> { |
| 286 | + if (peer.checkAndPutAdvInvRequest(item, now)) { |
| 287 | + invSender.add(item, peer); |
| 288 | + } |
| 289 | + invToFetch.remove(item); |
| 290 | + }); |
291 | 291 | }); |
292 | 292 | } |
293 | 293 |
|
|
0 commit comments