Skip to content

Commit f1c1fd8

Browse files
committed
feat: implementation for /external handler for RPC
1 parent 3612b8a commit f1c1fd8

File tree

6 files changed

+85
-19
lines changed

6 files changed

+85
-19
lines changed

src/httprpc.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ static bool RPCAuthorized(const std::string& strAuth, std::string& strAuthUserna
146146
return multiUserAuthorized(strUserPass);
147147
}
148148

149-
static bool HTTPReq_JSONRPC(const CoreContext& context, HTTPRequest* req)
149+
static bool HTTPReq_JSONRPC(const CoreContext& context, HTTPRequest* req, bool external = false)
150150
{
151151
// JSONRPC handles only POST
152152
if (req->GetRequestMethod() != HTTPRequest::POST) {
@@ -176,6 +176,14 @@ static bool HTTPReq_JSONRPC(const CoreContext& context, HTTPRequest* req)
176176
return false;
177177
}
178178

179+
if (jreq.authUser == gArgs.GetArg("-rpcexternaluser", "") && !jreq.authUser.empty()) {
180+
if (!external) {
181+
LogPrintf("RPC User '%s' is allowed to call rpc only by path /external\n", jreq.authUser);
182+
req->WriteReply(HTTP_FORBIDDEN);
183+
return false;
184+
}
185+
LogPrintf("RPC user '%s' is external\n", jreq.authUser);
186+
}
179187
try {
180188
// Parse request
181189
UniValue valRequest;
@@ -298,10 +306,12 @@ bool StartHTTPRPC(const CoreContext& context)
298306
return false;
299307

300308
auto handle_rpc = [&context](HTTPRequest* req, const std::string&) { return HTTPReq_JSONRPC(context, req); };
301-
RegisterHTTPHandler("/", true, handle_rpc);
309+
auto handle_rpc_external = [&context](HTTPRequest* req, const std::string&) { return HTTPReq_JSONRPC(context, req, true); };
310+
RegisterHTTPHandler("/", true, false, handle_rpc);
302311
if (g_wallet_init_interface.HasWalletSupport()) {
303-
RegisterHTTPHandler("/wallet/", false, handle_rpc);
312+
RegisterHTTPHandler("/wallet/", false, false, handle_rpc);
304313
}
314+
RegisterHTTPHandler("/external", true, true, handle_rpc_external);
305315
struct event_base* eventBase = EventBase();
306316
assert(eventBase);
307317
httpRPCTimerInterface = std::make_unique<HTTPRPCTimerInterface>(eventBase);
@@ -317,6 +327,7 @@ void InterruptHTTPRPC()
317327
void StopHTTPRPC()
318328
{
319329
LogPrint(BCLog::RPC, "Stopping HTTP RPC server\n");
330+
UnregisterHTTPHandler("/external", true);
320331
UnregisterHTTPHandler("/", true);
321332
if (g_wallet_init_interface.HasWalletSupport()) {
322333
UnregisterHTTPHandler("/wallet/", false);

src/httpserver.cpp

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,12 @@ class WorkQueue
7171
std::deque<std::unique_ptr<WorkItem>> queue GUARDED_BY(cs);
7272
bool running GUARDED_BY(cs);
7373
const size_t maxDepth;
74+
const bool m_is_external;
7475

7576
public:
76-
explicit WorkQueue(size_t _maxDepth) : running(true),
77-
maxDepth(_maxDepth)
77+
explicit WorkQueue(size_t _maxDepth, bool is_external) : running(true),
78+
maxDepth(_maxDepth),
79+
m_is_external(is_external)
7880
{
7981
}
8082
/** Precondition: worker threads have all stopped (they have been joined).
@@ -107,6 +109,9 @@ class WorkQueue
107109
i = std::move(queue.front());
108110
queue.pop_front();
109111
}
112+
if (m_is_external) {
113+
LogPrintf("HTTP: Calling handler for external user...\n");
114+
}
110115
(*i)();
111116
}
112117
}
@@ -121,12 +126,13 @@ class WorkQueue
121126

122127
struct HTTPPathHandler
123128
{
124-
HTTPPathHandler(std::string _prefix, bool _exactMatch, HTTPRequestHandler _handler):
125-
prefix(_prefix), exactMatch(_exactMatch), handler(_handler)
129+
HTTPPathHandler(std::string _prefix, bool _exactMatch, bool external, HTTPRequestHandler _handler):
130+
prefix(_prefix), exactMatch(_exactMatch), m_external(external), handler(_handler)
126131
{
127132
}
128133
std::string prefix;
129134
bool exactMatch;
135+
bool m_external;
130136
HTTPRequestHandler handler;
131137
};
132138

@@ -140,6 +146,7 @@ static struct evhttp* eventHTTP = nullptr;
140146
static std::vector<CSubNet> rpc_allow_subnets;
141147
//! Work queue for handling longer requests off the event loop thread
142148
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue{nullptr};
149+
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue_external{nullptr};
143150
//! Handlers for (sub)paths
144151
static std::vector<HTTPPathHandler> pathHandlers;
145152
//! Bound listening sockets
@@ -258,9 +265,18 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
258265

259266
// Dispatch to worker thread
260267
if (i != iend) {
261-
auto item{std::make_unique<HTTPWorkItem>(std::move(hreq), path, i->handler)};
268+
auto item{std::make_unique<HTTPWorkItem>(std::move(hreq), path, i->handler)}; /// this handler!
262269
assert(g_work_queue);
263-
if (g_work_queue->Enqueue(item.get())) {
270+
271+
// We have queue created only if RPC arg 'rpcexternaluser' is specified
272+
if (i->m_external && g_work_queue_external) {
273+
if (g_work_queue_external->Enqueue(item.get())) {
274+
item.release();
275+
} else {
276+
LogPrintf("WARNING: request rejected because http work queue depth of externals exceeded, it can be increased with the -rpcexternalworkqueue= setting\n");
277+
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth of externals exceeded");
278+
}
279+
} else if (g_work_queue->Enqueue(item.get())) {
264280
item.release(); /* if true, queue took ownership */
265281
} else {
266282
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
@@ -393,9 +409,14 @@ bool InitHTTPServer()
393409

394410
LogPrint(BCLog::HTTP, "Initialized HTTP server\n");
395411
int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
412+
int workQueueDepthExternal = std::max((long)gArgs.GetArg("-rpcexternalworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
396413
LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth);
397414

398-
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth);
415+
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth, false);
416+
if (!gArgs.GetArg("-rpcexternaluser", "").empty()) {
417+
LogPrintf("HTTP: creating external work queue of depth %d\n", workQueueDepthExternal);
418+
g_work_queue_external = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepthExternal, true);
419+
}
399420
// transfer ownership to eventBase/HTTP via .release()
400421
eventBase = base_ctr.release();
401422
eventHTTP = http_ctr.release();
@@ -423,12 +444,18 @@ void StartHTTPServer()
423444
{
424445
LogPrint(BCLog::HTTP, "Starting HTTP server\n");
425446
int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
447+
int rpcThreadsExternals = std::max((long)gArgs.GetArg("-rpcexternalthreads", DEFAULT_HTTP_THREADS), 1L);
426448
LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
427449
g_thread_http = std::thread(ThreadHTTP, eventBase);
428450

429451
for (int i = 0; i < rpcThreads; i++) {
430452
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue.get(), i);
431453
}
454+
if (g_work_queue_external) {
455+
for (int i = 0; i < rpcThreadsExternals; i++) {
456+
g_thread_http_workers.emplace_back(HTTPWorkQueueRun, g_work_queue_external.get(), i);
457+
}
458+
}
432459
}
433460

434461
void InterruptHTTPServer()
@@ -438,6 +465,9 @@ void InterruptHTTPServer()
438465
// Reject requests on current connections
439466
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
440467
}
468+
if (g_work_queue_external) {
469+
g_work_queue_external->Interrupt();
470+
}
441471
if (g_work_queue) {
442472
g_work_queue->Interrupt();
443473
}
@@ -446,6 +476,9 @@ void InterruptHTTPServer()
446476
void StopHTTPServer()
447477
{
448478
LogPrint(BCLog::HTTP, "Stopping HTTP server\n");
479+
if (g_work_queue_external) {
480+
g_work_queue_external->Interrupt();
481+
}
449482
if (g_work_queue) {
450483
LogPrint(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
451484
for (auto& thread : g_thread_http_workers) {
@@ -471,6 +504,7 @@ void StopHTTPServer()
471504
event_base_free(eventBase);
472505
eventBase = nullptr;
473506
}
507+
g_work_queue_external.reset();
474508
g_work_queue.reset();
475509
LogPrint(BCLog::HTTP, "Stopped HTTP server\n");
476510
}
@@ -639,10 +673,10 @@ HTTPRequest::RequestMethod HTTPRequest::GetRequestMethod() const
639673
}
640674
}
641675

