diff --git a/cfg/xrpld-example.cfg b/cfg/xrpld-example.cfg index 995d4e65ffc..339ce349c6a 100644 --- a/cfg/xrpld-example.cfg +++ b/cfg/xrpld-example.cfg @@ -814,6 +814,13 @@ # # Configures the number of threads for performing nodestore prefetching. # +# [path_workers] +# +# Configures the maximum number of concurrently running jobs of type +# jtUPDATE_PF in the JobQueue. This impacts path update and full order book +# update throughput. Default: 2. +# +# Maximum value is 3/4 of [workers] (with a minimum of 2). # # # [network_id] diff --git a/include/xrpl/core/JobQueue.h b/include/xrpl/core/JobQueue.h index 583e8bc26a3..eb198f39a3f 100644 --- a/include/xrpl/core/JobQueue.h +++ b/include/xrpl/core/JobQueue.h @@ -124,6 +124,7 @@ class JobQueue : private Workers::Callback JobQueue( int threadCount, + int updatePathsJobLimit, beast::insight::Collector::ptr const& collector, beast::Journal journal, Logs& logs, @@ -181,6 +182,12 @@ class JobQueue : private Workers::Callback int getJobCountGE(JobType t) const; + int + getUpdatePathsJobLimit() const + { + return updatePathsJobLimit_; + } + /** Return a scoped LoadEvent. */ std::unique_ptr @@ -238,6 +245,8 @@ class JobQueue : private Workers::Callback // The number of suspended coroutines int nSuspend_ = 0; + int updatePathsJobLimit_; + Workers m_workers; // Statistics tracking @@ -316,7 +325,7 @@ class JobQueue : private Workers::Callback // Returns the limit of running jobs for the given job type. // For jobs with no limit, we return the largest int. Hopefully that // will be enough. - static int + int getJobLimit(JobType type); }; diff --git a/src/libxrpl/core/detail/JobQueue.cpp b/src/libxrpl/core/detail/JobQueue.cpp index 2fe5a5b0fd9..c45256e77a9 100644 --- a/src/libxrpl/core/detail/JobQueue.cpp +++ b/src/libxrpl/core/detail/JobQueue.cpp @@ -8,6 +8,7 @@ namespace xrpl { JobQueue::JobQueue( int threadCount, + int updatePathsJobLimit, beast::insight::Collector::ptr const& collector, beast::Journal journal, Logs& logs, @@ -16,6 +17,7 @@ JobQueue::JobQueue( , m_lastJob(0) , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs) , m_processCount(0) + , updatePathsJobLimit_(updatePathsJobLimit > 0 ? updatePathsJobLimit : 1) , m_workers(*this, &perfLog, "JobQueue", threadCount) , perfLog_(perfLog) , m_collector(collector) @@ -381,6 +383,9 @@ JobQueue::processTask(int instance) int JobQueue::getJobLimit(JobType type) { + if (type == jtUPDATE_PF) + return updatePathsJobLimit_; + JobTypeInfo const& j(JobTypes::instance().get(type)); XRPL_ASSERT(j.type() != jtINVALID, "xrpl::JobQueue::getJobLimit : valid job type"); diff --git a/src/test/core/Config_test.cpp b/src/test/core/Config_test.cpp index edcb67b767f..5f2930e431b 100644 --- a/src/test/core/Config_test.cpp +++ b/src/test/core/Config_test.cpp @@ -1537,6 +1537,63 @@ r.ripple.com:51235 BEAST_EXPECT(!testDiverged("901")); } + void + testPathWorkers() + { + testcase("path_workers validation"); + + // Helper to build config string + auto makeConfig = [](int pathWorkers) { + return std::string("[path_workers]\n") + std::to_string(pathWorkers) + "\n"; + }; + + // Compute the max allowed value for path_workers + int effectiveWorkers = xrpl::Config::computeEffectiveWorkers(false, false, 0, 2); // NODE_SIZE=2 (medium) + int maxAllowed = std::max(2, (effectiveWorkers * 3) / 4); + + // Accept valid value (min) + { + Config c; + c.loadFromString(makeConfig(2)); + BEAST_EXPECT(c.PATH_WORKERS == 2); + } + // Accept valid value (max) + { + Config c; + c.loadFromString(makeConfig(maxAllowed)); + BEAST_EXPECT(c.PATH_WORKERS == maxAllowed); + } + // Accept valid value (mid) + if (maxAllowed > 2) { + int mid = (2 + maxAllowed) / 2; + Config c; + c.loadFromString(makeConfig(mid)); + BEAST_EXPECT(c.PATH_WORKERS == mid); + } + // Reject value < 2 + { + Config c; + bool threw = false; + try { + c.loadFromString(makeConfig(1)); + } catch (std::runtime_error& e) { + threw = true; + } + BEAST_EXPECT(threw); + } + // Reject value > max + { + Config c; + bool threw = false; + try { + c.loadFromString(makeConfig(maxAllowed + 1)); + } catch (std::runtime_error& e) { + threw = true; + } + BEAST_EXPECT(threw); + } + } + void run() override { @@ -1556,6 +1613,7 @@ r.ripple.com:51235 testAmendment(); testOverlay(); testNetworkID(); + testPathWorkers(); } }; diff --git a/src/xrpld/app/ledger/detail/LedgerMaster.cpp b/src/xrpld/app/ledger/detail/LedgerMaster.cpp index 4d90cbddcd4..e8a1388025f 100644 --- a/src/xrpld/app/ledger/detail/LedgerMaster.cpp +++ b/src/xrpld/app/ledger/detail/LedgerMaster.cpp @@ -1472,7 +1472,14 @@ LedgerMaster::newOrderBookDB() bool LedgerMaster::newPFWork(char const* name, std::unique_lock&) { - if (!app_.isStopping() && mPathFindThread < 2 && app_.getPathRequestManager().requestsPending()) + auto const maxPathFindThreads = std::max( + 1, + std::min( + app_.config().PATH_WORKERS, + app_.getJobQueue().getUpdatePathsJobLimit())); + + if (!app_.isStopping() && mPathFindThread < maxPathFindThreads && + app_.getPathRequestManager().requestsPending()) { JLOG(m_journal.debug()) << "newPFWork: Creating job. path find threads: " << mPathFindThread; diff --git a/src/xrpld/app/main/Application.cpp b/src/xrpld/app/main/Application.cpp index 2be94d24bf1..2dff9970c70 100644 --- a/src/xrpld/app/main/Application.cpp +++ b/src/xrpld/app/main/Application.cpp @@ -237,6 +237,19 @@ class ApplicationImp : public Application, public BasicApp #endif } + static int + jobQueueThreads(std::unique_ptr const& config) + { + return Config::computeEffectiveWorkers( + config->standalone(), config->FORCE_MULTI_THREAD, config->WORKERS, config->NODE_SIZE); + } + + static int + maxUpdatePfLimit(std::unique_ptr const& config) + { + return std::max(2, (jobQueueThreads(config) * 3) / 4); + } + //-------------------------------------------------------------------------- ApplicationImp( @@ -266,33 +279,8 @@ class ApplicationImp : public Application, public BasicApp , m_jobQueue( std::make_unique( - [](std::unique_ptr const& config) { - if (config->standalone() && !config->FORCE_MULTI_THREAD) - return 1; - - if (config->WORKERS) - return config->WORKERS; - - auto count = static_cast(std::thread::hardware_concurrency()); - - // Be more aggressive about the number of threads to use - // for the job queue if the server is configured as - // "large" or "huge" if there are enough cores. - if (config->NODE_SIZE >= 4 && count >= 16) - { - count = 6 + std::min(count, 8); - } - else if (config->NODE_SIZE >= 3 && count >= 8) - { - count = 4 + std::min(count, 6); - } - else - { - count = 2 + std::min(count, 4); - } - - return count; - }(config_), + jobQueueThreads(config_), + std::min(config_->PATH_WORKERS, maxUpdatePfLimit(config_)), m_collectorManager->group("jobq"), logs_->journal("JobQueue"), *logs_, diff --git a/src/xrpld/core/Config.h b/src/xrpld/core/Config.h index 44509d21454..c1122fad62e 100644 --- a/src/xrpld/core/Config.h +++ b/src/xrpld/core/Config.h @@ -75,6 +75,11 @@ struct FeeSetup class Config : public BasicConfig { public: + /** + * Compute the effective number of job queue worker threads. + * This logic is shared between config validation and runtime. + */ + static int computeEffectiveWorkers(bool standalone, bool forceMultiThread, int workers, std::size_t nodeSize); // Settings related to the configuration file location and directories static char const* const configFileName; static char const* const configLegacyName; @@ -182,6 +187,7 @@ class Config : public BasicConfig int PATH_SEARCH = 2; int PATH_SEARCH_FAST = 2; int PATH_SEARCH_MAX = 3; + int PATH_WORKERS = 2; // Validation std::optional VALIDATION_QUORUM; // validations to consider ledger authoritative diff --git a/src/xrpld/core/ConfigSections.h b/src/xrpld/core/ConfigSections.h index 7f22dd59c1f..5af6c076ab3 100644 --- a/src/xrpld/core/ConfigSections.h +++ b/src/xrpld/core/ConfigSections.h @@ -47,6 +47,7 @@ struct ConfigSection #define SECTION_PATH_SEARCH "path_search" #define SECTION_PATH_SEARCH_FAST "path_search_fast" #define SECTION_PATH_SEARCH_MAX "path_search_max" +#define SECTION_PATH_WORKERS "path_workers" #define SECTION_PEER_PRIVATE "peer_private" #define SECTION_PEERS_MAX "peers_max" #define SECTION_PEERS_IN_MAX "peers_in_max" diff --git a/src/xrpld/core/detail/Config.cpp b/src/xrpld/core/detail/Config.cpp index c7179c181ba..970102cb38f 100644 --- a/src/xrpld/core/detail/Config.cpp +++ b/src/xrpld/core/detail/Config.cpp @@ -705,6 +705,14 @@ Config::loadFromString(std::string const& fileContents) PATH_SEARCH_FAST = beast::lexicalCastThrow(strTemp); if (getSingleSection(secConfig, SECTION_PATH_SEARCH_MAX, strTemp, j_)) PATH_SEARCH_MAX = beast::lexicalCastThrow(strTemp); + if (getSingleSection(secConfig, SECTION_PATH_WORKERS, strTemp, j_)) + { + PATH_WORKERS = beast::lexicalCastThrow(strTemp); + + if (PATH_WORKERS < 2) + Throw("Invalid " SECTION_PATH_WORKERS + ": must be greater than or equal to 2."); + } if (getSingleSection(secConfig, SECTION_DEBUG_LOGFILE, strTemp, j_)) DEBUG_LOGFILE = strTemp; @@ -731,6 +739,16 @@ Config::loadFromString(std::string const& fileContents) } } + auto const effectiveWorkers = Config::computeEffectiveWorkers( + standalone(), FORCE_MULTI_THREAD, WORKERS, NODE_SIZE); + + auto const maxUpdatePfLimit = std::max(2, (effectiveWorkers * 3) / 4); + if (PATH_WORKERS > maxUpdatePfLimit) + Throw(boost::str(boost::format( + "Invalid %1%: configured value %2% exceeds maximum %3% " + "(3/4 of effective job queue workers = %4%, minimum maximum of 2).") % + SECTION_PATH_WORKERS % PATH_WORKERS % maxUpdatePfLimit % effectiveWorkers)); + if (getSingleSection(secConfig, SECTION_IO_WORKERS, strTemp, j_)) { IO_WORKERS = beast::lexicalCastThrow(strTemp); @@ -1326,4 +1344,24 @@ setup_DatabaseCon(Config const& c, std::optional j) return setup; } + +int Config::computeEffectiveWorkers(bool standalone, bool forceMultiThread, int workers, std::size_t nodeSize) +{ + if (standalone && !forceMultiThread) + return 1; + + if (workers) + return workers; + + auto count = static_cast(std::thread::hardware_concurrency()); + + if (nodeSize >= 4 && count >= 16) + count = 6 + std::min(count, 8); + else if (nodeSize >= 3 && count >= 8) + count = 4 + std::min(count, 6); + else + count = 2 + std::min(count, 4); + + return count; +} } // namespace xrpl