
Commit bbb1bc7

MB-25822: Create separate mutex for pending_io
This is a recreation of commit 0800742 against our spock branch. A lot of refactoring has occurred between spock and vulcan, meaning a simple cherry-pick of the original patch was not straightforward. This patch recreates the original intent and switches the pending-io from a list to std::map (rather than the original switch from std::set to std::map).

The LIBEVENT_THREAD::mutex currently guards the whole state of the worker thread - as such it is acquired by the worker thread when it starts processing event(s) for a connection, and only unlocked when it finishes with that connection. The state guarded includes the pending_io set, which is (primarily):

a) Read at the start of the event loop (to re-register connections in libevent).
b) Written by notify_io_complete when background threads want to wake up frontend threads.

Profiling of the LIBEVENT_THREAD::mutex wait/lock times shows that callers of notify_io_complete (notably the ConnManager and DCP ConnNotifier tasks) can end up waiting for long periods (~100ms in local testing) simply to add a connection to the pending set.

To improve this, add a separate mutex which is only used for the pending_io set. This also requires a change to the data structure for pending_io - we need to additionally record the pending status code against each connection. Previously this status (aiostat) was updated with LIBEVENT_THREAD::mutex held, so the worker thread could not possibly be running at the same time. With the separate mutex this is no longer the case, and we cannot simply write to aiostat as it may be being accessed by the worker. We obviously don't want to have to acquire LIBEVENT_THREAD::mutex to update it, as that reintroduces the same problem. Instead we record the pending status alongside the connection in pending_io, and when the worker thread comes to process pending_io (already holding LIBEVENT_THREAD::mutex) it updates aiostat with the pending status before running the connection's event loop.

Change-Id: Ifad49f12d2249b92d4ffd3a761e353bb85d0d2d5
Reviewed-on: http://review.couchbase.org/94350
Well-Formed: Build Bot <[email protected]>
Reviewed-by: Trond Norbye <[email protected]>
Tested-by: Build Bot <[email protected]>
1 parent ed140ef commit bbb1bc7
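
For illustration, the locking scheme described in the commit message can be sketched as follows. This is a minimal stand-alone sketch, not the kv_engine code itself: the names Worker, Conn, Status, add_pending and process_pending are invented for the example; the real patch uses LIBEVENT_THREAD, Connection, ENGINE_ERROR_CODE and the functions shown in the diffs below.

// Sketch only: a coarse per-thread mutex plus a small mutex that guards just
// the pending-IO map, so notifiers never contend on the coarse mutex.
#include <mutex>
#include <unordered_map>

enum class Status { Success, WouldBlock };

struct Conn {
    Status aiostat = Status::Success; // only ever written on the worker thread
};

struct Worker {
    std::mutex thread_mutex;  // coarse mutex: whole worker state (cf. LIBEVENT_THREAD::mutex)
    std::mutex pending_mutex; // small mutex: guards only the pending map
    std::unordered_map<Conn*, Status> pending; // connection -> status to deliver
};

// Called from background threads (cf. notify_io_complete): record the
// connection and its status without touching the coarse mutex. Returns true
// if a new entry was inserted, i.e. the worker should be woken up.
bool add_pending(Worker& w, Conn* c, Status st) {
    std::lock_guard<std::mutex> guard(w.pending_mutex);
    return w.pending.emplace(c, st).second;
}

// Called on the worker thread (cf. thread_libevent_process): swap the map out
// under the small mutex, then process it under the coarse mutex. The recorded
// status is applied to aiostat here, on the worker itself, so no other thread
// ever writes aiostat concurrently.
void process_pending(Worker& w) {
    std::unordered_map<Conn*, Status> pending;
    {
        std::lock_guard<std::mutex> guard(w.pending_mutex);
        w.pending.swap(pending);
    }
    std::lock_guard<std::mutex> guard(w.thread_mutex);
    for (auto& io : pending) {
        io.first->aiostat = io.second; // then re-register / run the connection's event loop
    }
}

int main() {
    Worker w;
    Conn c;
    if (add_pending(w, &c, Status::WouldBlock)) {
        // in the real code this is where the worker's notify pipe is written
    }
    process_pending(w);
}

The key point is that notify_io_complete-style callers only ever take the small pending mutex, while aiostat is updated by the worker thread after it has drained the map and acquired its own coarse mutex.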

4 files changed: +64, -73 lines


daemon/connections.cc

Lines changed: 11 additions & 5 deletions

@@ -297,12 +297,18 @@ void conn_close(McbpConnection *c) {
     if (thread == nullptr) {
         throw std::logic_error("conn_close: unable to obtain non-NULL thread from connection");
     }
-    /* remove from pending-io list */
-    if (settings.getVerbose() > 1 && list_contains(thread->pending_io, c)) {
-        LOG_WARNING(c,
-                    "Current connection was in the pending-io list.. Nuking it");
+
+    // remove from pending-io list
+    {
+        std::lock_guard<std::mutex> lock(*thread->pending_io.mutex);
+        /* remove from pending-io list */
+        if (settings.getVerbose() > 1 && thread->pending_io.map->count(c)) {
+            LOG_WARNING(c,
+                        "Current connection was in the pending-io list.. "
+                        "Nuking it");
+        }
+        thread->pending_io.map->erase(c);
     }
-    thread->pending_io = list_remove(thread->pending_io, c);
 
     conn_cleanup(c);
 

daemon/memcached.cc

Lines changed: 12 additions & 9 deletions

@@ -1025,6 +1025,16 @@ void event_handler(evutil_socket_t fd, short which, void *arg) {
         return;
     }
 
+    // Remove the list from the list of pending io's (in case the
+    // object was scheduled to run in the dispatcher before the
+    // callback for the worker thread is executed.
+    //
+    {
+        std::lock_guard<std::mutex> lock(*thr->pending_io.mutex);
+        thr->pending_io.map->erase(c);
+    }
+
+
     LOCK_THREAD(thr);
     if (memcached_shutdown) {
         // Someone requested memcached to shut down.

@@ -1036,13 +1046,6 @@ void event_handler(evutil_socket_t fd, short which, void *arg) {
         }
     }
 
-    /*
-     * Remove the list from the list of pending io's (in case the
-     * object was scheduled to run in the dispatcher before the
-     * callback for the worker thread is executed.
-     */
-    thr->pending_io = list_remove(thr->pending_io, c);
-
     /* sanity */
     cb_assert(fd == c->getSocketDescriptor());
 

@@ -1684,7 +1687,7 @@ static ENGINE_ERROR_CODE release_cookie(const void *void_cookie) {
     }
 
     Connection *c = cookie->connection;
-    int notify;
+    int notify = 0;
     LIBEVENT_THREAD *thr;
 
     thr = c->getThread();

@@ -1698,7 +1701,7 @@ static ENGINE_ERROR_CODE release_cookie(const void *void_cookie) {
      * pending IO and have the system retry the operation for the
      * connection
      */
-    notify = add_conn_to_pending_io_list(c);
+    notify = add_conn_to_pending_io_list(c, ENGINE_SUCCESS);
     UNLOCK_THREAD(thr);
 
     /* kick the thread in the butt */

daemon/memcached.h

Lines changed: 17 additions & 3 deletions

@@ -72,14 +72,28 @@ class Connection;
 class ConnectionQueue;
 
 struct LIBEVENT_THREAD {
+    /**
+     * Pending IO requests for this thread. Maps each pending Connection to
+     * the IO status to be notified.
+     */
+    using PendingIoMap = std::unordered_map<Connection*, ENGINE_ERROR_CODE>;
+
     cb_thread_t thread_id; /* unique ID of this thread */
     struct event_base *base; /* libevent handle this thread uses */
     struct event notify_event; /* listen event for notify pipe */
     SOCKET notify[2]; /* notification pipes */
     ConnectionQueue *new_conn_queue; /* queue of new connections to handle */
-    cb_mutex_t mutex; /* Mutex to lock protect access to the pending_io */
+    cb_mutex_t mutex; /* Mutex for access to this structure */
     bool is_locked;
-    Connection *pending_io; /* List of connection with pending async io ops */
+
+    struct {
+        // Note: these are pointers as this struct is calloc'd and we need to
+        // ensure correct construct/destruct of these objects on all platforms
+        // which is much simpler if we new/delete these two objects
+        std::mutex* mutex; /// Mutex for access to the PendingIoMap
+        PendingIoMap* map; /// map of connection with pending async io ops
+    } pending_io;
+
     int index; /* index of this thread in the threads array */
     ThreadType type; /* Type of IO this thread processes */
 

@@ -171,7 +185,7 @@ Connection *list_remove(Connection *h, Connection *n);
 
 bool load_extension(const char *soname, const char *config);
 
-int add_conn_to_pending_io_list(Connection *c);
+int add_conn_to_pending_io_list(Connection* c, ENGINE_ERROR_CODE status);
 
 /* connection state machine */
 bool conn_listening(ListenConnection *c);

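The "Note:" comment in the new pending_io struct is about object lifetime: LIBEVENT_THREAD is allocated with calloc(), so members with non-trivial constructors such as std::mutex and std::unordered_map cannot simply be zero-initialised into existence; instead they are held by pointer and explicitly new'd and delete'd (see setup_thread() and threads_cleanup() in daemon/thread.cc below). A stand-alone sketch of that pattern, using an invented ThreadState struct rather than the real LIBEVENT_THREAD:

// Sketch only: pointer members keep a calloc'd struct trivially
// zero-initialisable while still allowing C++ objects with real constructors.
#include <cstdlib>
#include <mutex>
#include <unordered_map>

struct ThreadState {
    std::mutex* pending_mutex;                   // guards the pending-IO map
    std::unordered_map<void*, int>* pending_map; // the pending-IO map itself
};

int main() {
    auto* ts = static_cast<ThreadState*>(std::calloc(1, sizeof(ThreadState)));

    // cf. setup_thread(): construct the C++ members explicitly
    ts->pending_mutex = new std::mutex();
    ts->pending_map = new std::unordered_map<void*, int>();

    // ... worker runs, using *ts->pending_mutex / *ts->pending_map ...

    // cf. threads_cleanup(): destroy them explicitly before freeing the struct
    delete ts->pending_map;
    delete ts->pending_mutex;
    std::free(ts);
    return 0;
}
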
daemon/thread.cc

Lines changed: 24 additions & 56 deletions

@@ -205,6 +205,9 @@ static void setup_thread(LIBEVENT_THREAD *me) {
 
     cb_mutex_initialize(&me->mutex);
 
+    me->pending_io.mutex = new std::mutex();
+    me->pending_io.map = new LIBEVENT_THREAD::PendingIoMap();
+
     // Initialize threads' sub-document parser / handler
     me->subdoc_op = subdoc_op_alloc();
 

@@ -236,16 +239,6 @@ static void worker_libevent(void *arg) {
     ERR_remove_state(0);
 }
 
-static int number_of_pending(Connection *c, Connection *list) {
-    int rv = 0;
-    for (; list; list = list->getNext()) {
-        if (list == c) {
-            rv ++;
-        }
-    }
-    return rv;
-}
-
 static void drain_notification_channel(evutil_socket_t fd)
 {
     int nread;

@@ -309,14 +302,19 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
 
     dispatch_new_connections(me);
 
+    LIBEVENT_THREAD::PendingIoMap pending;
+    {
+        std::lock_guard<std::mutex> lock(*me->pending_io.mutex);
+        me->pending_io.map->swap(pending);
+    }
+
     LOCK_THREAD(me);
-    Connection* pending = me->pending_io;
-    me->pending_io = NULL;
-    while (pending != NULL) {
-        Connection *c = pending;
+
+    for (auto io : pending) {
+        auto* c = io.first;
+        auto status = io.second;
+
         cb_assert(me == c->getThread());
-        pending = pending->getNext();
-        c->setNext(nullptr);
 
         auto *mcbp = dynamic_cast<McbpConnection*>(c);
         if (mcbp != nullptr) {

@@ -332,6 +330,7 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
              * from the context of the notification pipe, so just let it
              * run one time to set up the correct mask in libevent
              */
+            mcbp->setAiostat(status);
             mcbp->setNumEvents(1);
         }
         run_event_loop(c, EV_READ|EV_WRITE);

@@ -364,23 +363,6 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
 
 extern volatile rel_time_t current_time;
 
-static bool has_cycle(Connection *c) {
-    Connection *slowNode, *fastNode1, *fastNode2;
-
-    if (!c) {
-        return false;
-    }
-
-    slowNode = fastNode1 = fastNode2 = c;
-    while (slowNode && (fastNode1 = fastNode2->getNext()) && (fastNode2 = fastNode1->getNext())) {
-        if (slowNode == fastNode1 || slowNode == fastNode2) {
-            return true;
-        }
-        slowNode = slowNode->getNext();
-    }
-    return false;
-}
-
 bool list_contains(Connection *haystack, Connection *needle) {
     for (; haystack; haystack = haystack->getNext()) {
         if (needle == haystack) {

@@ -406,17 +388,6 @@ Connection * list_remove(Connection *haystack, Connection *needle) {
     return haystack;
 }
 
-static void enlist_conn(Connection *c, Connection **list) {
-    LIBEVENT_THREAD *thr = c->getThread();
-    cb_assert(list == &thr->pending_io);
-    cb_assert(!list_contains(thr->pending_io, c));
-    cb_assert(c->getNext() == nullptr);
-    c->setNext(*list);
-    *list = c;
-    cb_assert(list_contains(*list, c));
-    cb_assert(!has_cycle(*list));
-}
-
 void notify_io_complete(const void *void_cookie, ENGINE_ERROR_CODE status)
 {
     if (void_cookie == nullptr) {

@@ -445,10 +416,7 @@ void notify_io_complete(const void *void_cookie, ENGINE_ERROR_CODE status)
     LOG_DEBUG(NULL, "Got notify from %u, status 0x%x",
               connection->getId(), status);
 
-    LOCK_THREAD(thr);
-    reinterpret_cast<McbpConnection*>(connection)->setAiostat(status);
-    notify = add_conn_to_pending_io_list(connection);
-    UNLOCK_THREAD(thr);
+    notify = add_conn_to_pending_io_list(connection, status);
 
     /* kick the thread in the butt */
     if (notify) {

@@ -578,6 +546,8 @@ void threads_cleanup(void)
         subdoc_op_free(threads[ii].subdoc_op);
         delete threads[ii].validator;
         delete threads[ii].new_conn_queue;
+        delete threads[ii].pending_io.mutex;
+        delete threads[ii].pending_io.map;
     }
 
     cb_free(thread_ids);

@@ -620,15 +590,13 @@ void notify_thread(LIBEVENT_THREAD *thread) {
     }
 }
 
-int add_conn_to_pending_io_list(Connection *c) {
-    int notify = 0;
+int add_conn_to_pending_io_list(Connection* c, ENGINE_ERROR_CODE status) {
     auto thread = c->getThread();
-    if (number_of_pending(c, thread->pending_io) == 0) {
-        if (thread->pending_io == NULL) {
-            notify = 1;
-        }
-        enlist_conn(c, &thread->pending_io);
+    std::pair<LIBEVENT_THREAD::PendingIoMap::iterator, bool> result;
+    {
+        std::lock_guard<std::mutex> lock(*thread->pending_io.mutex);
+        result = thread->pending_io.map->emplace(c, status);
     }
-
+    int notify = result.second ? 1 : 0;
     return notify;
 }

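One behavioural detail of the new add_conn_to_pending_io_list (end of the diff above) is worth noting: because it uses emplace(), a connection that is already in the pending map is neither overwritten nor re-notified (only the first recorded status survives until the worker drains the map), whereas the old list-based code signalled a wakeup only when the whole list had been empty. A small self-contained illustration of the emplace semantics, using plain ints in place of Connection* and ENGINE_ERROR_CODE:

// Sketch only: shows why 'int notify = result.second ? 1 : 0;' wakes the
// worker exactly once per connection until the map is drained.
#include <cstdio>
#include <unordered_map>

int main() {
    std::unordered_map<int /*conn id*/, int /*status*/> pending;

    auto first = pending.emplace(42, 0);  // e.g. ENGINE_SUCCESS
    auto second = pending.emplace(42, 7); // e.g. a later, different status

    std::printf("first insert: %d, second insert: %d, stored status: %d\n",
                first.second, second.second, pending.at(42));
    // Prints: first insert: 1, second insert: 0, stored status: 0
    // i.e. notify the worker only when emplace() actually inserted.
    return 0;
}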