Skip to content

Commit 5baa522

Browse files
Merge dashpay#6045: feat: one more queue for "external" requests from 3rd parties
241f073 feat: rpc external users are comma separated list (Konstantin Akimov) 68def97 refactor: re-order arguments options alphabetically (Konstantin Akimov) c7efd56 feat: rpc external users: use 2 queues but no extra threads (Konstantin Akimov) c575a58 feat: change handler to '/' for external users, use only rpc user name to choose queue (Konstantin Akimov) f1c1fd8 feat: implementation for /external handler for RPC (Konstantin Akimov) Pull request description: ## Issue being fixed or feature implemented To avoid struggling to response to critical rpc requests, and split them from 3rd parties who uses a node as an external service, there are introduced one more queue of requests that will be served without throttling for instance consensus important rpcs ## What was done? new command line arguments: - `rpcexternaluser` - List of comma-separated usernames for JSON-RPC external connections. If not specified, there's no special queue is created, all requests in one queue - `rpcexternalworkqueue=<n>` - Set the depth of the work queue to service external RPC calls ## How Has This Been Tested? Functional test `rpc_platform_filter.py` is updated to test new functionality ## Breaking Changes NA ## Checklist: - [x] I have performed a self-review of my own code - [x] I have commented my code, particularly in hard-to-understand areas - [x] I have added or updated relevant unit/integration/functional/e2e tests - [ ] I have made corresponding changes to the documentation - [x] I have assigned this pull request to a milestone ACKs for top commit: UdjinM6: utACK 241f073 PastaPastaPasta: utACK dashpay@241f073 Tree-SHA512: 15b371f24f5302853b85419e2b20c29749d6aae1c98a541d7471f1d3a681643063302c2a5ecce04dfad2da9101ea69d2f08a7e0e11a28609c6011d78273c57a7
2 parents 7ca4812 + 241f073 commit 5baa522

File tree

4 files changed

+83
-19
lines changed

4 files changed

+83
-19
lines changed

src/httprpc.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ static bool RPCAuthorized(const std::string& strAuth, std::string& strAuthUserna
146146
return multiUserAuthorized(strUserPass);
147147
}
148148

149-
static bool HTTPReq_JSONRPC(const CoreContext& context, HTTPRequest* req)
149+
static bool HTTPReq_JSONRPC(const CoreContext& context, HTTPRequest* req, bool external = false)
150150
{
151151
// JSONRPC handles only POST
152152
if (req->GetRequestMethod() != HTTPRequest::POST) {

src/httpserver.cpp

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,14 @@ class WorkQueue
6969
Mutex cs;
7070
std::condition_variable cond GUARDED_BY(cs);
7171
std::deque<std::unique_ptr<WorkItem>> queue GUARDED_BY(cs);
72+
std::deque<std::unique_ptr<WorkItem>> external_queue GUARDED_BY(cs);
7273
bool running GUARDED_BY(cs);
7374
const size_t maxDepth;
75+
const size_t m_external_depth;
7476

7577
public:
76-
explicit WorkQueue(size_t _maxDepth) : running(true),
77-
maxDepth(_maxDepth)
78+
explicit WorkQueue(size_t _maxDepth, size_t external_depth) : running(true),
79+
maxDepth(_maxDepth), m_external_depth(external_depth)
7880
{
7981
}
8082
/** Precondition: worker threads have all stopped (they have been joined).
@@ -83,13 +85,19 @@ class WorkQueue
8385
{
8486
}
8587
/** Enqueue a work item */
86-
bool Enqueue(WorkItem* item) EXCLUSIVE_LOCKS_REQUIRED(!cs)
88+
bool Enqueue(WorkItem* item, bool is_external) EXCLUSIVE_LOCKS_REQUIRED(!cs)
8789
{
8890
LOCK(cs);
89-
if (!running || queue.size() >= maxDepth) {
91+
if (!running) {
9092
return false;
9193
}
92-
queue.emplace_back(std::unique_ptr<WorkItem>(item));
94+
if (is_external) {
95+
if (external_queue.size() >= m_external_depth) return false;
96+
external_queue.emplace_back(std::unique_ptr<WorkItem>(item));
97+
} else {
98+
if (queue.size() >= maxDepth) return false;
99+
queue.emplace_back(std::unique_ptr<WorkItem>(item));
100+
}
93101
cond.notify_one();
94102
return true;
95103
}
@@ -100,12 +108,18 @@ class WorkQueue
100108
std::unique_ptr<WorkItem> i;
101109
{
102110
WAIT_LOCK(cs, lock);
103-
while (running && queue.empty())
111+
while (running && external_queue.empty() && queue.empty())
104112
cond.wait(lock);
105-
if (!running && queue.empty())
113+
if (!running && external_queue.empty() && queue.empty())
106114
break;
107-
i = std::move(queue.front());
108-
queue.pop_front();
115+
if (!queue.empty()) {
116+
i = std::move(queue.front());
117+
queue.pop_front();
118+
} else {
119+
i = std::move(external_queue.front());
120+
external_queue.pop_front();
121+
LogPrintf("HTTP: Calling handler for external user...\n");
122+
}
109123
}
110124
(*i)();
111125
}
@@ -140,6 +154,8 @@ static struct evhttp* eventHTTP = nullptr;
140154
static std::vector<CSubNet> rpc_allow_subnets;
141155
//! Work queue for handling longer requests off the event loop thread
142156
static std::unique_ptr<WorkQueue<HTTPClosure>> g_work_queue{nullptr};
157+
//! List of 'external' RPC users
158+
static std::vector<std::string> g_external_usernames;
143159
//! Handlers for (sub)paths
144160
static std::vector<HTTPPathHandler> pathHandlers;
145161
//! Bound listening sockets
@@ -252,16 +268,39 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
252268
break;
253269
}
254270
}
271+
const bool is_external_request = [&hreq]() -> bool {
272+
if (g_external_usernames.empty()) return false;
273+
274+
const std::string strAuth = hreq->GetHeader("authorization").second;
275+
if (strAuth.substr(0, 6) != "Basic ")
276+
return false;
277+
278+
std::string strUserPass64 = TrimString(strAuth.substr(6));
279+
bool invalid;
280+
std::string strUserPass = DecodeBase64(strUserPass64, &invalid);
281+
if (invalid) return false;
282+
283+
if (strUserPass.find(':') == std::string::npos) return false;
284+
const std::string username{strUserPass.substr(0, strUserPass.find(':'))};
285+
return find(g_external_usernames.begin(), g_external_usernames.end(), username) != g_external_usernames.end();
286+
}();
255287

