Skip to content

Commit 18b3ad8

Browse files
committed
MB-32687: Backport signalIfIdle refactor
signalIfIdle used to update the event mask in libevent to add a write event to make the socket available. Libevent would then pick up that socket next time it went down to the OS to do a poll on the sockets. The problem is that when we try to update the event mask for a socket libevent will try to acquire the mutex used by the event base in order to do that. If the code was called from another worker thread it would hold it's own lock on the event base (and we're subject for a deadlock). The new implementation of signalIfIdle push the connection onto a list of connections to signal for the worker thread, before it notifies the worker thread by writing to the notification pipe for the worker thread. The callback for the worker thread swaps out the list of connections, and runs the state machine for each of the connections in there. Change-Id: Ic0ba5de83b8f924a8b7457c65d0e91fbf7927f7d Reviewed-on: http://review.couchbase.org/105714 Tested-by: Build Bot <[email protected]> Well-Formed: Build Bot <[email protected]> Reviewed-by: Jim Walker <[email protected]>
1 parent f4395e1 commit 18b3ad8

File tree

5 files changed

+66
-19
lines changed

5 files changed

+66
-19
lines changed

daemon/connection.cc

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1467,25 +1467,10 @@ void Connection::propagateDisconnect() const {
14671467

14681468
void Connection::signalIfIdle(bool logbusy, size_t workerthread) {
14691469
if (!isEwouldblock() && stateMachine.isIdleState()) {
1470-
// Raise a 'fake' write event to ensure the connection has an
1471-
// event delivered (for example if its sendQ is full).
1472-
if (!registered_in_libevent) {
1473-
ev_flags = EV_READ | EV_WRITE | EV_PERSIST;
1474-
if (!registerEvent()) {
1475-
LOG_WARNING(
1476-
"{}: Connection::signalIfIdle: Unable to "
1477-
"registerEvent. Setting state to conn_closing",
1478-
getId());
1479-
setState(McbpStateMachine::State::closing);
1480-
}
1481-
} else if (!updateEvent(EV_READ | EV_WRITE | EV_PERSIST)) {
1482-
LOG_WARNING(
1483-
"{}: Connection::signalIfIdle: Unable to "
1484-
"updateEvent. Setting state to conn_closing",
1485-
getId());
1486-
setState(McbpStateMachine::State::closing);
1487-
}
1488-
event_active(&event, EV_WRITE, 0);
1470+
auto* thr = getThread();
1471+
LOG_WARNING("Add notificaiton of {}", uint64_t(this));
1472+
thr->notification.push(this);
1473+
notify_thread(*thr);
14891474
} else if (logbusy) {
14901475
unique_cJSON_ptr json(toJSON());
14911476
auto details = to_string(json, false);

daemon/connections.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,8 @@ void conn_close(Connection& connection) {
223223
if (thread == nullptr) {
224224
throw std::logic_error("conn_close: unable to obtain non-NULL thread from connection");
225225
}
226+
227+
thread->notification.remove(&connection);
226228
// remove from pending-io list
227229
{
228230
std::lock_guard<std::mutex> lock(thread->pending_io.mutex);

daemon/memcached.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -907,6 +907,9 @@ void event_handler(evutil_socket_t fd, short which, void *arg) {
907907
thr->pending_io.map.erase(c);
908908
}
909909

910+
// Remove the connection from the notification list if it's there
911+
thr->notification.remove(c);
912+
910913
TRACE_LOCKGUARD_TIMED(thr->mutex,
911914
"mutex",
912915
"event_handler::threadLock",

daemon/memcached.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,18 @@ struct LIBEVENT_THREAD {
132132
PendingIoMap map;
133133
} pending_io;
134134

135+
/// A list of connections to signal if they're idle
136+
class NotificationList {
137+
public:
138+
void push(Connection* c);
139+
void remove(Connection* c);
140+
void swap(std::vector<Connection*>& other);
141+
142+
protected:
143+
std::mutex mutex;
144+
std::vector<Connection*> connections;
145+
} notification;
146+
135147
/// index of this thread in the threads array
136148
size_t index = 0;
137149

daemon/thread.cc

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,32 @@ void ConnectionQueue::push(std::unique_ptr<ConnectionQueueItem> item) {
6565

6666
static LIBEVENT_THREAD dispatcher_thread;
6767

68+
void LIBEVENT_THREAD::NotificationList::push(Connection* c) {
69+
std::lock_guard<std::mutex> lock(mutex);
70+
auto iter = std::find(connections.begin(), connections.end(), c);
71+
if (iter == connections.end()) {
72+
try {
73+
connections.push_back(c);
74+
} catch (const std::bad_alloc&) {
75+
// Just ignore and hopefully we'll be able to signal it at a later
76+
// time.
77+
}
78+
}
79+
}
80+
81+
void LIBEVENT_THREAD::NotificationList::remove(Connection* c) {
82+
std::lock_guard<std::mutex> lock(mutex);
83+
auto iter = std::find(connections.begin(), connections.end(), c);
84+
if (iter != connections.end()) {
85+
connections.erase(iter);
86+
}
87+
}
88+
89+
void LIBEVENT_THREAD::NotificationList::swap(std::vector<Connection*>& other) {
90+
std::lock_guard<std::mutex> lock(mutex);
91+
connections.swap(other);
92+
}
93+
6894
/*
6995
* Each libevent instance has a wakeup pipe, which other threads
7096
* can use to signal that they've put a new connection on its queue.
@@ -282,9 +308,22 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
282308
"thread_libevent_process::threadLock",
283309
SlowMutexThreshold);
284310

311+
std::vector<Connection*> notify;
312+
me.notification.swap(notify);
313+
285314
for (auto io : pending) {
286315
auto* c = io.first;
287316
auto status = io.second;
317+
318+
// Remove from the notify list if it's there as we don't
319+
// want to run them twice
320+
{
321+
auto iter = std::find(notify.begin(), notify.end(), c);
322+
if (iter != notify.end()) {
323+
notify.erase(iter);
324+
}
325+
}
326+
288327
if (c->getSocketDescriptor() != INVALID_SOCKET &&
289328
!c->isRegisteredInLibevent()) {
290329
/* The socket may have been shut down while we're looping */
@@ -302,6 +341,12 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
302341
run_event_loop(c, EV_READ | EV_WRITE);
303342
}
304343

344+
// Notify the connections we haven't notified yet
345+
for (auto c : notify) {
346+
c->setNumEvents(1);
347+
run_event_loop(c, EV_READ | EV_WRITE);
348+
}
349+
305350
/*
306351
* I could look at all of the connection objects bound to dying buckets
307352
*/

0 commit comments

Comments
 (0)