Skip to content

Commit b34c5f3

Browse files
author
Jeff Garzik
committed
Merge pull request #1101 from jgarzik/http11
Multithreaded JSON-RPC with HTTP 1.1 Keep-Alive support
2 parents eb79342 + 96c5269 commit b34c5f3

File tree

4 files changed

+103
-40
lines changed

4 files changed

+103
-40
lines changed

src/bitcoinrpc.cpp

Lines changed: 97 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ extern Value importprivkey(const Array& params, bool fHelp);
4646

4747
const Object emptyobj;
4848

49+
void ThreadRPCServer3(void* parg);
50+
4951
Object JSONRPCError(int code, const string& message)
5052
{
5153
Object error;
@@ -2021,7 +2023,7 @@ Value getwork(const Array& params, bool fHelp)
20212023
throw JSONRPCError(-10, "Bitcoin is downloading blocks...");
20222024

20232025
typedef map<uint256, pair<CBlock*, CScript> > mapNewBlock_t;
2024-
static mapNewBlock_t mapNewBlock;
2026+
static mapNewBlock_t mapNewBlock; // FIXME: thread safety
20252027
static vector<CBlock*> vNewBlock;
20262028
static CReserveKey reservekey(pwalletMain);
20272029

@@ -2355,7 +2357,7 @@ string rfc1123Time()
23552357
return string(buffer);
23562358
}
23572359

2358-
static string HTTPReply(int nStatus, const string& strMsg)
2360+
static string HTTPReply(int nStatus, const string& strMsg, bool keepalive)
23592361
{
23602362
if (nStatus == 401)
23612363
return strprintf("HTTP/1.0 401 Authorization Required\r\n"
@@ -2384,7 +2386,7 @@ static string HTTPReply(int nStatus, const string& strMsg)
23842386
return strprintf(
23852387
"HTTP/1.1 %d %s\r\n"
23862388
"Date: %s\r\n"
2387-
"Connection: close\r\n"
2389+
"Connection: %s\r\n"
23882390
"Content-Length: %d\r\n"
23892391
"Content-Type: application/json\r\n"
23902392
"Server: bitcoin-json-rpc/%s\r\n"
@@ -2393,19 +2395,24 @@ static string HTTPReply(int nStatus, const string& strMsg)
23932395
nStatus,
23942396
cStatus,
23952397
rfc1123Time().c_str(),
2398+
keepalive ? "keep-alive" : "close",
23962399
strMsg.size(),
23972400
FormatFullVersion().c_str(),
23982401
strMsg.c_str());
23992402
}
24002403

2401-
int ReadHTTPStatus(std::basic_istream<char>& stream)
2404+
int ReadHTTPStatus(std::basic_istream<char>& stream, int &proto)
24022405
{
24032406
string str;
24042407
getline(stream, str);
24052408
vector<string> vWords;
24062409
boost::split(vWords, str, boost::is_any_of(" "));
24072410
if (vWords.size() < 2)
24082411
return 500;
2412+
proto = 0;
2413+
const char *ver = strstr(str.c_str(), "HTTP/1.");
2414+
if (ver != NULL)
2415+
proto = atoi(ver+7);
24092416
return atoi(vWords[1].c_str());
24102417
}
24112418

@@ -2440,7 +2447,8 @@ int ReadHTTP(std::basic_istream<char>& stream, map<string, string>& mapHeadersRe
24402447
strMessageRet = "";
24412448

24422449
// Read status
2443-
int nStatus = ReadHTTPStatus(stream);
2450+
int nProto;
2451+
int nStatus = ReadHTTPStatus(stream, nProto);
24442452

24452453
// Read header
24462454
int nLen = ReadHTTPHeader(stream, mapHeadersRet);
@@ -2455,6 +2463,16 @@ int ReadHTTP(std::basic_istream<char>& stream, map<string, string>& mapHeadersRe
24552463
strMessageRet = string(vch.begin(), vch.end());
24562464
}
24572465

2466+
string sConHdr = mapHeadersRet["connection"];
2467+
2468+
if ((sConHdr != "close") && (sConHdr != "keep-alive"))
2469+
{
2470+
if (nProto >= 1)
2471+
mapHeadersRet["connection"] = "keep-alive";
2472+
else
2473+
mapHeadersRet["connection"] = "close";
2474+
}
2475+
24582476
return nStatus;
24592477
}
24602478

@@ -2507,7 +2525,7 @@ void ErrorReply(std::ostream& stream, const Object& objError, const Value& id)
25072525
if (code == -32600) nStatus = 400;
25082526
else if (code == -32601) nStatus = 404;
25092527
string strReply = JSONRPCReply(Value::null, objError, id);
2510-
stream << HTTPReply(nStatus, strReply) << std::flush;
2528+
stream << HTTPReply(nStatus, strReply, false) << std::flush;
25112529
}
25122530

