Skip to content

Commit e920529

Browse files
JoelKatzJeff Garzik
authored andcommitted
Support multi-threaded JSON-RPC
Change internal HTTP JSON-RPC server from single-threaded to thread-per-connection model. The IP filter list is applied prior to starting the thread, which then processes the RPC. A mutex covers the entire RPC operation, because not all RPC operations are thread-safe. [minor modifications by jgarzik, to make change upstream-ready]
1 parent 203f9e6 commit e920529

File tree

4 files changed

+72
-35
lines changed

4 files changed

+72
-35
lines changed

src/bitcoinrpc.cpp

Lines changed: 66 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ extern Value importprivkey(const Array& params, bool fHelp);
4646

4747
const Object emptyobj;
4848

49+
void ThreadRPCServer3(void* parg);
50+
4951
Object JSONRPCError(int code, const string& message)
5052
{
5153
Object error;
@@ -2021,7 +2023,7 @@ Value getwork(const Array& params, bool fHelp)
20212023
throw JSONRPCError(-10, "Bitcoin is downloading blocks...");
20222024

20232025
typedef map<uint256, pair<CBlock*, CScript> > mapNewBlock_t;
2024-
static mapNewBlock_t mapNewBlock;
2026+
static mapNewBlock_t mapNewBlock; // FIXME: thread safety
20252027
static vector<CBlock*> vNewBlock;
20262028
static CReserveKey reservekey(pwalletMain);
20272029

@@ -2573,20 +2575,34 @@ class SSLIOStreamDevice : public iostreams::device<iostreams::bidirectional> {
25732575
SSLStream& stream;
25742576
};
25752577

2578+
class AcceptedConnection
2579+
{
2580+
public:
2581+
SSLStream sslStream;
2582+
SSLIOStreamDevice d;
2583+
iostreams::stream<SSLIOStreamDevice> stream;
2584+
2585+
ip::tcp::endpoint peer;
2586+
2587+
AcceptedConnection(asio::io_service &io_service, ssl::context &context,
2588+
bool fUseSSL) : sslStream(io_service, context), d(sslStream, fUseSSL),
2589+
stream(d) { ; }
2590+
};
2591+
25762592
void ThreadRPCServer(void* parg)
25772593
{
25782594
IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer(parg));
25792595
try
25802596
{
2581-
vnThreadsRunning[THREAD_RPCSERVER]++;
2597+
vnThreadsRunning[THREAD_RPCLISTENER]++;
25822598
ThreadRPCServer2(parg);
2583-
vnThreadsRunning[THREAD_RPCSERVER]--;
2599+
vnThreadsRunning[THREAD_RPCLISTENER]--;
25842600
}
25852601
catch (std::exception& e) {
2586-
vnThreadsRunning[THREAD_RPCSERVER]--;
2602+
vnThreadsRunning[THREAD_RPCLISTENER]--;
25872603
PrintException(&e, "ThreadRPCServer()");
25882604
} catch (...) {
2589-
vnThreadsRunning[THREAD_RPCSERVER]--;
2605+
vnThreadsRunning[THREAD_RPCLISTENER]--;
25902606
PrintException(NULL, "ThreadRPCServer()");
25912607
}
25922608
printf("ThreadRPCServer exiting\n");
@@ -2664,54 +2680,67 @@ void ThreadRPCServer2(void* parg)
26642680
loop
26652681
{
26662682
// Accept connection
2667-
SSLStream sslStream(io_service, context);
2668-
SSLIOStreamDevice d(sslStream, fUseSSL);
2669-
iostreams::stream<SSLIOStreamDevice> stream(d);
2670-
2671-
ip::tcp::endpoint peer;
2672-
vnThreadsRunning[THREAD_RPCSERVER]--;
2673-
acceptor.accept(sslStream.lowest_layer(), peer);
2674-
vnThreadsRunning[4]++;
2683+
AcceptedConnection *conn =
2684+
new AcceptedConnection(io_service, context, fUseSSL);
2685+
2686+
vnThreadsRunning[THREAD_RPCLISTENER]--;
2687+
acceptor.accept(conn->sslStream.lowest_layer(), conn->peer);
2688+
vnThreadsRunning[THREAD_RPCLISTENER]++;
2689+
26752690
if (fShutdown)
2691+
{
2692+
delete conn;
26762693
return;
2694+
}
26772695

2678-
// Restrict callers by IP
2679-
if (!ClientAllowed(peer.address().to_string()))
2696+
// Restrict callers by IP. It is important to
2697+
// do this before starting client thread, to filter out
2698+
// certain DoS and misbehaving clients.
2699+
if (!ClientAllowed(conn->peer.address().to_string()))
26802700
{
26812701
// Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake.
26822702
if (!fUseSSL)
2683-
stream << HTTPReply(403, "") << std::flush;
2684-
continue;
2703+
conn->stream << HTTPReply(403, "") << std::flush;
2704+
delete conn;
26852705
}
26862706

2707+
// start HTTP client thread
2708+
else if (!CreateThread(ThreadRPCServer3, conn)) {
2709+
printf("Failed to create RPC server client thread\n");
2710+
delete conn;
2711+
}
2712+
}
2713+
}
2714+
2715+
void ThreadRPCServer3(void* parg)
2716+
{
2717+
IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer3(parg));
2718+
vnThreadsRunning[THREAD_RPCHANDLER]++;
2719+
AcceptedConnection *conn = (AcceptedConnection *) parg;
2720+
2721+
do {
26872722
map<string, string> mapHeaders;
26882723
string strRequest;
26892724

2690-
boost::thread api_caller(ReadHTTP, boost::ref(stream), boost::ref(mapHeaders), boost::ref(strRequest));
2691-
if (!api_caller.timed_join(boost::posix_time::seconds(GetArg("-rpctimeout", 30))))
2692-
{ // Timed out:
2693-
acceptor.cancel();
2694-
printf("ThreadRPCServer ReadHTTP timeout\n");
2695-
continue;
2696-
}
2725+
ReadHTTP(conn->stream, mapHeaders, strRequest);
26972726

26982727
// Check authorization
26992728
if (mapHeaders.count("authorization") == 0)
27002729
{
2701-
stream << HTTPReply(401, "") << std::flush;
2702-
continue;
2730+
conn->stream << HTTPReply(401, "") << std::flush;
2731+
break;
27032732
}
27042733
if (!HTTPAuthorized(mapHeaders))
27052734
{
2706-
printf("ThreadRPCServer incorrect password attempt from %s\n",peer.address().to_string().c_str());
2735+
printf("ThreadRPCServer incorrect password attempt from %s\n", conn->peer.address().to_string().c_str());
27072736
/* Deter brute-forcing short passwords.
27082737
If this results in a DOS the user really
27092738
shouldn't have their RPC port exposed.*/
27102739
if (mapArgs["-rpcpassword"].size() < 20)
27112740
Sleep(250);
27122741

2713-
stream << HTTPReply(401, "") << std::flush;
2714-
continue;
2742+
conn->stream << HTTPReply(401, "") << std::flush;
2743+
break;
27152744
}
27162745

27172746
Value id = Value::null;
@@ -2750,17 +2779,22 @@ void ThreadRPCServer2(void* parg)
27502779

27512780
// Send reply
27522781
string strReply = JSONRPCReply(result, Value::null, id);
2753-
stream << HTTPReply(200, strReply) << std::flush;
2782+
conn->stream << HTTPReply(200, strReply) << std::flush;
27542783
}
27552784
catch (Object& objError)
27562785
{
2757-
ErrorReply(stream, objError, id);
2786+
ErrorReply(conn->stream, objError, id);
2787+
break;
27582788
}
27592789
catch (std::exception& e)
27602790
{
2761-
ErrorReply(stream, JSONRPCError(-32700, e.what()), id);
2791+
ErrorReply(conn->stream, JSONRPCError(-32700, e.what()), id);
2792+
break;
27622793
}
27632794
}
2795+
while (0);
2796+
delete conn;
2797+
vnThreadsRunning[THREAD_RPCHANDLER]--;
27642798
}
27652799

