Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cfg/xrpld-example.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,13 @@
#
# Configures the number of threads for performing nodestore prefetching.
#
# [path_workers]
#
# Configures the maximum number of concurrently running jobs of type
# jtUPDATE_PF in the JobQueue. This impacts path update and full order book
# update throughput. Default: 2.
#
# Maximum value is 3/4 of [workers] (with a minimum of 2).
#
#
Comment on lines +823 to 825
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The example config says the maximum is “3/4 of [workers]”, but the code enforces the limit against the effective job-queue worker count (which is auto-derived when [workers] isn’t explicitly set). To avoid misleading operators, consider documenting that the cap is based on the effective worker count (explicit [workers] or the auto-selected default).

Suggested change
# Maximum value is 3/4 of [workers] (with a minimum of 2).
#
#
# Maximum value is 3/4 of the effective JobQueue worker count (explicit
# [workers] or the auto-selected default), with a minimum of 2.
#

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think this is more confusing.

# [network_id]
Expand Down
11 changes: 10 additions & 1 deletion include/xrpl/core/JobQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class JobQueue : private Workers::Callback

JobQueue(
int threadCount,
int updatePathsJobLimit,
beast::insight::Collector::ptr const& collector,
beast::Journal journal,
Logs& logs,
Expand Down Expand Up @@ -181,6 +182,12 @@ class JobQueue : private Workers::Callback
int
getJobCountGE(JobType t) const;

int
getUpdatePathsJobLimit() const
{
return updatePathsJobLimit_;
}

/** Return a scoped LoadEvent.
*/
std::unique_ptr<LoadEvent>
Expand Down Expand Up @@ -238,6 +245,8 @@ class JobQueue : private Workers::Callback
// The number of suspended coroutines
int nSuspend_ = 0;

int updatePathsJobLimit_;

Workers m_workers;

// Statistics tracking
Expand Down Expand Up @@ -316,7 +325,7 @@ class JobQueue : private Workers::Callback
// Returns the limit of running jobs for the given job type.
// For jobs with no limit, we return the largest int. Hopefully that
// will be enough.
static int
int
getJobLimit(JobType type);
};