25132531
bool ClientAllowed(const string& strAddress)
@@ -2573,20 +2591,34 @@ class SSLIOStreamDevice : public iostreams::device<iostreams::bidirectional> {
25732591
SSLStream& stream;
25742592
};
25752593

2594+
class AcceptedConnection
2595+
{
2596+
public:
2597+
SSLStream sslStream;
2598+
SSLIOStreamDevice d;
2599+
iostreams::stream<SSLIOStreamDevice> stream;
2600+
2601+
ip::tcp::endpoint peer;
2602+
2603+
AcceptedConnection(asio::io_service &io_service, ssl::context &context,
2604+
bool fUseSSL) : sslStream(io_service, context), d(sslStream, fUseSSL),
2605+
stream(d) { ; }
2606+
};
2607+
25762608
void ThreadRPCServer(void* parg)
25772609
{
25782610
IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer(parg));
25792611
try
25802612
{
2581-
vnThreadsRunning[THREAD_RPCSERVER]++;
2613+
vnThreadsRunning[THREAD_RPCLISTENER]++;
25822614
ThreadRPCServer2(parg);
2583-
vnThreadsRunning[THREAD_RPCSERVER]--;
2615+
vnThreadsRunning[THREAD_RPCLISTENER]--;
25842616
}
25852617
catch (std::exception& e) {
2586-
vnThreadsRunning[THREAD_RPCSERVER]--;
2618+
vnThreadsRunning[THREAD_RPCLISTENER]--;
25872619
PrintException(&e, "ThreadRPCServer()");
25882620
} catch (...) {
2589-
vnThreadsRunning[THREAD_RPCSERVER]--;
2621+
vnThreadsRunning[THREAD_RPCLISTENER]--;
25902622
PrintException(NULL, "ThreadRPCServer()");
25912623
}
25922624
printf("ThreadRPCServer exiting\n");
@@ -2664,55 +2696,78 @@ void ThreadRPCServer2(void* parg)
26642696
loop
26652697
{
26662698
// Accept connection
2667-
SSLStream sslStream(io_service, context);
2668-
SSLIOStreamDevice d(sslStream, fUseSSL);
2669-
iostreams::stream<SSLIOStreamDevice> stream(d);
2670-
2671-
ip::tcp::endpoint peer;
2672-
vnThreadsRunning[THREAD_RPCSERVER]--;
2673-
acceptor.accept(sslStream.lowest_layer(), peer);
2674-
vnThreadsRunning[4]++;
2699+
AcceptedConnection *conn =
2700+
new AcceptedConnection(io_service, context, fUseSSL);
2701+
2702+
vnThreadsRunning[THREAD_RPCLISTENER]--;
2703+
acceptor.accept(conn->sslStream.lowest_layer(), conn->peer);
2704+
vnThreadsRunning[THREAD_RPCLISTENER]++;
2705+
26752706
if (fShutdown)
2707+
{
2708+
delete conn;
26762709
return;
2710+
}
26772711

2678-
// Restrict callers by IP
2679-
if (!ClientAllowed(peer.address().to_string()))
2712+
// Restrict callers by IP. It is important to
2713+
// do this before starting client thread, to filter out
2714+
// certain DoS and misbehaving clients.
2715+
if (!ClientAllowed(conn->peer.address().to_string()))
26802716
{
26812717
// Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake.
26822718
if (!fUseSSL)
2683-
stream << HTTPReply(403, "") << std::flush;
2684-
continue;
2719+
conn->stream << HTTPReply(403, "", false) << std::flush;
2720+
delete conn;
2721+
}
2722+
2723+
// start HTTP client thread
2724+
else if (!CreateThread(ThreadRPCServer3, conn)) {
2725+
printf("Failed to create RPC server client thread\n");
2726+
delete conn;
26852727
}
2728+
}
2729+
}
2730+
2731+
void ThreadRPCServer3(void* parg)
2732+
{
2733+
IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer3(parg));
2734+
vnThreadsRunning[THREAD_RPCHANDLER]++;
2735+
AcceptedConnection *conn = (AcceptedConnection *) parg;
26862736

