Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
395 changes: 392 additions & 3 deletions src/herder/ParallelTxSetBuilder.cpp

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions src/herder/ParallelTxSetBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "herder/TxSetFrame.h"
#include "ledger/NetworkConfig.h"
#include "main/Config.h"
#include "util/BitSet.h"

namespace stellar
{
Expand All @@ -26,4 +27,24 @@ TxStageFrameList buildSurgePricedParallelSorobanPhase(
std::shared_ptr<SurgePricingLaneConfig> laneConfig,
std::vector<bool>& hadTxNotFittingLane, uint32_t ledgerVersion);

#ifdef BUILD_TESTS
// Test function that builds a simple TxStageFrameList with fixed parallelism
// and target number of stages, ignoring resource limits and surge pricing.
// - txFrames: input transactions to partition
// - clustersPerStage: fixed number of clusters per stage
// - targetStageCount: target number of stages (actual may be more if txs spill
// over) Assigns transactions to non-conflicting clusters sequentially,
// buffering any that don't fit due to conflicts for the next stage.
TxStageFrameList buildSimpleParallelTxStages(TxFrameList const& txFrames,
uint32_t clustersPerStage,
uint32_t targetStageCount);

// Test-only export of the core algorithm that works with indices
// Returns stages[stage][cluster][tx_index]
std::vector<std::vector<std::vector<size_t>>>
testBuildSimpleParallelStagesFromIndices(
std::vector<BitSet> const& conflictSets, uint32_t clustersPerStage,
uint32_t targetStageCount);
#endif

} // namespace stellar
135 changes: 75 additions & 60 deletions src/herder/TxSetFrame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,65 +475,6 @@ computeLaneBaseFee(TxSetPhase phase, LedgerHeader const& ledgerHeader,
return laneBaseFee;
}

std::shared_ptr<SurgePricingLaneConfig>
createSurgePricingLangeConfig(TxSetPhase phase, Application& app)
{
ZoneScoped;
releaseAssert(threadIsMain());
releaseAssert(!app.getLedgerManager().isApplying());

auto const& lclHeader =
app.getLedgerManager().getLastClosedLedgerHeader().header;
std::vector<bool> hadTxNotFittingLane;
std::shared_ptr<SurgePricingLaneConfig> surgePricingLaneConfig;
if (phase == TxSetPhase::CLASSIC)
{
auto maxOps = Resource(
{static_cast<uint32_t>(
app.getLedgerManager().getLastMaxTxSetSizeOps()),
static_cast<uint32_t>(app.getConfig().getClassicByteAllowance())});
std::optional<Resource> dexOpsLimit;
if (app.getConfig().MAX_DEX_TX_OPERATIONS_IN_TX_SET)
{
// DEX operations limit implies that DEX transactions should
// compete with each other in in a separate fee lane, which
// is only possible with generalized tx set.
dexOpsLimit =
Resource({*app.getConfig().MAX_DEX_TX_OPERATIONS_IN_TX_SET,
MAX_CLASSIC_BYTE_ALLOWANCE});
}

surgePricingLaneConfig =
std::make_shared<DexLimitingLaneConfig>(maxOps, dexOpsLimit);
}
else
{
releaseAssert(phase == TxSetPhase::SOROBAN);

auto limits = app.getLedgerManager().maxLedgerResources(
/* isSoroban */ true);
// When building Soroban tx sets with parallel execution support,
// instructions are accounted for by the build logic, not by the surge
// pricing config, so we need to relax the instruction limit in surge
// pricing logic.
if (protocolVersionStartsFrom(lclHeader.ledgerVersion,
PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION))
{
limits.setVal(Resource::Type::INSTRUCTIONS,
std::numeric_limits<int64_t>::max());
}

auto byteLimit = std::min(
static_cast<int64_t>(app.getConfig().getSorobanByteAllowance()),
limits.getVal(Resource::Type::TX_BYTE_SIZE));
limits.setVal(Resource::Type::TX_BYTE_SIZE, byteLimit);

surgePricingLaneConfig =
std::make_shared<SorobanGenericLaneConfig>(limits);
}
return surgePricingLaneConfig;
}

