Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cfg/xrpld-example.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,13 @@
#
# Configures the number of threads for performing nodestore prefetching.
#
# [path_workers]
#
# Configures the maximum number of concurrently running jobs of type
# jtUPDATE_PF in the JobQueue. This impacts path update and full order book
# update throughput. Default: 2.
#
# Maximum value is 3/4 of [workers] (with a minimum of 2).
#
#
Comment on lines +823 to 825
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The example config says the maximum is “3/4 of [workers]”, but the code enforces the limit against the effective job-queue worker count (which is auto-derived when [workers] isn’t explicitly set). To avoid misleading operators, consider documenting that the cap is based on the effective worker count (explicit [workers] or the auto-selected default).

Suggested change
# Maximum value is 3/4 of [workers] (with a minimum of 2).
#
#
# Maximum value is 3/4 of the effective JobQueue worker count (explicit
# [workers] or the auto-selected default), with a minimum of 2.
#

Copilot uses AI. Check for mistakes.
# [network_id]
Expand Down
11 changes: 10 additions & 1 deletion include/xrpl/core/JobQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class JobQueue : private Workers::Callback

JobQueue(
int threadCount,
int updatePathsJobLimit,
beast::insight::Collector::ptr const& collector,
beast::Journal journal,
Logs& logs,
Expand Down Expand Up @@ -181,6 +182,12 @@ class JobQueue : private Workers::Callback
int
getJobCountGE(JobType t) const;

int
getUpdatePathsJobLimit() const
{
return updatePathsJobLimit_;
}

/** Return a scoped LoadEvent.
*/
std::unique_ptr<LoadEvent>
Expand Down Expand Up @@ -238,6 +245,8 @@ class JobQueue : private Workers::Callback
// The number of suspended coroutines
int nSuspend_ = 0;

int updatePathsJobLimit_;

Workers m_workers;

// Statistics tracking
Expand Down Expand Up @@ -316,7 +325,7 @@ class JobQueue : private Workers::Callback
// Returns the limit of running jobs for the given job type.
// For jobs with no limit, we return the largest int. Hopefully that
// will be enough.
static int
int
getJobLimit(JobType type);
};

