Skip to content
This repository was archived by the owner on Jul 1, 2025. It is now read-only.

Commit 910ad30

Browse files
matktgaryschulte
authored andcommitted
add fallback for parallelization (#8084)
Signed-off-by: Karim Taam <[email protected]>
1 parent 92ac37f commit 910ad30

File tree

3 files changed

+130
-58
lines changed

3 files changed

+130
-58
lines changed

besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1816,10 +1816,8 @@ public BesuControllerBuilder setupControllerBuilder() {
18161816
if (DataStorageFormat.BONSAI.equals(getDataStorageConfiguration().getDataStorageFormat())) {
18171817
final DiffBasedSubStorageConfiguration subStorageConfiguration =
18181818
getDataStorageConfiguration().getDiffBasedSubStorageConfiguration();
1819-
if (subStorageConfiguration.getLimitTrieLogsEnabled()) {
1820-
besuControllerBuilder.isParallelTxProcessingEnabled(
1821-
subStorageConfiguration.getUnstable().isParallelTxProcessingEnabled());
1822-
}
1819+
besuControllerBuilder.isParallelTxProcessingEnabled(
1820+
subStorageConfiguration.getUnstable().isParallelTxProcessingEnabled());
18231821
}
18241822
return besuControllerBuilder;
18251823
}

ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.hyperledger.besu.ethereum.core.Transaction;
2929
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
3030
import org.hyperledger.besu.ethereum.core.Withdrawal;
31+
import org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor.PreprocessingFunction.NoPreprocessing;
3132
import org.hyperledger.besu.ethereum.mainnet.requests.ProcessRequestContext;
3233
import org.hyperledger.besu.ethereum.mainnet.requests.RequestProcessorCoordinator;
3334
import org.hyperledger.besu.ethereum.privacy.storage.PrivateMetadataUpdater;
@@ -99,6 +100,26 @@ public BlockProcessingResult processBlock(
99100
final List<BlockHeader> ommers,
100101
final Optional<List<Withdrawal>> maybeWithdrawals,
101102
final PrivateMetadataUpdater privateMetadataUpdater) {
103+
return processBlock(
104+
blockchain,
105+
worldState,
106+
blockHeader,
107+
transactions,
108+
ommers,
109+
maybeWithdrawals,
110+
privateMetadataUpdater,
111+
new NoPreprocessing());
112+
}
113+
114+
protected BlockProcessingResult processBlock(
115+
final Blockchain blockchain,
116+
final MutableWorldState worldState,
117+
final BlockHeader blockHeader,
118+
final List<Transaction> transactions,
119+
final List<BlockHeader> ommers,
120+
final Optional<List<Withdrawal>> maybeWithdrawals,
121+
final PrivateMetadataUpdater privateMetadataUpdater,
122+
final PreprocessingFunction preprocessingBlockFunction) {
102123
final List<TransactionReceipt> receipts = new ArrayList<>();
103124
long currentGasUsed = 0;
104125
long currentBlobGasUsed = 0;
@@ -125,7 +146,7 @@ public BlockProcessingResult processBlock(
125146
.orElse(Wei.ZERO);
126147

127148
final Optional<PreprocessingContext> preProcessingContext =
128-
runBlockPreProcessing(
149+
preprocessingBlockFunction.run(
129150
worldState,
130151
privateMetadataUpdater,
131152
blockHeader,
@@ -255,17 +276,6 @@ public BlockProcessingResult processBlock(
255276
parallelizedTxFound ? Optional.of(nbParallelTx) : Optional.empty());
256277
}
257278

258-
protected Optional<PreprocessingContext> runBlockPreProcessing(
259-
final MutableWorldState worldState,
260-
final PrivateMetadataUpdater privateMetadataUpdater,
261-
final BlockHeader blockHeader,
262-
final List<Transaction> transactions,
263-
final Address miningBeneficiary,
264-
final BlockHashLookup blockHashLookup,
265-
final Wei blobGasPrice) {
266-
return Optional.empty();
267-
}
268-
269279
protected TransactionProcessingResult getTransactionProcessingResult(
270280
final Optional<PreprocessingContext> preProcessingContext,
271281
final MutableWorldState worldState,
@@ -318,5 +328,30 @@ abstract boolean rewardCoinbase(
318328
final boolean skipZeroBlockRewards);
319329

320330
public interface PreprocessingContext {}
321-
;
331+
332+
public interface PreprocessingFunction {
333+
Optional<PreprocessingContext> run(
334+
final MutableWorldState worldState,
335+
final PrivateMetadataUpdater privateMetadataUpdater,
336+
final BlockHeader blockHeader,
337+
final List<Transaction> transactions,
338+
final Address miningBeneficiary,
339+
final BlockHashLookup blockHashLookup,
340+
final Wei blobGasPrice);
341+
342+
class NoPreprocessing implements PreprocessingFunction {
343+
344+
@Override
345+
public Optional<PreprocessingContext> run(
346+
final MutableWorldState worldState,
347+
final PrivateMetadataUpdater privateMetadataUpdater,
348+
final BlockHeader blockHeader,
349+
final List<Transaction> transactions,
350+
final Address miningBeneficiary,
351+
final BlockHashLookup blockHashLookup,
352+
final Wei blobGasPrice) {
353+
return Optional.empty();
354+
}
355+
}
356+
}
322357
}

ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/parallelization/MainnetParallelBlockProcessor.java

Lines changed: 80 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616

1717
import org.hyperledger.besu.datatypes.Address;
1818
import org.hyperledger.besu.datatypes.Wei;
19+
import org.hyperledger.besu.ethereum.BlockProcessingResult;
20+
import org.hyperledger.besu.ethereum.chain.Blockchain;
1921
import org.hyperledger.besu.ethereum.core.BlockHeader;
2022
import org.hyperledger.besu.ethereum.core.MutableWorldState;
2123
import org.hyperledger.besu.ethereum.core.Transaction;
24+
import org.hyperledger.besu.ethereum.core.Withdrawal;
25+
import org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor.PreprocessingFunction.NoPreprocessing;
2226
import org.hyperledger.besu.ethereum.mainnet.BlockProcessor;
2327
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockProcessor;
2428
import org.hyperledger.besu.ethereum.mainnet.MainnetTransactionProcessor;
@@ -37,8 +41,13 @@
3741
import java.util.List;
3842
import java.util.Optional;
3943

44+
import org.slf4j.Logger;
45+
import org.slf4j.LoggerFactory;
46+
4047
public class MainnetParallelBlockProcessor extends MainnetBlockProcessor {
4148

49+
private static final Logger LOG = LoggerFactory.getLogger(MainnetParallelBlockProcessor.class);
50+
4251
private final Optional<MetricsSystem> metricsSystem;
4352
private final Optional<Counter> confirmedParallelizedTransactionCounter;
4453
private final Optional<Counter> conflictingButCachedTransactionCounter;
@@ -78,34 +87,6 @@ public MainnetParallelBlockProcessor(
7887
"Counter for the number of conflicted transactions during block processing"));
7988
}
8089

81-
@Override
82-
protected Optional<PreprocessingContext> runBlockPreProcessing(
83-
final MutableWorldState worldState,
84-
final PrivateMetadataUpdater privateMetadataUpdater,
85-
final BlockHeader blockHeader,
86-
final List<Transaction> transactions,
87-
final Address miningBeneficiary,
88-
final BlockHashLookup blockHashLookup,
89-
final Wei blobGasPrice) {
90-
if ((worldState instanceof DiffBasedWorldState)) {
91-
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor =
92-
new ParallelizedConcurrentTransactionProcessor(transactionProcessor);
93-
// runAsyncBlock, if activated, facilitates the non-blocking parallel execution of
94-
// transactions in the background through an optimistic strategy.
95-
parallelizedConcurrentTransactionProcessor.runAsyncBlock(
96-
worldState,
97-
blockHeader,
98-
transactions,
99-
miningBeneficiary,
100-
blockHashLookup,
101-
blobGasPrice,
102-
privateMetadataUpdater);
103-
return Optional.of(
104-
new ParallelizedPreProcessingContext(parallelizedConcurrentTransactionProcessor));
105-
}
106-
return Optional.empty();
107-
}
108-
10990
@Override
11091
protected TransactionProcessingResult getTransactionProcessingResult(
11192
final Optional<PreprocessingContext> preProcessingContext,
@@ -126,7 +107,7 @@ protected TransactionProcessingResult getTransactionProcessingResult(
126107
(ParallelizedPreProcessingContext) preProcessingContext.get();
127108
transactionProcessingResult =
128109
parallelizedPreProcessingContext
129-
.getParallelizedConcurrentTransactionProcessor()
110+
.parallelizedConcurrentTransactionProcessor()
130111
.applyParallelizedTransactionResult(
131112
worldState,
132113
miningBeneficiary,
@@ -154,21 +135,48 @@ protected TransactionProcessingResult getTransactionProcessingResult(
154135
}
155136
}
156137

157-
static class ParallelizedPreProcessingContext implements PreprocessingContext {
158-
final ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor;
159-
160-
public ParallelizedPreProcessingContext(
161-
final ParallelizedConcurrentTransactionProcessor
162-
parallelizedConcurrentTransactionProcessor) {
163-
this.parallelizedConcurrentTransactionProcessor = parallelizedConcurrentTransactionProcessor;
164-
}
165-
166-
public ParallelizedConcurrentTransactionProcessor
167-
getParallelizedConcurrentTransactionProcessor() {
168-
return parallelizedConcurrentTransactionProcessor;
138+
@Override
139+
public BlockProcessingResult processBlock(
140+
final Blockchain blockchain,
141+
final MutableWorldState worldState,
142+
final BlockHeader blockHeader,
143+
final List<Transaction> transactions,
144+
final List<BlockHeader> ommers,
145+
final Optional<List<Withdrawal>> maybeWithdrawals,
146+
final PrivateMetadataUpdater privateMetadataUpdater) {
147+
final BlockProcessingResult blockProcessingResult =
148+
super.processBlock(
149+
blockchain,
150+
worldState,
151+
blockHeader,
152+
transactions,
153+
ommers,
154+
maybeWithdrawals,
155+
privateMetadataUpdater,
156+
new ParallelTransactionPreprocessing());
157+
if (blockProcessingResult.isFailed()) {
158+
// Fallback to non-parallel processing if there is a block processing exception .
159+
LOG.info(
160+
"Parallel transaction processing failure. Falling back to non-parallel processing for block #{} ({})",
161+
blockHeader.getNumber(),
162+
blockHeader.getBlockHash());
163+
return super.processBlock(
164+
blockchain,
165+
worldState,
166+
blockHeader,
167+
transactions,
168+
ommers,
169+
maybeWithdrawals,
170+
privateMetadataUpdater,
171+
new NoPreprocessing());
169172
}
173+
return blockProcessingResult;
170174
}
171175

176+
record ParallelizedPreProcessingContext(
177+
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor)
178+
implements PreprocessingContext {}
179+
172180
public static class ParallelBlockProcessorBuilder
173181
implements ProtocolSpecBuilder.BlockProcessorBuilder {
174182

@@ -196,4 +204,35 @@ public BlockProcessor apply(
196204
metricsSystem);
197205
}
198206
}
207+
208+
class ParallelTransactionPreprocessing implements PreprocessingFunction {
209+
210+
@Override
211+
public Optional<PreprocessingContext> run(
212+
final MutableWorldState worldState,
213+
final PrivateMetadataUpdater privateMetadataUpdater,
214+
final BlockHeader blockHeader,
215+
final List<Transaction> transactions,
216+
final Address miningBeneficiary,
217+
final BlockHashLookup blockHashLookup,
218+
final Wei blobGasPrice) {
219+
if ((worldState instanceof DiffBasedWorldState)) {
220+
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor =
221+
new ParallelizedConcurrentTransactionProcessor(transactionProcessor);
222+
// When enabled, runAsyncBlock performs non-conflicting parallel execution of transactions
223+
// in the background using an optimistic approach.
224+
parallelizedConcurrentTransactionProcessor.runAsyncBlock(
225+
worldState,
226+
blockHeader,
227+
transactions,
228+
miningBeneficiary,
229+
blockHashLookup,
230+
blobGasPrice,
231+
privateMetadataUpdater);
232+
return Optional.of(
233+
new ParallelizedPreProcessingContext(parallelizedConcurrentTransactionProcessor));
234+
}
235+
return Optional.empty();
236+
}
237+
}
199238
}

0 commit comments

Comments
 (0)