Skip to content
This repository was archived by the owner on Jul 1, 2025. It is now read-only.

Commit 506fa71

Browse files
Matilda-Clerkemacfarlapinges
authored andcommitted
Add GetBodiesFromPeerTask (#8040)
* 7311: Add PeerTask system for use in future PRs Signed-off-by: Matilda Clerke <[email protected]> * 7311: Clean up some warnings Signed-off-by: Matilda Clerke <[email protected]> * 7311: Add feature toggle for enabling use of the peertask system where available Signed-off-by: Matilda Clerke <[email protected]> * 7311: Remove log used for testing, apply spotless Signed-off-by: Matilda Clerke <[email protected]> * 7311: Add private constructor to PeerTaskFeatureToggle to prevent instantiation Signed-off-by: Matilda Clerke <[email protected]> * 7311: Switch to logging a warning instead of throwing an exception when initializing PeerTaskFeatureToggle multiple times Signed-off-by: Matilda Clerke <[email protected]> * 7311: Update javadoc to match previous commit Signed-off-by: Matilda Clerke <[email protected]> * 7311: spotless Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix broken BesuCommandTest Signed-off-by: Matilda Clerke <[email protected]> * 7311: add class Signed-off-by: Matilda Clerke <[email protected]> * 7311: Move PeerTaskFeatureToggle to more appropriate location Signed-off-by: Matilda Clerke <[email protected]> * 7311: add X prefix to peertask-system-enabled Signed-off-by: Matilda Clerke <[email protected]> * 7311: Move --Xpeertask-system-enabled out of BesuCommand and make hidden Signed-off-by: Matilda Clerke <[email protected]> * 7311: spotless Signed-off-by: Matilda Clerke <[email protected]> * 7311: Add GetReceiptsFromPeerTask Signed-off-by: Matilda Clerke <[email protected]> * 7311: Move isPeerTaskSystemEnabled to SynchronizerOptions Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix javadoc issue Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix javadoc issue Signed-off-by: Matilda Clerke <[email protected]> * 7311: Move PeerTaskFeatureToggleTestHelper to TestUtil and fix RunnerTest Signed-off-by: Matilda Clerke <[email protected]> * 7311: spotless Signed-off-by: Matilda Clerke <[email protected]> * 7311: Remove PeerTaskFeatureToggle in favor of including isPeerTaskSystemEnabled in SynchronizerConfiguration Signed-off-by: Matilda Clerke <[email protected]> * 7311: Adjust to the removal of PeerTaskFeatureToggle and use SynchronizerConfiguration to get the toggle instead Signed-off-by: Matilda Clerke <[email protected]> * 7311: Reduce timeout in PeerTaskRequestSender to 5s Signed-off-by: Matilda Clerke <[email protected]> * 7311: Refactor PeerManager to be an interface Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix up compile errors after merge Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix MetricsAcceptanceTest Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix MetricsAcceptanceTest Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix DownloadReceiptsStep when using peer task system Signed-off-by: Matilda Clerke <[email protected]> * 7311: Rename PeerManager to PeerSelector Signed-off-by: Matilda Clerke <[email protected]> * 7311: Reword PeerSelector javadoc to avoid implementation details Signed-off-by: Matilda Clerke <[email protected]> * 7311: Use ConcurrentHashMap in DefaultPeerSelector Signed-off-by: Matilda Clerke <[email protected]> * 7311: Reword trace log in DefaultPeerSelector Signed-off-by: Matilda Clerke <[email protected]> * 7311: Remove unused imports Signed-off-by: Matilda Clerke <[email protected]> * 7311: Use a 1 second delay between retries in PeerTaskExecutor to match old implementation Signed-off-by: Matilda Clerke <[email protected]> * 7311: Add testGetPeerButNoPeerMatchesFilter to DefaultPeerSelectorTest Signed-off-by: Matilda Clerke <[email protected]> * 7311: Add testGetPeerButNoPeerMatchesFilter to DefaultPeerSelectorTest Signed-off-by: Matilda Clerke <[email protected]> * 7311: spotless Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix MetricsAcceptanceTest Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix MetricsAcceptanceTest Signed-off-by: Matilda Clerke <[email protected]> * 7311: Modify PeerTaskExecutor metric to include response time from peer Signed-off-by: Matilda Clerke <[email protected]> * 7311: Use SubProtocol instead of subprotocol name string in PeerTask Signed-off-by: Matilda Clerke <[email protected]> * 7311: rename timing context to ignored to prevent intellij warnings Signed-off-by: Matilda Clerke <[email protected]> * 7311: Use constants for number of retries Signed-off-by: Matilda Clerke <[email protected]> * 7311: Convert PeerTaskExecutorResult to a record Signed-off-by: Matilda Clerke <[email protected]> * 7311: Rename PeerTaskBehavior to PeerTaskRetryBehavior Signed-off-by: Matilda Clerke <[email protected]> * 7311: Move peer selection logic to PeerSelector Signed-off-by: Matilda Clerke <[email protected]> * 7311: spotless Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix up everything broken after merge Signed-off-by: Matilda Clerke <[email protected]> * 7311: Attempt to improve performance of peer task system in pipeline Signed-off-by: Matilda Clerke <[email protected]> * 7311: fix compile check Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix broken workflow Signed-off-by: Matilda Clerke <[email protected]> * 7311: Reduce logging in JsonRpcExecutor to trace level Signed-off-by: Matilda Clerke <[email protected]> * 7311: More changes in DownloadReceiptsStep Signed-off-by: Matilda Clerke <[email protected]> * 7311: Rework DownloadReceiptsStep Signed-off-by: Matilda Clerke <[email protected]> * 7311: Make changes as discussed in walkthrough meeting Remove DefaultPeerSelector, make EthPeers implement PeerSelector interface, and add PeerTask.getPeerRequirementFilter Signed-off-by: Matilda Clerke <[email protected]> * 7311: Update after merge and make discussed changes from walkthrough discussion Signed-off-by: Matilda Clerke <[email protected]> * 7311: Change to regular HashMap Signed-off-by: Matilda Clerke <[email protected]> * 7311: Remove runtime exception again Signed-off-by: Matilda Clerke <[email protected]> * 7311: Rename getPeerTaskBehavior to getPeerTaskRetryBehavior Signed-off-by: Matilda Clerke <[email protected]> * 7311: Rename getPeerTaskBehavior to getPeerTaskRetryBehavior Signed-off-by: Matilda Clerke <[email protected]> * 7311: Rework PeerTaskExecutor retry system to be 0-based Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix up compile errors after merge Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix broken DownloadReceiptsStepTest test Signed-off-by: Matilda Clerke <[email protected]> * 7311: Move GetReceipts to services worker for parallelism Signed-off-by: Matilda Clerke <[email protected]> * 7311: Refactor peer task system usage in DownloadReceiptsStep to better match old system Signed-off-by: Matilda Clerke <[email protected]> * 7311: Remove unused async methods in PeerTaskExecutor Signed-off-by: Matilda Clerke <[email protected]> * 7311: Return Optional<EthPeer> in PeerSelector.getPeer and utilise existing peer selection behavior in EthPeers Signed-off-by: Matilda Clerke <[email protected]> * 7311: Update after merge Signed-off-by: Matilda Clerke <[email protected]> * 7311: Redo getPeer again to include hasAvailableRequestCapacity check Signed-off-by: Matilda Clerke <[email protected]> * 7311: Add protocol spec supplier to GetReceiptsFromPeerTask Signed-off-by: Matilda Clerke <[email protected]> * 7311: Rework getPeer again to use LEAST_TO_MOST_BUSY comparator Signed-off-by: Matilda Clerke <[email protected]> * 7311: Import PeerNotConnected class instead of using fully qualified class name Signed-off-by: Matilda Clerke <[email protected]> * 7311: Change to specifying retry counts in PeerTask instead of behavior enums Signed-off-by: Matilda Clerke <[email protected]> * 7311: clean up after merge Signed-off-by: Matilda Clerke <[email protected]> * 7311: clean up after merge Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix up javadoc Signed-off-by: Matilda Clerke <[email protected]> * 7311: Add additional metrics to PeerTaskExecutor Signed-off-by: Matilda Clerke <[email protected]> * 7311: Add Predicate to PeerTask to check for partial success Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix incorrect name on isPartialSuccessTest Signed-off-by: Matilda Clerke <[email protected]> * 7311: Implement isPartialSuccess and add unit tests Signed-off-by: Matilda Clerke <[email protected]> * 7311: Add partialSuccessCounter and inflightRequestGauge in PeerTaskExecutor Signed-off-by: Matilda Clerke <[email protected]> * 7311: Also filter by whether a peer is fully validated Signed-off-by: Matilda Clerke <[email protected]> * 7311: Remove unneeded throws in RunnerTest Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix up inflight requests gauge in PeerTaskExecutor Signed-off-by: Matilda Clerke <[email protected]> * 7311: Update plugin api hash Signed-off-by: Matilda Clerke <[email protected]> * 7311: Update plugin api hash Signed-off-by: Matilda Clerke <[email protected]> * 7311: Add javadoc to LabelledGauge.isLabelsObserved Signed-off-by: Matilda Clerke <[email protected]> * 7311: Update plugin-api hash Signed-off-by: Matilda Clerke <[email protected]> * 7311: Update changelog Signed-off-by: Matilda Clerke <[email protected]> * 7311: Handle headers with no receipts as a special case in DownloadReceiptsStep Signed-off-by: Matilda Clerke <[email protected]> * 7311: Complete merge Signed-off-by: Matilda Clerke <[email protected]> * 7311: Use taskName instead of className for labelNames Signed-off-by: Matilda Clerke <[email protected]> * 7311: Use snake_case for metric names Signed-off-by: Matilda Clerke <[email protected]> * 7311: Use _total metric name suffix Signed-off-by: Matilda Clerke <[email protected]> * 7311: rework partial success handling Signed-off-by: Matilda Clerke <[email protected]> * 7311: Update GetReceiptsFromPeerTask with partialSuccess changes Signed-off-by: Matilda Clerke <[email protected]> * 7311: Add default implementation to LabelledGauge.isLabelsObserved Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix broken unit test Signed-off-by: Matilda Clerke <[email protected]> * 7311: Rename parseResponse to processResponse Signed-off-by: Matilda Clerke <[email protected]> * add possibility to use the new peer task system when downloading the bodies Signed-off-by: [email protected] <[email protected]> * fix loop Signed-off-by: [email protected] <[email protected]> * 7311: Wrap peer task system usage in ethScheduler call to match other usages Signed-off-by: Matilda Clerke <[email protected]> * small fixes Signed-off-by: [email protected] <[email protected]> * update API change Signed-off-by: [email protected] <[email protected]> * spotless Signed-off-by: [email protected] <[email protected]> * 7311: apply spotless Signed-off-by: Matilda Clerke <[email protected]> * 7311: Move check for empty trie hash into GetReceiptsFromPeerTask and update unit test to test for this functionality Signed-off-by: Matilda Clerke <[email protected]> * 7311: spotless Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix compile issue after merge Signed-off-by: Matilda Clerke <[email protected]> * 7311: Remove BodyValidator and update code and test to match Signed-off-by: Matilda Clerke <[email protected]> * 7311: spotless Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix up pre-fill and add test to test failure scenario Signed-off-by: Matilda Clerke <[email protected]> * 7311: Use ProtocolSchedule.anyMatch to find if any ProtocolSpecs are PoS, remove new usages of currentProtocolSpecSupplier Signed-off-by: Matilda Clerke <[email protected]> * 7311: Only attempt to remove headers on successful requests Signed-off-by: Matilda Clerke <[email protected]> * 7311: Fix broken stuff after merge Signed-off-by: Matilda Clerke <[email protected]> * spotless Signed-off-by: [email protected] <[email protected]> * Fix up compile errors after merge Signed-off-by: Matilda Clerke <[email protected]> * Add PeerTaskExecutor usage for GetBodies in DownloadHeaderSequenceTask Signed-off-by: Matilda Clerke <[email protected]> * Add PeerTaskExecutor usage for GetBodies in ForwardSyncStep and apply spotless Signed-off-by: Matilda Clerke <[email protected]> * Allow custom retries against other peers in GetBodiesFromPeerTask Signed-off-by: Matilda Clerke <[email protected]> * Fix infinite loop in CheckPointSyncChainDownloaderTest Signed-off-by: Matilda Clerke <[email protected]> * spotless Signed-off-by: Matilda Clerke <[email protected]> * Update CompleteBlocksWithPeerTask.getBlocks to retrieveBlocksFromPeers and add javadoc Signed-off-by: Matilda Clerke <[email protected]> * Add javadoc to GetBodiesFromPeerTask Signed-off-by: Matilda Clerke <[email protected]> * 7582: Simplify withdrawals validation Signed-off-by: Matilda Clerke <[email protected]> --------- Signed-off-by: Matilda Clerke <[email protected]> Signed-off-by: [email protected] <[email protected]> Co-authored-by: Sally MacFarlane <[email protected]> Co-authored-by: [email protected] <[email protected]>
1 parent 11bf6aa commit 506fa71

19 files changed

+970
-51
lines changed
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Copyright contributors to Hyperledger Besu.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*
13+
* SPDX-License-Identifier: Apache-2.0
14+
*/
15+
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;
16+
17+
import org.hyperledger.besu.ethereum.core.Block;
18+
import org.hyperledger.besu.ethereum.core.BlockBody;
19+
import org.hyperledger.besu.ethereum.core.BlockHeader;
20+
import org.hyperledger.besu.ethereum.eth.EthProtocol;
21+
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
22+
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
23+
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
24+
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskValidationResponse;
25+
import org.hyperledger.besu.ethereum.eth.messages.BlockBodiesMessage;
26+
import org.hyperledger.besu.ethereum.eth.messages.GetBlockBodiesMessage;
27+
import org.hyperledger.besu.ethereum.mainnet.BodyValidation;
28+
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
29+
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
30+
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;
31+
32+
import java.util.ArrayList;
33+
import java.util.List;
34+
import java.util.function.Predicate;
35+
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
39+
/**
40+
* Implements PeerTask for getting block bodies from peers, and matches headers to bodies to supply
41+
* full blocks
42+
*/
43+
public class GetBodiesFromPeerTask implements PeerTask<List<Block>> {
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(GetBodiesFromPeerTask.class);
46+
47+
private static final int DEFAULT_RETRIES_AGAINST_OTHER_PEERS = 5;
48+
49+
private final List<BlockHeader> blockHeaders;
50+
private final ProtocolSchedule protocolSchedule;
51+
private final int allowedRetriesAgainstOtherPeers;
52+
53+
private final long requiredBlockchainHeight;
54+
private final List<Block> blocks = new ArrayList<>();
55+
private final boolean isPoS;
56+
57+
public GetBodiesFromPeerTask(
58+
final List<BlockHeader> blockHeaders, final ProtocolSchedule protocolSchedule) {
59+
this(blockHeaders, protocolSchedule, DEFAULT_RETRIES_AGAINST_OTHER_PEERS);
60+
}
61+
62+
public GetBodiesFromPeerTask(
63+
final List<BlockHeader> blockHeaders,
64+
final ProtocolSchedule protocolSchedule,
65+
final int allowedRetriesAgainstOtherPeers) {
66+
if (blockHeaders == null || blockHeaders.isEmpty()) {
67+
throw new IllegalArgumentException("Block headers must not be empty");
68+
}
69+
70+
this.blockHeaders = blockHeaders;
71+
this.protocolSchedule = protocolSchedule;
72+
this.allowedRetriesAgainstOtherPeers = allowedRetriesAgainstOtherPeers;
73+
74+
this.requiredBlockchainHeight =
75+
blockHeaders.stream()
76+
.mapToLong(BlockHeader::getNumber)
77+
.max()
78+
.orElse(BlockHeader.GENESIS_BLOCK_NUMBER);
79+
this.isPoS = protocolSchedule.getByBlockHeader(blockHeaders.getLast()).isPoS();
80+
}
81+
82+
@Override
83+
public SubProtocol getSubProtocol() {
84+
return EthProtocol.get();
85+
}
86+
87+
@Override
88+
public MessageData getRequestMessage() {
89+
return GetBlockBodiesMessage.create(
90+
blockHeaders.stream().map(BlockHeader::getBlockHash).toList());
91+
}
92+
93+
@Override
94+
public List<Block> processResponse(final MessageData messageData)
95+
throws InvalidPeerTaskResponseException {
96+
// Blocks returned by this method are in the same order as the headers, but might not be
97+
// complete
98+
if (messageData == null) {
99+
throw new InvalidPeerTaskResponseException();
100+
}
101+
final BlockBodiesMessage blocksMessage = BlockBodiesMessage.readFrom(messageData);
102+
final List<BlockBody> blockBodies = blocksMessage.bodies(protocolSchedule);
103+
if (blockBodies.isEmpty() || blockBodies.size() > blockHeaders.size()) {
104+
throw new InvalidPeerTaskResponseException();
105+
}
106+
107+
for (int i = 0; i < blockBodies.size(); i++) {
108+
final BlockBody blockBody = blockBodies.get(i);
109+
final BlockHeader blockHeader = blockHeaders.get(i);
110+
if (!blockBodyMatchesBlockHeader(blockBody, blockHeader)) {
111+
LOG.atDebug().setMessage("Received block body does not match block header").log();
112+
throw new InvalidPeerTaskResponseException();
113+
}
114+
115+
blocks.add(new Block(blockHeader, blockBody));
116+
}
117+
return blocks;
118+
}
119+
120+
@Override
121+
public int getRetriesWithOtherPeer() {
122+
return allowedRetriesAgainstOtherPeers;
123+
}
124+
125+
private boolean blockBodyMatchesBlockHeader(
126+
final BlockBody blockBody, final BlockHeader blockHeader) {
127+
// this method validates that the block body matches the block header by calculating the roots
128+
// of the block body and comparing them to the roots in the block header
129+
if (!BodyValidation.transactionsRoot(blockBody.getTransactions())
130+
.equals(blockHeader.getTransactionsRoot())) {
131+
return false;
132+
}
133+
if (!BodyValidation.ommersHash(blockBody.getOmmers()).equals(blockHeader.getOmmersHash())) {
134+
return false;
135+
}
136+
if (!blockBody
137+
.getWithdrawals()
138+
.map(BodyValidation::withdrawalsRoot)
139+
.equals(blockHeader.getWithdrawalsRoot())) {
140+
return false;
141+
}
142+
143+
return true;
144+
}
145+
146+
@Override
147+
public Predicate<EthPeer> getPeerRequirementFilter() {
148+
return (ethPeer) ->
149+
isPoS || ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight;
150+
}
151+
152+
@Override
153+
public PeerTaskValidationResponse validateResult(final List<Block> result) {
154+
if (result.isEmpty()) {
155+
return PeerTaskValidationResponse.NO_RESULTS_RETURNED;
156+
}
157+
return PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD;
158+
}
159+
160+
public List<BlockHeader> getBlockHeaders() {
161+
return blockHeaders;
162+
}
163+
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ public DefaultSynchronizer(
137137
syncState,
138138
metricsSystem,
139139
terminationCondition,
140+
peerTaskExecutor,
140141
syncDurationMetrics));
141142

142143
if (SyncMode.FAST.equals(syncConfig.getSyncMode())) {

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DownloadBodiesStep.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.hyperledger.besu.ethereum.core.BlockHeader;
1919
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
2020
import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksTask;
21+
import org.hyperledger.besu.ethereum.eth.sync.tasks.CompleteBlocksWithPeerTask;
2122
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
2223
import org.hyperledger.besu.plugin.services.MetricsSystem;
2324

@@ -31,19 +32,38 @@ public class DownloadBodiesStep
3132
private final ProtocolSchedule protocolSchedule;
3233
private final EthContext ethContext;
3334
private final MetricsSystem metricsSystem;
35+
private final SynchronizerConfiguration synchronizerConfiguration;
3436

3537
public DownloadBodiesStep(
3638
final ProtocolSchedule protocolSchedule,
3739
final EthContext ethContext,
40+
final SynchronizerConfiguration synchronizerConfiguration,
3841
final MetricsSystem metricsSystem) {
3942
this.protocolSchedule = protocolSchedule;
4043
this.ethContext = ethContext;
44+
this.synchronizerConfiguration = synchronizerConfiguration;
4145
this.metricsSystem = metricsSystem;
4246
}
4347

4448
@Override
4549
public CompletableFuture<List<Block>> apply(final List<BlockHeader> blockHeaders) {
46-
return CompleteBlocksTask.forHeaders(protocolSchedule, ethContext, blockHeaders, metricsSystem)
47-
.run();
50+
if (synchronizerConfiguration.isPeerTaskSystemEnabled()) {
51+
return ethContext
52+
.getScheduler()
53+
.scheduleServiceTask(() -> getBodiesWithPeerTaskSystem(blockHeaders));
54+
} else {
55+
return CompleteBlocksTask.forHeaders(
56+
protocolSchedule, ethContext, blockHeaders, metricsSystem)
57+
.run();
58+
}
59+
}
60+
61+
private CompletableFuture<List<Block>> getBodiesWithPeerTaskSystem(
62+
final List<BlockHeader> headers) {
63+
64+
final CompleteBlocksWithPeerTask completeBlocksWithPeerTask =
65+
new CompleteBlocksWithPeerTask(protocolSchedule, headers, ethContext.getPeerTaskExecutor());
66+
final List<Block> blocks = completeBlocksWithPeerTask.retrieveBlocksFromPeers();
67+
return CompletableFuture.completedFuture(blocks);
4868
}
4969
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
import org.hyperledger.besu.ethereum.core.Block;
1818
import org.hyperledger.besu.ethereum.core.BlockHeader;
19+
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResponseCode;
20+
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutorResult;
21+
import org.hyperledger.besu.ethereum.eth.manager.peertask.task.GetBodiesFromPeerTask;
1922
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask;
2023
import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlocksFromPeersTask;
2124

@@ -53,9 +56,9 @@ public CompletableFuture<Void> possibleRequestBodies(final List<BlockHeader> blo
5356
LOG.atDebug()
5457
.setMessage("Requesting {} blocks {}->{} ({})")
5558
.addArgument(blockHeaders::size)
56-
.addArgument(() -> blockHeaders.get(0).getNumber())
57-
.addArgument(() -> blockHeaders.get(blockHeaders.size() - 1).getNumber())
58-
.addArgument(() -> blockHeaders.get(0).getHash().toHexString())
59+
.addArgument(() -> blockHeaders.getFirst().getNumber())
60+
.addArgument(() -> blockHeaders.getLast().getNumber())
61+
.addArgument(() -> blockHeaders.getFirst().getHash().toHexString())
5962
.log();
6063
return requestBodies(blockHeaders)
6164
.thenApply(this::saveBlocks)
@@ -76,23 +79,47 @@ public CompletableFuture<Void> possibleRequestBodies(final List<BlockHeader> blo
7679

7780
@VisibleForTesting
7881
protected CompletableFuture<List<Block>> requestBodies(final List<BlockHeader> blockHeaders) {
79-
final RetryingGetBlocksFromPeersTask getBodiesFromPeerTask =
80-
RetryingGetBlocksFromPeersTask.forHeaders(
81-
context.getProtocolSchedule(),
82-
context.getEthContext(),
83-
context.getMetricsSystem(),
84-
context.getEthContext().getEthPeers().peerCount(),
85-
blockHeaders);
82+
CompletableFuture<List<Block>> blocksFuture;
83+
if (context.getSynchronizerConfiguration().isPeerTaskSystemEnabled()) {
84+
blocksFuture =
85+
context
86+
.getEthContext()
87+
.getScheduler()
88+
.scheduleServiceTask(
89+
() -> {
90+
GetBodiesFromPeerTask task =
91+
new GetBodiesFromPeerTask(
92+
blockHeaders,
93+
context.getProtocolSchedule(),
94+
context.getEthContext().getEthPeers().peerCount());
95+
PeerTaskExecutorResult<List<Block>> taskResult =
96+
context.getEthContext().getPeerTaskExecutor().execute(task);
97+
if (taskResult.responseCode() == PeerTaskExecutorResponseCode.SUCCESS
98+
&& taskResult.result().isPresent()) {
99+
return CompletableFuture.completedFuture(taskResult.result().get());
100+
} else {
101+
return CompletableFuture.failedFuture(
102+
new RuntimeException(taskResult.responseCode().toString()));
103+
}
104+
});
105+
} else {
106+
final RetryingGetBlocksFromPeersTask getBodiesFromPeerTask =
107+
RetryingGetBlocksFromPeersTask.forHeaders(
108+
context.getProtocolSchedule(),
109+
context.getEthContext(),
110+
context.getMetricsSystem(),
111+
context.getEthContext().getEthPeers().peerCount(),
112+
blockHeaders);
86113

87-
final CompletableFuture<AbstractPeerTask.PeerTaskResult<List<Block>>> run =
88-
getBodiesFromPeerTask.run();
89-
return run.thenApply(AbstractPeerTask.PeerTaskResult::getResult)
90-
.thenApply(
91-
blocks -> {
92-
LOG.debug("Got {} blocks from peers", blocks.size());
93-
blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber()));
94-
return blocks;
95-
});
114+
blocksFuture =
115+
getBodiesFromPeerTask.run().thenApply(AbstractPeerTask.PeerTaskResult::getResult);
116+
}
117+
return blocksFuture.thenApply(
118+
blocks -> {
119+
LOG.debug("Got {} blocks from peers", blocks.size());
120+
blocks.sort(Comparator.comparing(block -> block.getHeader().getNumber()));
121+
return blocks;
122+
});
96123
}
97124

98125
@VisibleForTesting

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public Pipeline<SyncTargetRange> createDownloadPipelineForSyncTarget(final SyncT
140140
final RangeHeadersValidationStep validateHeadersJoinUpStep =
141141
new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy);
142142
final DownloadBodiesStep downloadBodiesStep =
143-
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
143+
new DownloadBodiesStep(protocolSchedule, ethContext, syncConfig, metricsSystem);
144144
final DownloadReceiptsStep downloadReceiptsStep =
145145
new DownloadReceiptsStep(protocolSchedule, ethContext, syncConfig, metricsSystem);
146146
final ImportBlocksStep importBlockStep =

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncChainDownloader.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import org.hyperledger.besu.ethereum.ProtocolContext;
1818
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
19+
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
1920
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
2021
import org.hyperledger.besu.ethereum.eth.sync.PipelineChainDownloader;
2122
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
@@ -35,7 +36,8 @@ public static ChainDownloader create(
3536
final SyncState syncState,
3637
final MetricsSystem metricsSystem,
3738
final SyncTerminationCondition terminationCondition,
38-
final SyncDurationMetrics syncDurationMetrics) {
39+
final SyncDurationMetrics syncDurationMetrics,
40+
final PeerTaskExecutor peerTaskExecutor) {
3941

4042
final FullSyncTargetManager syncTargetManager =
4143
new FullSyncTargetManager(

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ public FullSyncDownloadPipelineFactory(
6767
this.ethContext = ethContext;
6868
this.metricsSystem = metricsSystem;
6969
this.fullSyncTerminationCondition = syncTerminationCondition;
70-
betterSyncTargetEvaluator = new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers());
70+
this.betterSyncTargetEvaluator =
71+
new BetterSyncTargetEvaluator(syncConfig, ethContext.getEthPeers());
7172
}
7273

7374
@Override
@@ -105,7 +106,7 @@ public Pipeline<?> createDownloadPipelineForSyncTarget(final SyncTarget target)
105106
final RangeHeadersValidationStep validateHeadersJoinUpStep =
106107
new RangeHeadersValidationStep(protocolSchedule, protocolContext, detachedValidationPolicy);
107108
final DownloadBodiesStep downloadBodiesStep =
108-
new DownloadBodiesStep(protocolSchedule, ethContext, metricsSystem);
109+
new DownloadBodiesStep(protocolSchedule, ethContext, syncConfig, metricsSystem);
109110
final ExtractTxSignaturesStep extractTxSignaturesStep = new ExtractTxSignaturesStep();
110111
final FullImportBlockStep importBlockStep =
111112
new FullImportBlockStep(

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloader.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import org.hyperledger.besu.ethereum.ProtocolContext;
1818
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
19+
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
1920
import org.hyperledger.besu.ethereum.eth.sync.ChainDownloader;
2021
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
2122
import org.hyperledger.besu.ethereum.eth.sync.TrailingPeerRequirements;
@@ -45,6 +46,7 @@ public FullSyncDownloader(
4546
final SyncState syncState,
4647
final MetricsSystem metricsSystem,
4748
final SyncTerminationCondition terminationCondition,
49+
final PeerTaskExecutor peerTaskExecutor,
4850
final SyncDurationMetrics syncDurationMetrics) {
4951
this.syncConfig = syncConfig;
5052
this.protocolContext = protocolContext;
@@ -59,7 +61,8 @@ public FullSyncDownloader(
5961
syncState,
6062
metricsSystem,
6163
terminationCondition,
62-
syncDurationMetrics);
64+
syncDurationMetrics,
65+
peerTaskExecutor);
6366
}
6467

6568
public CompletableFuture<Void> start() {

0 commit comments

Comments
 (0)