Expand Down
5 changes: 5 additions & 0 deletions src/libxrpl/core/detail/JobQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace xrpl {

JobQueue::JobQueue(
int threadCount,
int updatePathsJobLimit,
beast::insight::Collector::ptr const& collector,
beast::Journal journal,
Logs& logs,
Expand All @@ -16,6 +17,7 @@ JobQueue::JobQueue(
, m_lastJob(0)
, m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
, m_processCount(0)
, updatePathsJobLimit_(updatePathsJobLimit > 0 ? updatePathsJobLimit : 1)
, m_workers(*this, &perfLog, "JobQueue", threadCount)
, perfLog_(perfLog)
, m_collector(collector)
Expand Down Expand Up @@ -381,6 +383,9 @@ JobQueue::processTask(int instance)
int
JobQueue::getJobLimit(JobType type)
{
if (type == jtUPDATE_PF)
return updatePathsJobLimit_;

JobTypeInfo const& j(JobTypes::instance().get(type));
XRPL_ASSERT(j.type() != jtINVALID, "xrpl::JobQueue::getJobLimit : valid job type");

Expand Down
9 changes: 8 additions & 1 deletion src/xrpld/app/ledger/detail/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,14 @@ LedgerMaster::newOrderBookDB()
bool
LedgerMaster::newPFWork(char const* name, std::unique_lock<std::recursive_mutex>&)
{
if (!app_.isStopping() && mPathFindThread < 2 && app_.getPathRequestManager().requestsPending())
auto const maxPathFindThreads = std::max(
1,
std::min(
app_.config().PATH_WORKERS,
app_.getJobQueue().getUpdatePathsJobLimit()));

if (!app_.isStopping() && mPathFindThread < maxPathFindThreads &&
app_.getPathRequestManager().requestsPending())
{
JLOG(m_journal.debug()) << "newPFWork: Creating job. path find threads: "
<< mPathFindThread;
Expand Down
59 changes: 32 additions & 27 deletions src/xrpld/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,36 @@ class ApplicationImp : public Application, public BasicApp
#endif
}

static int
jobQueueThreads(std::unique_ptr<Config> const& config)
{
if (config->standalone() && !config->FORCE_MULTI_THREAD)
return 1;

if (config->WORKERS)
return config->WORKERS;

auto count = static_cast<int>(std::thread::hardware_concurrency());

// Be more aggressive about the number of threads to use
// for the job queue if the server is configured as
// "large" or "huge" if there are enough cores.
if (config->NODE_SIZE >= 4 && count >= 16)
count = 6 + std::min(count, 8);
else if (config->NODE_SIZE >= 3 && count >= 8)
count = 4 + std::min(count, 6);
else
count = 2 + std::min(count, 4);

return count;
}

static int
maxUpdatePfLimit(std::unique_ptr<Config> const& config)
{
return std::max(2, (jobQueueThreads(config) * 3) / 4);
}

//--------------------------------------------------------------------------

ApplicationImp(
Expand Down Expand Up @@ -266,33 +296,8 @@ class ApplicationImp : public Application, public BasicApp

, m_jobQueue(
std::make_unique<JobQueue>(
[](std::unique_ptr<Config> const& config) {
if (config->standalone() && !config->FORCE_MULTI_THREAD)
return 1;

if (config->WORKERS)
return config->WORKERS;

auto count = static_cast<int>(std::thread::hardware_concurrency());

// Be more aggressive about the number of threads to use
// for the job queue if the server is configured as
// "large" or "huge" if there are enough cores.
if (config->NODE_SIZE >= 4 && count >= 16)
{
count = 6 + std::min(count, 8);
}
else if (config->NODE_SIZE >= 3 && count >= 8)
{
count = 4 + std::min(count, 6);
}
else
{
count = 2 + std::min(count, 4);
}

return count;
}(config_),
jobQueueThreads(config_),
std::min(config_->PATH_WORKERS, maxUpdatePfLimit(config_)),
m_collectorManager->group("jobq"),
logs_->journal("JobQueue"),
*logs_,
Expand Down
1 change: 1 addition & 0 deletions src/xrpld/core/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class Config : public BasicConfig
int PATH_SEARCH = 2;
int PATH_SEARCH_FAST = 2;
int PATH_SEARCH_MAX = 3;
int PATH_WORKERS = 2;

// Validation
std::optional<std::size_t> VALIDATION_QUORUM; // validations to consider ledger authoritative
Expand Down
1 change: 1 addition & 0 deletions src/xrpld/core/ConfigSections.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct ConfigSection
#define SECTION_PATH_SEARCH "path_search"
#define SECTION_PATH_SEARCH_FAST "path_search_fast"
#define SECTION_PATH_SEARCH_MAX "path_search_max"
#define SECTION_PATH_WORKERS "path_workers"
#define SECTION_PEER_PRIVATE "peer_private"
#define SECTION_PEERS_MAX "peers_max"
#define SECTION_PEERS_IN_MAX "peers_in_max"
Expand Down
34 changes: 34 additions & 0 deletions src/xrpld/core/detail/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,14 @@ Config::loadFromString(std::string const& fileContents)
PATH_SEARCH_FAST = beast::lexicalCastThrow<int>(strTemp);
if (getSingleSection(secConfig, SECTION_PATH_SEARCH_MAX, strTemp, j_))
PATH_SEARCH_MAX = beast::lexicalCastThrow<int>(strTemp);
if (getSingleSection(secConfig, SECTION_PATH_WORKERS, strTemp, j_))
{
PATH_WORKERS = beast::lexicalCastThrow<int>(strTemp);

if (PATH_WORKERS < 2)
Throw<std::runtime_error>("Invalid " SECTION_PATH_WORKERS
": must be greater than or equal to 2.");
}
Comment on lines +708 to +715
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New config parsing/validation for [path_workers] is introduced here (min 2, max 3/4 of effective workers), but there’s no corresponding unit coverage in the existing config tests. Please add tests that (a) accept valid values and (b) reject values <2 and >max, so future changes don’t silently break config enforcement.

Copilot uses AI. Check for mistakes.

if (getSingleSection(secConfig, SECTION_DEBUG_LOGFILE, strTemp, j_))
DEBUG_LOGFILE = strTemp;
Expand All @@ -731,6 +739,32 @@ Config::loadFromString(std::string const& fileContents)
}
}

auto const effectiveWorkers = [&]() {
if (standalone() && !FORCE_MULTI_THREAD)
return 1;

if (WORKERS)
return WORKERS;

auto count = static_cast<int>(std::thread::hardware_concurrency());

if (NODE_SIZE >= 4 && count >= 16)
count = 6 + std::min(count, 8);
else if (NODE_SIZE >= 3 && count >= 8)
count = 4 + std::min(count, 6);
else
count = 2 + std::min(count, 4);

return count;
}();
Comment on lines +742 to +759
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The job-queue thread heuristic is duplicated here and in ApplicationImp::jobQueueThreads() (same logic using standalone/FORCE_MULTI_THREAD/WORKERS/NODE_SIZE/hardware_concurrency). Keeping two copies risks future divergence and inconsistent validation vs runtime behavior. Consider factoring this into a single shared helper (e.g., on Config or a small utility) and reusing it in both places.

Copilot uses AI. Check for mistakes.

auto const maxUpdatePfLimit = std::max(2, (effectiveWorkers * 3) / 4);
if (PATH_WORKERS > maxUpdatePfLimit)
Throw<std::runtime_error>(
"Invalid " SECTION_PATH_WORKERS
": must be less than or equal to 3/4 of effective job queue "
"workers (minimum maximum of 2).");
Comment on lines +763 to +766
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The path_workers upper-bound error message doesn’t include the computed effectiveWorkers / maximum allowed value, which makes it harder for operators to correct their config. Consider including the configured PATH_WORKERS and the computed maximum in the exception text.

Suggested change
Throw<std::runtime_error>(
"Invalid " SECTION_PATH_WORKERS
": must be less than or equal to 3/4 of effective job queue "
"workers (minimum maximum of 2).");
Throw<std::runtime_error>(boost::str(boost::format(
"Invalid %1%: configured value %2% exceeds maximum %3% "
"(3/4 of effective job queue workers = %4%, minimum maximum of 2).") %
SECTION_PATH_WORKERS % PATH_WORKERS % maxUpdatePfLimit % effectiveWorkers));

Copilot uses AI. Check for mistakes.

if (getSingleSection(secConfig, SECTION_IO_WORKERS, strTemp, j_))
{
IO_WORKERS = beast::lexicalCastThrow<int>(strTemp);
Expand Down
Loading