Skip to content

Commit 1a9f19a

Browse files
committed
Merge pull request #6719
ec908d5 http: Force-exit event loop after predefined time (Wladimir J. van der Laan) de9de2d http: Wait for worker threads to exit (Wladimir J. van der Laan) 5e0c221 Make HTTP server shutdown more graceful (Wladimir J. van der Laan)
2 parents ad57b31 + ec908d5 commit 1a9f19a

File tree

2 files changed

+67
-10
lines changed

2 files changed

+67
-10
lines changed

src/httpserver.cpp

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,35 @@ class WorkQueue
7272
std::deque<WorkItem*> queue;
7373
bool running;
7474
size_t maxDepth;
75+
int numThreads;
76+
77+
/** RAII object to keep track of number of running worker threads */
78+
class ThreadCounter
79+
{
80+
public:
81+
WorkQueue &wq;
82+
ThreadCounter(WorkQueue &w): wq(w)
83+
{
84+
boost::lock_guard<boost::mutex> lock(wq.cs);
85+
wq.numThreads += 1;
86+
}
87+
~ThreadCounter()
88+
{
89+
boost::lock_guard<boost::mutex> lock(wq.cs);
90+
wq.numThreads -= 1;
91+
wq.cond.notify_all();
92+
}
93+
};
7594

7695
public:
7796
WorkQueue(size_t maxDepth) : running(true),
78-
maxDepth(maxDepth)
97+
maxDepth(maxDepth),
98+
numThreads(0)
7999
{
80100
}
81-
/* Precondition: worker threads have all stopped */
101+
/*( Precondition: worker threads have all stopped
102+
* (call WaitExit)
103+
*/
82104
~WorkQueue()
83105
{
84106
while (!queue.empty()) {
@@ -100,6 +122,7 @@ class WorkQueue
100122
/** Thread function */
101123
void Run()
102124
{
125+
ThreadCounter count(*this);
103126
while (running) {
104127
WorkItem* i = 0;
105128
{
@@ -122,6 +145,13 @@ class WorkQueue
122145
running = false;
123146
cond.notify_all();
124147
}
148+
/** Wait for worker threads to exit */
149+
void WaitExit()
150+
{
151+
boost::unique_lock<boost::mutex> lock(cs);
152+
while (numThreads > 0)
153+
cond.wait(lock);
154+
}
125155

126156
/** Return current depth of queue */
127157
size_t Depth()
@@ -155,6 +185,8 @@ static std::vector<CSubNet> rpc_allow_subnets;
155185
static WorkQueue<HTTPClosure>* workQueue = 0;
156186
//! Handlers for (sub)paths
157187
std::vector<HTTPPathHandler> pathHandlers;
188+
//! Bound listening sockets
189+
std::vector<evhttp_bound_socket *> boundSockets;
158190

159191
/** Check if a network address is allowed to access the HTTP server */
160192
static bool ClientAllowed(const CNetAddr& netaddr)
@@ -264,6 +296,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
264296
}
265297
}
266298

299+
/** Callback to reject HTTP requests after shutdown. */
300+
static void http_reject_request_cb(struct evhttp_request* req, void*)
301+
{
302+
LogPrint("http", "Rejecting request while shutting down\n");
303+
evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
304+
}
305+
267306
/** Event dispatcher thread */
268307
static void ThreadHTTP(struct event_base* base, struct evhttp* http)
269308
{
@@ -278,7 +317,6 @@ static void ThreadHTTP(struct event_base* base, struct evhttp* http)
278317
static bool HTTPBindAddresses(struct evhttp* http)
279318
{
280319
int defaultPort = GetArg("-rpcport", BaseParams().RPCPort());
281-
int nBound = 0;
282320
std::vector<std::pair<std::string, uint16_t> > endpoints;
283321

284322
// Determine what addresses to bind to
@@ -304,13 +342,14 @@ static bool HTTPBindAddresses(struct evhttp* http)
304342
// Bind addresses
305343
for (std::vector<std::pair<std::string, uint16_t> >::iterator i = endpoints.begin(); i != endpoints.end(); ++i) {
306344
LogPrint("http", "Binding RPC on address %s port %i\n", i->first, i->second);
307-
if (evhttp_bind_socket(http, i->first.empty() ? NULL : i->first.c_str(), i->second) == 0) {
308-
nBound += 1;
345+
evhttp_bound_socket *bind_handle = evhttp_bind_socket_with_handle(http, i->first.empty() ? NULL : i->first.c_str(), i->second);
346+
if (bind_handle) {
347+
boundSockets.push_back(bind_handle);
309348
} else {
310349
LogPrintf("Binding RPC on address %s port %i failed.\n", i->first, i->second);
311350
}
312351
}
313-
return nBound > 0;
352+
return !boundSockets.empty();
314353
}
315354

316355
/** Simple wrapper to set thread name and run work queue */
@@ -410,16 +449,33 @@ bool StartHTTPServer(boost::thread_group& threadGroup)
410449
void InterruptHTTPServer()
411450
{
412451
LogPrint("http", "Interrupting HTTP server\n");
413-
if (eventBase)
414-
event_base_loopbreak(eventBase);
452+
if (eventHTTP) {
453+
// Unlisten sockets
454+
BOOST_FOREACH (evhttp_bound_socket *socket, boundSockets) {
455+
evhttp_del_accept_socket(eventHTTP, socket);
456+
}
457+
// Reject requests on current connections
458+
evhttp_set_gencb(eventHTTP, http_reject_request_cb, NULL);
459+
}
460+
if (eventBase) {
461+
// Force-exit event loop after predefined time
462+
struct timeval tv;
463+
tv.tv_sec = 10;
464+
tv.tv_usec = 0;
465+
event_base_loopexit(eventBase, &tv);
466+
}
415467
if (workQueue)
416468
workQueue->Interrupt();
417469
}
418470

419471
void StopHTTPServer()
420472
{
421473
LogPrint("http", "Stopping HTTP server\n");
422-
delete workQueue;
474+
if (workQueue) {
475+
LogPrint("http", "Waiting for HTTP worker threads to exit\n");
476+
workQueue->WaitExit();
477+
delete workQueue;
478+
}
423479
if (eventHTTP) {
424480
evhttp_free(eventHTTP);
425481
eventHTTP = 0;

src/rpcserver.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,8 @@ UniValue stop(const UniValue& params, bool fHelp)
243243
throw runtime_error(
244244
"stop\n"
245245
"\nStop Bitcoin server.");
246-
// Shutdown will take long enough that the response should get back
246+
// Event loop will exit after current HTTP requests have been handled, so
247+
// this reply will get back to the client.
247248
StartShutdown();
248249
return "Bitcoin server stopping";
249250
}

0 commit comments

Comments
 (0)