Skip to content

Commit c7efd56

Browse files
committed
feat: rpc external users: use 2 queues but no extra threads
1 parent c575a58 commit c7efd56

File tree

2 files changed

+36
-44
lines changed

2 files changed

+36
-44
lines changed

src/httpserver.cpp

Lines changed: 36 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,14 @@ class WorkQueue
6969
Mutex cs;
7070
std::condition_variable cond GUARDED_BY(cs);
7171
std::deque<std::unique_ptr<WorkItem>> queue GUARDED_BY(cs);
72+
std::deque<std::unique_ptr<WorkItem>> external_queue GUARDED_BY(cs);
7273
bool running GUARDED_BY(cs);
7374
const size_t maxDepth;
74-
const bool m_is_external;
75+
const size_t m_external_depth;
7576

7677
public:
77-
explicit WorkQueue(size_t _maxDepth, bool is_external) : running(true),
78-
maxDepth(_maxDepth),
79-
m_is_external(is_external)
78+
explicit WorkQueue(size_t _maxDepth, size_t external_depth) : running(true),
79+
maxDepth(_maxDepth), m_external_depth(external_depth)
8080
{
8181
}
8282
/** Precondition: worker threads have all stopped (they have been joined).
@@ -85,13 +85,19 @@ class WorkQueue
8585
{
8686
}
8787
/** Enqueue a work item */
88-
bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs)
88+
bool Enqueue(WorkItem* item, bool is_external) EXCLUSIVE_LOCKS_REQUIRED(!cs)
8989
{
9090
LOCK(cs);
91-
if (!running || queue.size() >= maxDepth) {
91+
if (!running) {
9292
return false;
9393
}
94-
queue.emplace_back(std::unique_ptr<WorkItem>(item));
94+
if (is_external) {
95+
if (external_queue.size() >= m_external_depth) return false;
96+
external_queue.emplace_back(std::unique_ptr<WorkItem>(item));
97+
} else {
98+
if (queue.size() >= maxDepth) return false;
99+
queue.emplace_back(std::unique_ptr<WorkItem>(item));
100+
}
95101
cond.notify_one();
96102
return true;
97103
}
@@ -102,15 +108,18 @@ class WorkQueue
102108
std::unique_ptr<WorkItem> i;
103109
{
104110
WAIT_LOCK(cs, lock);
105-
while (running && queue.empty())
111+
while (running && external_queue.empty() && queue.empty())
106112
cond.wait(lock);
107-
if (!running && queue.empty())
113+
if (!running && external_queue.empty() && queue.empty())
108114
break;
109-
i = std::move(queue.front());
110-
queue.pop_front();
111-
}
112-
if (m_is_external) {
113-
LogPrintf("HTTP: Calling handler for external user...\n");
115+
if (!queue.empty()) {
116+
i = std::move(queue.front());
117+
queue.pop_front();
118+
} else {
119+
i = std::move(external_queue.front());
120+
external_queue.pop_front();
121+
LogPrintf("HTTP: Calling handler for external user...\n");
122+
}
114123
}
115124
(*i)();
116125
}
@@ -145,7 +154,6 @@ static struct evhttp* eventHTTP = nullptr;
145154
static std::vector<CSubNet> rpc_allow_subnets;
146155
//! Work queue for handling longer requests off the event loop thread
147156
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue{nullptr};
148-
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue_external{nullptr};
149157
//! Handlers for (sub)paths
150158
static std::vector<HTTPPathHandler> pathHandlers;
151159
//! Bound listening sockets
@@ -284,19 +292,17 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
284292
auto item{std::make_unique<HTTPWorkItem>(std::move(hreq), path, i->handler)};
285293
assert(g_work_queue);
286294