TxFrameList
buildSurgePricedSequentialPhase(
TxFrameList const& txs,
Expand All @@ -556,7 +497,7 @@ applySurgePricing(TxSetPhase phase, TxFrameList const& txs, Application& app
)
{
ZoneScoped;
auto surgePricingLaneConfig = createSurgePricingLangeConfig(phase, app);
auto surgePricingLaneConfig = createSurgePricingLaneConfig(phase, app);
std::vector<bool> hadTxNotFittingLane;
uint32_t ledgerVersion =
app.getLedgerManager().getLastClosedLedgerHeader().header.ledgerVersion;
Expand Down Expand Up @@ -752,6 +693,74 @@ checkFeeMap(InclusionFeeMap const& feeMap, LedgerHeader const& lclHeader)

} // namespace

std::shared_ptr<SurgePricingLaneConfig>
createSurgePricingLaneConfig(TxSetPhase phase, Application& app
#ifdef BUILD_TESTS
,
bool forceParallel
#endif
)
{
ZoneScoped;
releaseAssert(threadIsMain());
releaseAssert(!app.getLedgerManager().isApplying());

auto const& lclHeader =
app.getLedgerManager().getLastClosedLedgerHeader().header;
std::vector<bool> hadTxNotFittingLane;
std::shared_ptr<SurgePricingLaneConfig> surgePricingLaneConfig;
if (phase == TxSetPhase::CLASSIC)
{
auto maxOps = Resource(
{static_cast<uint32_t>(
app.getLedgerManager().getLastMaxTxSetSizeOps()),
static_cast<uint32_t>(app.getConfig().getClassicByteAllowance())});
std::optional<Resource> dexOpsLimit;
if (app.getConfig().MAX_DEX_TX_OPERATIONS_IN_TX_SET)
{
// DEX operations limit implies that DEX transactions should
// compete with each other in in a separate fee lane, which
// is only possible with generalized tx set.
dexOpsLimit =
Resource({*app.getConfig().MAX_DEX_TX_OPERATIONS_IN_TX_SET,
MAX_CLASSIC_BYTE_ALLOWANCE});
}

surgePricingLaneConfig =
std::make_shared<DexLimitingLaneConfig>(maxOps, dexOpsLimit);
}
else
{
releaseAssert(phase == TxSetPhase::SOROBAN);

auto limits = app.getLedgerManager().maxLedgerResources(
/* isSoroban */ true);
// When building Soroban tx sets with parallel execution support,
// instructions are accounted for by the build logic, not by the surge
// pricing config, so we need to relax the instruction limit in surge
// pricing logic.
if (protocolVersionStartsFrom(lclHeader.ledgerVersion,
PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION)
#ifdef BUILD_TESTS
|| forceParallel
#endif
)
{
limits.setVal(Resource::Type::INSTRUCTIONS,
std::numeric_limits<int64_t>::max());
}

auto byteLimit = std::min(
static_cast<int64_t>(app.getConfig().getSorobanByteAllowance()),
limits.getVal(Resource::Type::TX_BYTE_SIZE));
limits.setVal(Resource::Type::TX_BYTE_SIZE, byteLimit);

surgePricingLaneConfig =
std::make_shared<SorobanGenericLaneConfig>(limits);
}
return surgePricingLaneConfig;
}

TxSetXDRFrame::TxSetXDRFrame(TransactionSet const& xdrTxSet)
: mXDRTxSet(xdrTxSet)
, mEncodedSize(xdr::xdr_argpack_size(xdrTxSet))
Expand Down Expand Up @@ -1652,6 +1661,12 @@ TxSetPhaseFrame::isParallel() const
return mIsParallel;
}

bool
TxSetPhaseFrame::isSoroban() const
{
return mPhase == TxSetPhase::SOROBAN;
}

TxStageFrameList const&
TxSetPhaseFrame::getParallelStages() const
{
Expand Down
10 changes: 10 additions & 0 deletions src/herder/TxSetFrame.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ makeTxSetFromTransactions(
txtest::ParallelSorobanOrder const& parallelSorobanOrder = {});
#endif

std::shared_ptr<SurgePricingLaneConfig>
createSurgePricingLaneConfig(TxSetPhase phase, Application& app
#ifdef BUILD_TESTS
,
bool forceParallel = false
#endif
);

// `TxSetFrame` is a wrapper around `TransactionSet` or
// `GeneralizedTransactionSet` XDR.
//
Expand Down Expand Up @@ -250,6 +258,8 @@ class TxSetPhaseFrame
// PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION protocol
bool isParallel() const;

bool isSoroban() const;

// Returns the parallel stages of this phase.
//
// This may only be called when `isParallel()` is true.
Expand Down
Loading