Expand Down
5 changes: 5 additions & 0 deletions src/libxrpl/core/detail/JobQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace xrpl {

JobQueue::JobQueue(
int threadCount,
int updatePathsJobLimit,
beast::insight::Collector::ptr const& collector,
beast::Journal journal,
Logs& logs,
Expand All @@ -16,6 +17,7 @@ JobQueue::JobQueue(
, m_lastJob(0)
, m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
, m_processCount(0)
, updatePathsJobLimit_(updatePathsJobLimit > 0 ? updatePathsJobLimit : 1)
, m_workers(*this, &perfLog, "JobQueue", threadCount)
, perfLog_(perfLog)
, m_collector(collector)
Expand Down Expand Up @@ -381,6 +383,9 @@ JobQueue::processTask(int instance)
int
JobQueue::getJobLimit(JobType type)
{
if (type == jtUPDATE_PF)
return updatePathsJobLimit_;

JobTypeInfo const& j(JobTypes::instance().get(type));
XRPL_ASSERT(j.type() != jtINVALID, "xrpl::JobQueue::getJobLimit : valid job type");

Expand Down
9 changes: 8 additions & 1 deletion src/xrpld/app/ledger/detail/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,14 @@ LedgerMaster::newOrderBookDB()
bool
LedgerMaster::newPFWork(char const* name, std::unique_lock<std::recursive_mutex>&)
{
if (!app_.isStopping() && mPathFindThread < 2 && app_.getPathRequestManager().requestsPending())
auto const maxPathFindThreads = std::max(
1,
std::min(
app_.config().PATH_WORKERS,
app_.getJobQueue().getUpdatePathsJobLimit()));

if (!app_.isStopping() && mPathFindThread < maxPathFindThreads &&
app_.getPathRequestManager().requestsPending())
{
JLOG(m_journal.debug()) << "newPFWork: Creating job. path find threads: "
<< mPathFindThread;
Expand Down
59 changes: 32 additions & 27 deletions src/xrpld/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,36 @@ class ApplicationImp : public Application, public BasicApp
#endif
}

static int
jobQueueThreads(std::unique_ptr<Config> const& config)
{
if (config->standalone() && !config->FORCE_MULTI_THREAD)
return 1;

if (config->WORKERS)
return config->WORKERS;

auto count = static_cast<int>(std::thread::hardware_concurrency());

// Be more aggressive about the number of threads to use
// for the job queue if the server is configured as
// "large" or "huge" if there are enough cores.
if (config->NODE_SIZE >= 4 && count >= 16)
count = 6 + std::min(count, 8);
else if (config->NODE_SIZE >= 3 && count >= 8)
count = 4 + std::min(count, 6);
else
count = 2 + std::min(count, 4);

return count;
}

static int
maxUpdatePfLimit(std::unique_ptr<Config> const& config)
{
return std::max(2, (jobQueueThreads(config) * 3) / 4);
}

//--------------------------------------------------------------------------

ApplicationImp(
Expand Down Expand Up @@ -266,33 +296,8 @@ class ApplicationImp : public Application, public BasicApp

, m_jobQueue(
std::make_unique<JobQueue>(
[](std::unique_ptr<Config> const& config) {
if (config->standalone() && !config->FORCE_MULTI_THREAD)
return 1;

if (config->WORKERS)
return config->WORKERS;

auto count = static_cast<int>(std::thread::hardware_concurrency());

// Be more aggressive about the number of threads to use
// for the job queue if the server is configured as
// "large" or "huge" if there are enough cores.
if (config->NODE_SIZE >= 4 && count >= 16)
{
count = 6 + std::min(count, 8);
}
else if (config->NODE_SIZE >= 3 && count >= 8)
{
count = 4 + std::min(count, 6);
}
else
{
count = 2 + std::min(count, 4);
}

return count;
}(config_),
jobQueueThreads(config_),
std::min(config_->PATH_WORKERS, maxUpdatePfLimit(config_)),
m_collectorManager->group("jobq"),
logs_->journal("JobQueue"),
*logs_,
Expand Down
1 change: 1 addition & 0 deletions src/xrpld/core/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class Config : public BasicConfig
int PATH_SEARCH = 2;
int PATH_SEARCH_FAST = 2;
int PATH_SEARCH_MAX = 3;
int PATH_WORKERS = 2;

// Validation
std::optional<std::size_t> VALIDATION_QUORUM; // validations to consider ledger authoritative
Expand Down
1 change: 1 addition & 0 deletions src/xrpld/core/ConfigSections.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct ConfigSection
#define SECTION_PATH_SEARCH "path_search"
#define SECTION_PATH_SEARCH_FAST "path_search_fast"
#define SECTION_PATH_SEARCH_MAX "path_search_max"
#define SECTION_PATH_WORKERS "path_workers"
#define SECTION_PEER_PRIVATE "peer_private"
#define SECTION_PEERS_MAX "peers_max"
#define SECTION_PEERS_IN_MAX "peers_in_max"
Expand Down
34 changes: 34 additions & 0 deletions src/xrpld/core/detail/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,14 @@ Config::loadFromString(std::string const& fileContents)
PATH_SEARCH_FAST = beast::lexicalCastThrow<int>(strTemp);
if (getSingleSection(secConfig, SECTION_PATH_SEARCH_MAX, strTemp, j_))
PATH_SEARCH_MAX = beast::lexicalCastThrow<int>(strTemp);
if (getSingleSection(secConfig, SECTION_PATH_WORKERS, strTemp, j_))
{
PATH_WORKERS = beast::lexicalCastThrow<int>(strTemp);

if (PATH_WORKERS < 2)
Throw<std::runtime_error>("Invalid " SECTION_PATH_WORKERS
": must be greater than or equal to 2.");
}

if (getSingleSection(secConfig, SECTION_DEBUG_LOGFILE, strTemp, j_))
DEBUG_LOGFILE = strTemp;
Expand All @@ -731,6 +739,32 @@ Config::loadFromString(std::string const& fileContents)
}
}

auto const effectiveWorkers = [&]() {
if (standalone() && !FORCE_MULTI_THREAD)
return 1;

if (WORKERS)
return WORKERS;

auto count = static_cast<int>(std::thread::hardware_concurrency());

if (NODE_SIZE >= 4 && count >= 16)
count = 6 + std::min(count, 8);
else if (NODE_SIZE >= 3 && count >= 8)
count = 4 + std::min(count, 6);
else
count = 2 + std::min(count, 4);

return count;
}();

auto const maxUpdatePfLimit = std::max(2, (effectiveWorkers * 3) / 4);
if (PATH_WORKERS > maxUpdatePfLimit)
Throw<std::runtime_error>(
"Invalid " SECTION_PATH_WORKERS
": must be less than or equal to 3/4 of effective job queue "
"workers (minimum maximum of 2).");

if (getSingleSection(secConfig, SECTION_IO_WORKERS, strTemp, j_))
{
IO_WORKERS = beast::lexicalCastThrow<int>(strTemp);
Expand Down
Loading