287-
// We have queue created only if RPC arg 'rpcexternaluser' is specified
288-
if (is_external_request && g_work_queue_external) {
289-
if (g_work_queue_external->Enqueue(item.get())) {
290-
item.release();
291-
} else {
295+
if (g_work_queue->Enqueue(item.get(), is_external_request)) {
296+
item.release(); /* if true, queue took ownership */
297+
} else {
298+
if (is_external_request)
299+
{
292300
LogPrintf("WARNING: request rejected because http work queue depth of externals exceeded, it can be increased with the -rpcexternalworkqueue= setting\n");
293301
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth of externals exceeded");
302+
} else {
303+
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
304+
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
294305
}
295-
} else if (g_work_queue->Enqueue(item.get())) {
296-
item.release(); /* if true, queue took ownership */
297-
} else {
298-
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
299-
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
300306
}
301307
} else {
302308
hreq->WriteReply(HTTP_NOT_FOUND);
@@ -425,14 +431,14 @@ bool InitHTTPServer()
425431

426432
LogPrint(BCLog::HTTP, "Initialized HTTP server\n");
427433
int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
428-
int workQueueDepthExternal = std::max((long)gArgs.GetArg("-rpcexternalworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
429-
LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth);
430-
431-
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth, false);
434+
int workQueueDepthExternal = 0;
432435
if (!gArgs.GetArg("-rpcexternaluser", "").empty()) {
433436
LogPrintf("HTTP: creating external work queue of depth %d\n", workQueueDepthExternal);
434-
g_work_queue_external = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepthExternal, true);
437+
workQueueDepthExternal = std::max((long)gArgs.GetArg("-rpcexternalworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
435438
}
439+
LogPrintf("HTTP: creating work queue of depth %d external_depth %d\n", workQueueDepth, workQueueDepthExternal);
440+
441+
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth, workQueueDepthExternal);
436442
// transfer ownership to eventBase/HTTP via .release()
437443
eventBase = base_ctr.release();
438444
eventHTTP = http_ctr.release();
@@ -460,18 +466,12 @@ void StartHTTPServer()
460466
{
461467
LogPrint(BCLog::HTTP, "Starting HTTP server\n");
462468
int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
463-
int rpcThreadsExternals = std::max((long)gArgs.GetArg("-rpcexternalthreads", DEFAULT_HTTP_THREADS), 1L);
464469
LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
465470
g_thread_http = std::thread(ThreadHTTP, eventBase);
466471

467472
for (int i = 0; i < rpcThreads; i++) {
468473
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue.get(), i);
469474
}
470-
if (g_work_queue_external) {
471-
for (int i = 0; i < rpcThreadsExternals; i++) {
472-
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue_external.get(), i);
473-
}
474-
}
475475
}
476476

477477
void InterruptHTTPServer()
@@ -481,9 +481,6 @@ void InterruptHTTPServer()
481481
// Reject requests on current connections
482482
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
483483
}
484-
if (g_work_queue_external) {
485-
g_work_queue_external->Interrupt();
486-
}
487484
if (g_work_queue) {
488485
g_work_queue->Interrupt();
489486
}
@@ -492,9 +489,6 @@ void InterruptHTTPServer()
492489
void StopHTTPServer()
493490
{
494491
LogPrint(BCLog::HTTP, "Stopping HTTP server\n");
495-
if (g_work_queue_external) {
496-
g_work_queue_external->Interrupt();
497-
}
498492
if (g_work_queue) {
499493
LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
500494
for (auto& thread : g_thread_http_workers) {
@@ -520,7 +514,6 @@ void StopHTTPServer()
520514
event_base_free(eventBase);
521515
eventBase = nullptr;
522516
}
523-
g_work_queue_external.reset();
524517
g_work_queue.reset();
525518
LogPrint(BCLog::HTTP, "Stopped HTTP server\n");
526519
}

src/init.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -768,7 +768,6 @@ void SetupServerArgs(NodeContext& node)
768768
argsman.AddArg("-rpcport=<port>", strprintf("Listen for JSON-RPC connections on <port> (default: %u, testnet: %u, regtest: %u)", defaultBaseParams->RPCPort(), testnetBaseParams->RPCPort(), regtestBaseParams->RPCPort()), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::RPC);
769769
argsman.AddArg("-rpcservertimeout=<n>", strprintf("Timeout during HTTP requests (default: %d)", DEFAULT_HTTP_SERVER_TIMEOUT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);
770770
argsman.AddArg("-rpcthreads=<n>", strprintf("Set the number of threads to service RPC calls (default: %d)", DEFAULT_HTTP_THREADS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
771-
argsman.AddArg("-rpcexternalthreads=<n>", strprintf("Set the number of threads to service RPC calls from external consumers (default: %d)", DEFAULT_HTTP_THREADS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
772771
argsman.AddArg("-rpcuser=<user>", "Username for JSON-RPC connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
773772
argsman.AddArg("-rpcexternaluser=<user>", "Username for JSON-RPC external connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
774773
argsman.AddArg("-rpcwhitelist=<whitelist>", "Set a whitelist to filter incoming RPC calls for a specific user. The field <whitelist> comes in the format: <USERNAME>:<rpc 1>,<rpc 2>,...,<rpc n>. If multiple whitelists are set for a given user, they are set-intersected. See -rpcwhitelistdefault documentation for information on default whitelist behavior.", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);

0 commit comments

Comments
 (0)