27662800
json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_spirit::Array &params) const

src/bitcoinrpc.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <string>
1010
#include <map>
1111

12+
#define BOOST_SPIRIT_THREADSAFE
1213
#include "json/json_spirit_reader_template.h"
1314
#include "json/json_spirit_writer_template.h"
1415
#include "json/json_spirit_utils.h"

src/net.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1839,12 +1839,13 @@ bool StopNode()
18391839
if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n");
18401840
if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n");
18411841
if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n");
1842-
if (vnThreadsRunning[THREAD_RPCSERVER] > 0) printf("ThreadRPCServer still running\n");
1842+
if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n");
1843+
if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n");
18431844
if (fHaveUPnP && vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n");
18441845
if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n");
18451846
if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n");
18461847
if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n");
1847-
while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCSERVER] > 0)
1848+
while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0)
18481849
Sleep(20);
18491850
Sleep(50);
18501851
DumpAddresses();

src/net.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,12 @@ enum threadId
9292
THREAD_OPENCONNECTIONS,
9393
THREAD_MESSAGEHANDLER,
9494
THREAD_MINER,
95-
THREAD_RPCSERVER,
95+
THREAD_RPCLISTENER,
9696
THREAD_UPNP,
9797
THREAD_DNSSEED,
9898
THREAD_ADDEDCONNECTIONS,
9999
THREAD_DUMPADDRESS,
100+
THREAD_RPCHANDLER,
100101

101102
THREAD_MAX
102103
};

0 commit comments

Comments
 (0)