256288
// Dispatch to worker thread
257289
if (i != iend) {
258290
auto item{std::make_unique<HTTPWorkItem>(std::move(hreq), path, i->handler)};
259291
assert(g_work_queue);
260-
if (g_work_queue->Enqueue(item.get())) {
292+
293+
if (g_work_queue->Enqueue(item.get(), is_external_request)) {
261294
item.release(); /* if true, queue took ownership */
262295
} else {
263-
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
264-
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
296+
if (is_external_request)
297+
{
298+
LogPrintf("WARNING: request rejected because http work queue depth of externals exceeded, it can be increased with the -rpcexternalworkqueue= setting\n");
299+
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth of externals exceeded");
300+
} else {
301+
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
302+
item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
303+
}
265304
}
266305
} else {
267306
hreq->WriteReply(HTTP_NOT_FOUND);
@@ -390,9 +429,13 @@ bool InitHTTPServer()
390429

391430
LogPrint(BCLog::HTTP, "Initialized HTTP server\n");
392431
int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
393-
LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth);
394-
395-
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth);
432+
int workQueueDepthExternal = 0;
433+
if (const std::string rpc_externaluser{gArgs.GetArg("-rpcexternaluser", "")}; !rpc_externaluser.empty()) {
434+
workQueueDepthExternal = std::max((long)gArgs.GetArg("-rpcexternalworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
435+
g_external_usernames = SplitString(rpc_externaluser, ',');
436+
}
437+
LogPrintf("HTTP: creating work queue of depth %d external_depth %d\n", workQueueDepth, workQueueDepthExternal);
438+
g_work_queue = std::make_unique<WorkQueue<HTTPClosure>>(workQueueDepth, workQueueDepthExternal);
396439
// transfer ownership to eventBase/HTTP via .release()
397440
eventBase = base_ctr.release();
398441
eventHTTP = http_ctr.release();

src/init.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,8 @@ void SetupServerArgs(NodeContext& node)
767767
argsman.AddArg("-rpcauth=<userpw>", "Username and HMAC-SHA-256 hashed password for JSON-RPC connections. The field <userpw> comes in the format: <USERNAME>:<SALT>$<HASH>. A canonical python script is included in share/rpcuser. The client then connects normally using the rpcuser=<USERNAME>/rpcpassword=<PASSWORD> pair of arguments. This option can be specified multiple times", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
768768
argsman.AddArg("-rpcbind=<addr>[:port]", "Bind to given address to listen for JSON-RPC connections. Do not expose the RPC server to untrusted networks such as the public internet! This option is ignored unless -rpcallowip is also passed. Port is optional and overrides -rpcport. Use [host]:port notation for IPv6. This option can be specified multiple times (default: 127.0.0.1 and ::1 i.e., localhost, or if -rpcallowip has been specified, 0.0.0.0 and :: i.e., all addresses)", ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
769769
argsman.AddArg("-rpccookiefile=<loc>", "Location of the auth cookie. Relative paths will be prefixed by a net-specific datadir location. (default: data dir)", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
770+
argsman.AddArg("-rpcexternaluser=<users>", "List of comma-separated usernames for JSON-RPC external connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
771+
argsman.AddArg("-rpcexternalworkqueue=<n>", strprintf("Set the depth of the work queue to service external RPC calls (default: %d)", DEFAULT_HTTP_WORKQUEUE), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);
770772
argsman.AddArg("-rpcpassword=<pw>", "Password for JSON-RPC connections", ArgsManager::ALLOW_ANY | ArgsManager::SENSITIVE, OptionsCategory::RPC);
771773
argsman.AddArg("-rpcport=<port>", strprintf("Listen for JSON-RPC connections on <port> (default: %u, testnet: %u, regtest: %u)", defaultBaseParams->RPCPort(), testnetBaseParams->RPCPort(), regtestBaseParams->RPCPort()), ArgsManager::ALLOW_ANY | ArgsManager::NETWORK_ONLY, OptionsCategory::RPC);
772774
argsman.AddArg("-rpcservertimeout=<n>", strprintf("Timeout during HTTP requests (default: %d)", DEFAULT_HTTP_SERVER_TIMEOUT), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);

test/functional/rpc_platform_filter.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def setup_chain(self):
4242
def run_test(self):
4343
url = urllib.parse.urlparse(self.nodes[0].url)
4444

45-
def test_command(method, params, auth, expexted_status, should_not_match=False):
45+
def test_command(method, params, auth, expected_status, should_not_match=False):
4646
conn = http.client.HTTPConnection(url.hostname, url.port)
4747
conn.connect()
4848
body = {"method": method}
@@ -51,9 +51,9 @@ def test_command(method, params, auth, expexted_status, should_not_match=False):
5151
conn.request('POST', '/', json.dumps(body), {"Authorization": "Basic " + str_to_b64str(auth)})
5252
resp = conn.getresponse()
5353
if should_not_match:
54-
assert resp.status != expexted_status
54+
assert resp.status != expected_status
5555
else:
56-
assert_equal(resp.status, expexted_status)
56+
assert_equal(resp.status, expected_status)
5757
conn.close()
5858

5959
whitelisted = ["getassetunlockstatuses",
@@ -114,5 +114,24 @@ def test_command(method, params, auth, expexted_status, should_not_match=False):
114114
test_command("debug", ["1"], rpcuser_authpair_operator, 200)
115115

116116

117+
self.log.info("Restart node with -rpcexternaluser")
118+
self.restart_node(0, extra_args=["-rpcexternaluser=platform-user"])
119+
120+
external_log_str = "HTTP: Calling handler for external user"
121+
expected_log_str = "ThreadRPCServer method="
122+
with self.nodes[0].assert_debug_log(expected_msgs=[expected_log_str, external_log_str]):
123+
test_command("getbestblockhash", [], rpcuser_authpair_platform, 200)
124+
with self.nodes[0].assert_debug_log(expected_msgs=[expected_log_str], unexpected_msgs = [external_log_str]):
125+
test_command("getbestblockhash", [], rpcuser_authpair_operator, 200)
126+
127+
self.log.info("Restart node with multiple external users")
128+
self.restart_node(0, extra_args=["-rpcexternaluser=platform-user,operator"])
129+
with self.nodes[0].assert_debug_log(expected_msgs=[expected_log_str, external_log_str]):
130+
test_command("getbestblockhash", [], rpcuser_authpair_platform, 200)
131+
with self.nodes[0].assert_debug_log(expected_msgs=[expected_log_str, external_log_str]):
132+
test_command("getbestblockhash", [], rpcuser_authpair_operator, 200)
133+
134+
135+
117136
if __name__ == '__main__':
118137
HTTPBasicsTest().main()

0 commit comments

Comments
 (0)