Skip to content
This repository was archived by the owner on Jul 1, 2025. It is now read-only.

Commit 16de069

Browse files
Matilda-Clerkegaryschulte
authored andcommitted
Replace waitforpeertask with ethpeers method (#8009)
* 7582: Add waitForPeer method to PeerSelector and EthPeers Signed-off-by: Matilda Clerke <[email protected]> * 7582: Replace all usages of WaitForPeer[s]Task with new EthPeers.waitForPeer method Signed-off-by: Matilda Clerke <[email protected]> * 7582: Fix PivotBlockConfirmerTest Signed-off-by: Matilda Clerke <[email protected]> * 7582: spotless Signed-off-by: Matilda Clerke <[email protected]> * 7582: Fix broken PivotBlockRetrieverTest Signed-off-by: Matilda Clerke <[email protected]> * 7582: Fix broken FastSyncActionsTest Signed-off-by: Matilda Clerke <[email protected]> * 7582: spotless Signed-off-by: Matilda Clerke <[email protected]> * 7582: Fix issues after merge Signed-off-by: Matilda Clerke <[email protected]> * 7582: Put AbstractSyncTargetManager.waitForPeerAndThenSetSyncTarget code back separate thread to avoid infinite loop waiting for peers during acceptance tests Signed-off-by: Matilda Clerke <[email protected]> * 7582: Remove pivot block checks when waiting for peer in FastSyncActions Signed-off-by: Matilda Clerke <[email protected]> * 7582: Remove estimated chain height check from PivotBlockConfirmer when waiting for peers Signed-off-by: Matilda Clerke <[email protected]> * 7582: Fix broken PivotBlockRetrieverTest Signed-off-by: Matilda Clerke <[email protected]> * Use isSuitablePeer as peer selection criteria when waiting for a peer in AbstractRetryingPeerTask Signed-off-by: Matilda Clerke <[email protected]> * Remove MetricsSystem from PivotSelectorFromPeers Signed-off-by: Matilda Clerke <[email protected]> --------- Signed-off-by: Matilda Clerke <[email protected]>
1 parent c9a9f76 commit 16de069

File tree

18 files changed

+120
-538
lines changed

18 files changed

+120
-538
lines changed

besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -923,7 +923,6 @@ private PivotBlockSelector createPivotSelector(
923923
ethContext,
924924
syncConfig,
925925
syncState,
926-
metricsSystem,
927926
protocolContext,
928927
nodeKey,
929928
blockchain.getChainHeadHeader());
@@ -953,7 +952,7 @@ private PivotBlockSelector createPivotSelector(
953952
unsubscribeForkchoiceListener);
954953
} else {
955954
LOG.info("TTD difficulty is not present, creating initial sync phase for PoW");
956-
return new PivotSelectorFromPeers(ethContext, syncConfig, syncState, metricsSystem);
955+
return new PivotSelectorFromPeers(ethContext, syncConfig, syncState);
957956
}
958957
}
959958

consensus/qbft/src/main/java/org/hyperledger/besu/consensus/qbft/BFTPivotSelectorFromPeers.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.hyperledger.besu.ethereum.eth.sync.fastsync.NoSyncRequiredException;
2727
import org.hyperledger.besu.ethereum.eth.sync.fastsync.PivotSelectorFromPeers;
2828
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
29-
import org.hyperledger.besu.plugin.services.MetricsSystem;
3029