2737+
bool fRun = true;
2738+
loop {
2739+
if (fShutdown || !fRun)
2740+
{
2741+
conn->stream.close();
2742+
delete conn;
2743+
--vnThreadsRunning[THREAD_RPCHANDLER];
2744+
return;
2745+
}
26872746
map<string, string> mapHeaders;
26882747
string strRequest;
26892748

2690-
boost::thread api_caller(ReadHTTP, boost::ref(stream), boost::ref(mapHeaders), boost::ref(strRequest));
2691-
if (!api_caller.timed_join(boost::posix_time::seconds(GetArg("-rpctimeout", 30))))
2692-
{ // Timed out:
2693-
acceptor.cancel();
2694-
printf("ThreadRPCServer ReadHTTP timeout\n");
2695-
continue;
2696-
}
2749+
ReadHTTP(conn->stream, mapHeaders, strRequest);
26972750

26982751
// Check authorization
26992752
if (mapHeaders.count("authorization") == 0)
27002753
{
2701-
stream << HTTPReply(401, "") << std::flush;
2702-
continue;
2754+
conn->stream << HTTPReply(401, "", false) << std::flush;
2755+
break;
27032756
}
27042757
if (!HTTPAuthorized(mapHeaders))
27052758
{
2706-
printf("ThreadRPCServer incorrect password attempt from %s\n",peer.address().to_string().c_str());
2759+
printf("ThreadRPCServer incorrect password attempt from %s\n", conn->peer.address().to_string().c_str());
27072760
/* Deter brute-forcing short passwords.
27082761
If this results in a DOS the user really
27092762
shouldn't have their RPC port exposed.*/
27102763
if (mapArgs["-rpcpassword"].size() < 20)
27112764
Sleep(250);
27122765

2713-
stream << HTTPReply(401, "") << std::flush;
2714-
continue;
2766+
conn->stream << HTTPReply(401, "", false) << std::flush;
2767+
break;
27152768
}
2769+
if (mapHeaders["connection"] == "close")
2770+
fRun = false;
27162771

27172772
Value id = Value::null;
27182773
try
@@ -2750,17 +2805,22 @@ void ThreadRPCServer2(void* parg)
27502805

27512806
// Send reply
27522807
string strReply = JSONRPCReply(result, Value::null, id);
2753-
stream << HTTPReply(200, strReply) << std::flush;
2808+
conn->stream << HTTPReply(200, strReply, fRun) << std::flush;
27542809
}
27552810
catch (Object& objError)
27562811
{
2757-
ErrorReply(stream, objError, id);
2812+
ErrorReply(conn->stream, objError, id);
2813+
break;
27582814
}
27592815
catch (std::exception& e)
27602816
{
2761-
ErrorReply(stream, JSONRPCError(-32700, e.what()), id);
2817+
ErrorReply(conn->stream, JSONRPCError(-32700, e.what()), id);
2818+
break;
27622819
}
27632820
}
2821+
2822+
delete conn;
2823+
vnThreadsRunning[THREAD_RPCHANDLER]--;
27642824
}
27652825

27662826
json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_spirit::Array &params) const

src/bitcoinrpc.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <string>
1010
#include <map>
1111

12+
#define BOOST_SPIRIT_THREADSAFE
1213
#include "json/json_spirit_reader_template.h"
1314
#include "json/json_spirit_writer_template.h"
1415
#include "json/json_spirit_utils.h"

src/net.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1839,12 +1839,13 @@ bool StopNode()
18391839
if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n");
18401840
if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n");
18411841
if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n");
1842-
if (vnThreadsRunning[THREAD_RPCSERVER] > 0) printf("ThreadRPCServer still running\n");
1842+
if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n");
1843+
if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n");
18431844
if (fHaveUPnP && vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n");
18441845
if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n");
18451846
if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n");
18461847
if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n");
1847-
while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCSERVER] > 0)
1848+
while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0)
18481849
Sleep(20);
18491850
Sleep(50);
18501851
DumpAddresses();

src/net.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,12 @@ enum threadId
9292
THREAD_OPENCONNECTIONS,
9393
THREAD_MESSAGEHANDLER,
9494
THREAD_MINER,
95-
THREAD_RPCSERVER,
95+
THREAD_RPCLISTENER,
9696
THREAD_UPNP,
9797
THREAD_DNSSEED,
9898
THREAD_ADDEDCONNECTIONS,
9999
THREAD_DUMPADDRESS,
100+
THREAD_RPCHANDLER,
100101

101102
THREAD_MAX
102103
};

0 commit comments

Comments
 (0)