642-
void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, const HTTPRequestHandler &handler)
676+
void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, bool external, const HTTPRequestHandler &handler)
643677
{
644-
LogPrint(BCLog::HTTP, "Registering HTTP handler for %s (exactmatch %d)\n", prefix, exactMatch);
645-
pathHandlers.push_back(HTTPPathHandler(prefix, exactMatch, handler));
678+
LogPrint(BCLog::HTTP, "Registering HTTP handler for %s (exactmatch %d external %d)\n", prefix, exactMatch, external);
679+
pathHandlers.push_back(HTTPPathHandler(prefix, exactMatch, external, handler));
646680
}
647681

648682
void UnregisterHTTPHandler(const std::string &prefix, bool exactMatch)

src/httpserver.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ typedef std::function<bool(HTTPRequest* req, const std::string &)> HTTPRequestHa
4141
* If multiple handlers match a prefix, the first-registered one will
4242
* be invoked.
4343
*/
44-
void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, const HTTPRequestHandler &handler);
44+
void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, bool external, const HTTPRequestHandler &handler);
4545
/** Unregister handler for prefix */
4646
void UnregisterHTTPHandler(const std::string &prefix, bool exactMatch);
4747

src/init.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,10 +768,13 @@ void SetupServerArgs(NodeContext& node)
768768
argsman.AddArg("-rpcport=<port>", strprintf("Listen for JSON-RPC connections on <port> (default: %u, testnet: %u, regtest: %u)", defaultBaseParams->RPCPort(), testnetBaseParams->RPCPort(), regtestBaseParams->RPCPort()), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::RPC);
769769
argsman.AddArg("-rpcservertimeout=<n>", strprintf("Timeout during HTTP requests (default: %d)", DEFAULT_HTTP_SERVER_TIMEOUT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);
770770
argsman.AddArg("-rpcthreads=<n>", strprintf("Set the number of threads to service RPC calls (default: %d)", DEFAULT_HTTP_THREADS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
771+
argsman.AddArg("-rpcexternalthreads=<n>", strprintf("Set the number of threads to service RPC calls from external consumers (default: %d)", DEFAULT_HTTP_THREADS), ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
771772
argsman.AddArg("-rpcuser=<user>", "Username for JSON-RPC connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
773+
argsman.AddArg("-rpcexternaluser=<user>", "Username for JSON-RPC external connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
772774
argsman.AddArg("-rpcwhitelist=<whitelist>", "Set a whitelist to filter incoming RPC calls for a specific user. The field <whitelist> comes in the format: <USERNAME>:<rpc 1>,<rpc 2>,...,<rpc n>. If multiple whitelists are set for a given user, they are set-intersected. See -rpcwhitelistdefault documentation for information on default whitelist behavior.", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
773775
argsman.AddArg("-rpcwhitelistdefault", "Sets default behavior for rpc whitelisting. Unless rpcwhitelistdefault is set to 0, if any -rpcwhitelist is set, the rpc server acts as if all rpc users are subject to empty-unless-otherwise-specified whitelists. If rpcwhitelistdefault is set to 1 and no -rpcwhitelist is set, rpc server acts as if all rpc users are subject to empty whitelists.", ArgsManager::ALLOW_BOOL, OptionsCategory::RPC);
774776
argsman.AddArg("-rpcworkqueue=<n>", strprintf("Set the depth of the work queue to service RPC calls (default: %d)", DEFAULT_HTTP_WORKQUEUE), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);
777+
argsman.AddArg("-rpcexternalworkqueue=<n>", strprintf("Set the depth of the work queue to service external RPC calls (default: %d)", DEFAULT_HTTP_WORKQUEUE), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);
775778
argsman.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
776779

777780
argsman.AddArg("-statsenabled", strprintf("Publish internal stats to statsd (default: %u)", DEFAULT_STATSD_ENABLE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);

src/rest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ void StartREST(const CoreContext& context)
723723
{
724724
for (const auto& up : uri_prefixes) {
725725
auto handler = [&context, up](HTTPRequest* req, const std::string& prefix) { return up.handler(context, req, prefix); };
726-
RegisterHTTPHandler(up.prefix, false, handler);
726+
RegisterHTTPHandler(up.prefix, false, false, handler);
727727
}
728728
}
729729

test/functional/rpc_platform_filter.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,24 @@ def setup_chain(self):
4242
def run_test(self):
4343
url = urllib.parse.urlparse(self.nodes[0].url)
4444

45-
def test_command(method, params, auth, expexted_status, should_not_match=False):
45+
def test_command(method, params, auth, expected_status, should_not_match=False):
46+
test_command_helper(method, params, '/', auth, expected_status, should_not_match)
47+
48+
def test_external_command(method, params, auth, expected_status, should_not_match=False):
49+
test_command_helper(method, params, '/external', auth, expected_status, should_not_match)
50+
51+
def test_command_helper(method, params, path, auth, expected_status, should_not_match):
4652
conn = http.client.HTTPConnection(url.hostname, url.port)
4753
conn.connect()
4854
body = {"method": method}
4955
if len(params):
5056
body["params"] = params
51-
conn.request('POST', '/', json.dumps(body), {"Authorization": "Basic " + str_to_b64str(auth)})
57+
conn.request('POST', path, json.dumps(body), {"Authorization": "Basic " + str_to_b64str(auth)})
5258
resp = conn.getresponse()
5359
if should_not_match:
54-
assert resp.status != expexted_status
60+
assert resp.status != expected_status
5561
else:
56-
assert_equal(resp.status, expexted_status)
62+
assert_equal(resp.status, expected_status)
5763
conn.close()
5864

5965
whitelisted = ["getassetunlockstatuses",
@@ -114,5 +120,17 @@ def test_command(method, params, auth, expexted_status, should_not_match=False):
114120
test_command("debug", ["1"], rpcuser_authpair_operator, 200)
115121

116122

123+
self.log.info("Restart node with /external handler...")
124+
test_external_command("getbestblockhash", [], rpcuser_authpair_platform, 200)
125+
test_external_command("getblockchaininfo", [], rpcuser_authpair_platform, 403)
126+
127+
self.restart_node(0, extra_args=["-rpcexternaluser=platform-user"])
128+
test_command("getbestblockhash", [], rpcuser_authpair_platform, 403)
129+
test_external_command("getbestblockhash", [], rpcuser_authpair_platform, 200)
130+
test_external_command("getblockchaininfo", [], rpcuser_authpair_platform, 403)
131+
test_external_command("getbestblockhash", [], rpcuser_authpair_operator, 200)
132+
133+
134+
117135
if __name__ == '__main__':
118136
HTTPBasicsTest().main()

0 commit comments

Comments
 (0)