3130
import java.util.Optional;
3231
import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,7 +55,6 @@ public class BFTPivotSelectorFromPeers extends PivotSelectorFromPeers {
5655
* @param ethContext the eth context
5756
* @param syncConfig the sync config
5857
* @param syncState the sync state
59-
* @param metricsSystem the metrics
6058
* @param protocolContext the protocol context
6159
* @param nodeKey the node key
6260
* @param blockHeader the block header
@@ -65,11 +63,10 @@ public BFTPivotSelectorFromPeers(
6563
final EthContext ethContext,
6664
final SynchronizerConfiguration syncConfig,
6765
final SyncState syncState,
68-
final MetricsSystem metricsSystem,
6966
final ProtocolContext protocolContext,
7067
final NodeKey nodeKey,
7168
final BlockHeader blockHeader) {
72-
super(ethContext, syncConfig, syncState, metricsSystem);
69+
super(ethContext, syncConfig, syncState);
7370
this.protocolContext = protocolContext;
7471
this.blockHeader = blockHeader;
7572
this.nodeKey = nodeKey;

consensus/qbft/src/test/java/org/hyperledger/besu/consensus/qbft/sync/QbftPivotSelectorTest.java

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
3434
import org.hyperledger.besu.ethereum.eth.sync.fastsync.NoSyncRequiredException;
3535
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
36-
import org.hyperledger.besu.plugin.services.MetricsSystem;
3736

3837
import java.util.ArrayList;
3938
import java.util.List;
@@ -55,7 +54,6 @@ public class QbftPivotSelectorTest {
5554
@Mock private ProtocolContext protocolContext;
5655
@Mock private BftContext bftContext;
5756
@Mock private SyncState syncState;
58-
@Mock private MetricsSystem metricsSystem;
5957
@Mock private EthContext ethContext;
6058
@Mock private EthPeers ethPeers;
6159
@Mock private ValidatorProvider validatorProvider;
@@ -80,13 +78,7 @@ public void returnEmptySyncStateIfValidatorWithOtherValidatorsButNoPeers() {
8078
when(validatorProvider.getValidatorsAtHead()).thenReturn(validatorList);
8179
BFTPivotSelectorFromPeers pivotSelector =
8280
new BFTPivotSelectorFromPeers(
83-
ethContext,
84-
syncConfig,
85-
syncState,
86-
metricsSystem,
87-
protocolContext,
88-
nodeKey,
89-
blockHeader);
81+
ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader);
9082
Optional<FastSyncState> pivotState = pivotSelector.selectNewPivotBlock();
9183
assertThat(pivotState.isEmpty()).isTrue();
9284
}
@@ -104,13 +96,7 @@ public void returnNoSyncRequiredIfOnlyValidatorAndNoPeers() {
10496
when(validatorProvider.getValidatorsAtHead()).thenReturn(validatorList);
10597
BFTPivotSelectorFromPeers pivotSelector =
10698
new BFTPivotSelectorFromPeers(
107-
ethContext,
108-
syncConfig,
109-
syncState,
110-
metricsSystem,
111-
protocolContext,
112-
nodeKey,
113-
blockHeader);
99+
ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader);
114100

115101
try {
116102
Optional<FastSyncState> pivotState = pivotSelector.selectNewPivotBlock();
@@ -126,13 +112,7 @@ public void returnEmptySyncStateIfNonValidatorWithNoBestPeer() {
126112
when(validatorProvider.nodeIsValidator(any())).thenReturn(false);
127113
BFTPivotSelectorFromPeers pivotSelector =
128114
new BFTPivotSelectorFromPeers(
129-
ethContext,
130-
syncConfig,
131-
syncState,
132-
metricsSystem,
133-
protocolContext,
134-
nodeKey,
135-
blockHeader);
115+
ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader);
136116

137117
Optional<FastSyncState> pivotState = pivotSelector.selectNewPivotBlock();
138118
assertThat(pivotState.isEmpty()).isTrue();
@@ -145,13 +125,7 @@ public void returnEmptySyncStateIfValidatorAndNotAtGenesisAndOtherValidators() {
145125
when(blockHeader.getNumber()).thenReturn(10L);
146126
BFTPivotSelectorFromPeers pivotSelector =
147127
new BFTPivotSelectorFromPeers(
148-
ethContext,
149-
syncConfig,
150-
syncState,
151-
metricsSystem,
152-
protocolContext,
153-
nodeKey,
154-
blockHeader);
128+
ethContext, syncConfig, syncState, protocolContext, nodeKey, blockHeader);
155129

156130
Optional<FastSyncState> pivotState = pivotSelector.selectNewPivotBlock();
157131
assertThat(pivotState.isEmpty()).isTrue();

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,40 @@ public Optional<EthPeer> getPeer(final Predicate<EthPeer> filter) {
474474
.min(LEAST_TO_MOST_BUSY);
475475
}
476476

477+
// Part of the PeerSelector interface, to be split apart later
478+
@Override
479+
public CompletableFuture<EthPeer> waitForPeer(final Predicate<EthPeer> filter) {
480+
final CompletableFuture<EthPeer> future = new CompletableFuture<>();
481+
LOG.debug("Waiting for peer matching filter. {} peers currently connected.", peerCount());
482+
// check for an existing peer matching the filter and use that if one is found
483+
Optional<EthPeer> maybePeer = getPeer(filter);
484+
if (maybePeer.isPresent()) {
485+
LOG.debug("Found peer matching filter already connected!");
486+
future.complete(maybePeer.get());
487+
} else {
488+
// no existing peer matches our filter. Subscribe to new connections until we find one
489+
LOG.debug("Subscribing to new peer connections to wait until one matches filter");
490+
final long subscriptionId =
491+
subscribeConnect(
492+
(peer) -> {
493+
if (!future.isDone() && filter.test(peer)) {
494+
LOG.debug("Found new peer matching filter!");
495+
future.complete(peer);
496+
} else {
497+
LOG.debug("New peer does not match filter");
498+
}
499+
});
500+
future.handle(
501+
(peer, throwable) -> {
502+
LOG.debug("Unsubscribing from new peer connections with ID {}", subscriptionId);
503+
unsubscribeConnect(subscriptionId);
504+
return null;
505+
});
506+
}
507+
508+
return future;
509+
}
510+
477511
// Part of the PeerSelector interface, to be split apart later
478512
@Override
479513
public Optional<EthPeer> getPeerByPeerId(final PeerId peerId) {

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/peertask/PeerSelector.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;
1919

2020
import java.util.Optional;
21+
import java.util.concurrent.CompletableFuture;
2122
import java.util.function.Predicate;
2223

2324
/** Selects the EthPeers for the PeerTaskExecutor */
@@ -31,6 +32,14 @@ public interface PeerSelector {
3132
*/
3233
Optional<EthPeer> getPeer(final Predicate<EthPeer> filter);
3334

35+
/**
36+
* Waits for a peer matching the supplied filter
37+
*
38+
* @param filter a Predicate\<EthPeer\> matching desirable peers
39+
* @return a CompletableFuture into which a peer will be placed
40+
*/
41+
CompletableFuture<EthPeer> waitForPeer(final Predicate<EthPeer> filter);
42+
3443
/**
3544
* Attempts to get the EthPeer identified by peerId
3645
*

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.time.Duration;
2727
import java.util.Optional;
2828
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.TimeoutException;
3031
import java.util.function.Predicate;
3132

@@ -129,13 +130,12 @@ protected void handleTaskError(final Throwable error) {
129130
"No useful peer found, wait max 5 seconds for new peer to connect: current peers {}",
130131
ethContext.getEthPeers().peerCount());
131132

132-
final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext, metricsSystem);
133133
executeSubTask(
134134
() ->
135135
ethContext
136-
.getScheduler()
137-
// wait for a new peer for up to 5 seconds
138-
.timeout(waitTask, Duration.ofSeconds(5))
136+
.getEthPeers()
137+
.waitForPeer(this::isSuitablePeer)
138+
.orTimeout(5, TimeUnit.SECONDS)
139139
// execute the task again
140140
.whenComplete((r, t) -> executeTaskTimed()));
141141
return;

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeerTask.java

Lines changed: 0 additions & 64 deletions
This file was deleted.

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/WaitForPeersTask.java

Lines changed: 0 additions & 83 deletions
This file was deleted.

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/AbstractSyncTargetManager.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.hyperledger.besu.ethereum.ProtocolContext;
2020
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
2121
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
22-
import org.hyperledger.besu.ethereum.eth.manager.task.WaitForPeerTask;
2322
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
2423
import org.hyperledger.besu.ethereum.eth.sync.tasks.DetermineCommonAncestorTask;
2524
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
@@ -117,13 +116,16 @@ protected Optional<SyncTarget> finalizeSelectedSyncTarget(final SyncTarget syncT
117116
protected abstract CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget();
118117

119118
private CompletableFuture<SyncTarget> waitForPeerAndThenSetSyncTarget() {
120-
return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> findSyncTarget());
121-
}
122-
123-
private CompletableFuture<?> waitForNewPeer() {
124119
return ethContext
125120
.getScheduler()
126-
.timeout(WaitForPeerTask.create(ethContext, metricsSystem), Duration.ofSeconds(5));
121+
.scheduleFutureTask(
122+
() ->
123+
ethContext
124+
.getEthPeers()
125+
.waitForPeer((peer) -> true)
126+
.handle((ignored, ignored2) -> null)
127+
.thenCompose((r) -> findSyncTarget()),
128+
Duration.ofSeconds(5));
127129
}
128130

129131
private boolean isCancelled() {

0 commit comments

Comments
 (0)