
Commit ea38392

Prevent EventLoop async cleanup thread early exit during shutdown
Antoine Poinsot <[email protected]> reported a bug with details in #182 (comment) where an IPC client rapidly connecting and disconnecting to the server in a loop could cause problems. One problem, fixed by this commit, was that while a server process is shutting down, the async cleanup thread in `EventLoop::startAsyncThread`, which is responsible for destroying unused server objects after unclean disconnects, could detect the done() condition and decide to exit right before a new incoming connection was processed, exiting prematurely while there was still more cleanup work for it to do. If this happened, process shutdown could hang waiting for cleanup work that would never be completed.

This commit fixes the problem by changing the `EventLoop::startAsyncThread()` while condition to check whether the `EventLoop::loop()` method has _exited_, instead of checking whether it is about to exit. Specifically, the change makes the `m_async_fns` list a `std::optional` value and sets it to `nullopt` when the `loop()` method exits.
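At a high level, the fix ties the cleanup thread's exit condition to the lifetime of the work queue itself, rather than to a predicate that can become true and then false again when a new connection arrives. A minimal self-contained sketch of the pattern, with hypothetical simplified names (LoopSketch, m_fns), not the actual mp API:

    #include <condition_variable>
    #include <functional>
    #include <list>
    #include <mutex>
    #include <optional>

    struct LoopSketch {
        std::mutex m_mutex;
        std::condition_variable m_cv;
        // Engaged for exactly as long as loop() is running, so has_value()
        // tells the worker whether loop() has already exited, not merely
        // whether it looks about to exit.
        std::optional<std::list<std::function<void()>>> m_fns;

        void loop()
        {
            {
                std::lock_guard<std::mutex> lock{m_mutex};
                m_fns.emplace(); // loop is now alive; new work may arrive
            }
            // ... serve connections and dispatch events ...
            {
                std::lock_guard<std::mutex> lock{m_mutex};
                m_fns.reset();     // loop has exited; no more work can arrive
                m_cv.notify_all(); // wake the worker so it observes nullopt
            }
        }

        void worker()
        {
            std::unique_lock<std::mutex> lock{m_mutex};
            while (m_fns) { // exit only once loop() has actually exited
                if (!m_fns->empty()) {
                    auto fn = std::move(m_fns->front());
                    m_fns->pop_front();
                    lock.unlock();
                    fn();        // run cleanup without holding the mutex
                    lock.lock(); // reacquire before re-checking or waiting
                    continue;
                }
                m_cv.wait(lock); // sleep until new work or shutdown
            }
        }
    };

With the old done()-style predicate, the worker could observe "no clients, no pending work" in the instant between one connection closing and the next one arriving; checking m_fns.has_value() instead means the worker can only exit after loop() has torn down for good.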
1 parent 616d9a7 commit ea38392

File tree: 4 files changed (+30, -22 lines)

include/mp/proxy-io.h

Lines changed: 1 addition & 1 deletion

@@ -218,7 +218,7 @@ class EventLoop
     kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
 
     //! Callback functions to run on async thread.
-    CleanupList m_async_fns MP_GUARDED_BY(m_mutex);
+    std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
 
     //! Pipe read handle used to wake up the event loop thread.
     int m_wait_fd = -1;

include/mp/proxy.h

Lines changed: 1 addition & 1 deletion

@@ -62,7 +62,7 @@ class EventLoopRef
     ~EventLoopRef() { reset(); }
     EventLoop& operator*() const { assert(m_loop); return *m_loop; }
    EventLoop* operator->() const { assert(m_loop); return m_loop; }
-    bool reset();
+    void reset(bool relock=false);
 
     EventLoop* m_loop{nullptr};
     Lock* m_lock{nullptr};

include/mp/util.h

Lines changed: 1 addition & 0 deletions

@@ -161,6 +161,7 @@ struct PtrOrValue {
 #define MP_RELEASE(...) MP_TSA(release_capability(__VA_ARGS__))
 #define MP_ASSERT_CAPABILITY(x) MP_TSA(assert_capability(x))
 #define MP_GUARDED_BY(x) MP_TSA(guarded_by(x))
+#define MP_NO_TSA MP_TSA(no_thread_safety_analysis)
 
 class MP_CAPABILITY("mutex") Mutex {
 public:
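For context, the MP_TSA macros wrap clang's thread safety annotations, and the new MP_NO_TSA macro expands to the no_thread_safety_analysis attribute, which opts a single function out of -Wthread-safety checking. A standalone illustration using the raw clang attributes directly (hypothetical CapMutex/touch names, not the MP_* wrappers):

    // Compile with: clang++ -Wthread-safety -fsyntax-only example.cpp
    #include <mutex>

    class __attribute__((capability("mutex"))) CapMutex {
        std::mutex m_inner;
    public:
        void lock() __attribute__((acquire_capability())) { m_inner.lock(); }
        void unlock() __attribute__((release_capability())) { m_inner.unlock(); }
    };

    CapMutex g_mutex;
    int g_value __attribute__((guarded_by(g_mutex)));

    // Conditional locking confuses the analysis ("mutex is not held on
    // every path through here"), so this function opts out entirely.
    void touch(bool take_lock) __attribute__((no_thread_safety_analysis))
    {
        if (take_lock) g_mutex.lock();
        g_value += 1;
        if (take_lock) g_mutex.unlock();
    }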

src/mp/proxy.cpp

Lines changed: 27 additions & 20 deletions
@@ -56,27 +56,29 @@ EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(
     m_loop->m_num_clients += 1;
 }
 
-bool EventLoopRef::reset()
+// Due to the conditionals in this function, MP_NO_TSA is required to avoid
+// error "error: mutex 'loop_lock' is not held on every path through here
+// [-Wthread-safety-analysis]"
+void EventLoopRef::reset(bool relock) MP_NO_TSA
 {
-    bool done = false;
     if (auto* loop{m_loop}) {
         m_loop = nullptr;
         auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}};
         loop_lock->assert_locked(loop->m_mutex);
         assert(loop->m_num_clients > 0);
         loop->m_num_clients -= 1;
         if (loop->done()) {
-            done = true;
             loop->m_cv.notify_all();
             int post_fd{loop->m_post_fd};
             loop_lock->unlock();
             char buffer = 0;
             KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
-            // Do not try to relock `loop_lock` after writing, because the event loop
-            // could wake up and destroy itself and the mutex might no longer exist.
+            // By default, do not try to relock `loop_lock` after writing,
+            // because the event loop could wake up and destroy itself and the
+            // mutex might no longer exist.
+            if (relock) loop_lock->lock();
         }
     }
-    return done;
 }
 
 ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}
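The ordering inside reset() is deliberate: the post fd is copied and the lock released before the wake-up byte is written, because once the event loop wakes it may destroy itself, taking the mutex with it. A sketch of that hazard under simplified assumptions (POSIX pipe, hypothetical LoopState/signal_done names):

    #include <mutex>
    #include <unistd.h>

    struct LoopState {
        std::mutex m_mutex; // destroyed along with the loop once it finishes
        int m_post_fd = -1; // write end of the loop's wake-up pipe
    };

    void signal_done(LoopState& loop, bool relock)
    {
        std::unique_lock<std::mutex> lock{loop.m_mutex};
        const int post_fd = loop.m_post_fd; // copy while `loop` is known alive
        lock.unlock();                      // drop the lock before the wake-up
        char buffer = 0;
        (void)write(post_fd, &buffer, 1);   // loop may now wake and destroy itself
        // Relocking here is only safe when the caller guarantees the loop
        // outlives this call, which is why relock is opt-in.
        if (relock) lock.lock();
    }

In the real code, the async cleanup thread can pass relock=true because ~EventLoop joins that thread before destroying anything, so the loop and its mutex are guaranteed to still exist when it relocks.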
@@ -137,7 +139,7 @@ Connection::~Connection()
     }
     while (!m_async_cleanup_fns.empty()) {
         const Lock lock(m_loop->m_mutex);
-        m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
+        m_loop->m_async_fns->emplace_back(std::move(m_async_cleanup_fns.front()));
         m_async_cleanup_fns.pop_front();
     }
     Lock lock(m_loop->m_mutex);
@@ -204,7 +206,7 @@ EventLoop::~EventLoop()
     if (m_async_thread.joinable()) m_async_thread.join();
     const Lock lock(m_mutex);
     KJ_ASSERT(m_post_fn == nullptr);
-    KJ_ASSERT(m_async_fns.empty());
+    KJ_ASSERT(!m_async_fns);
     KJ_ASSERT(m_wait_fd == -1);
     KJ_ASSERT(m_post_fd == -1);
     KJ_ASSERT(m_num_clients == 0);
@@ -220,6 +222,12 @@ void EventLoop::loop()
     g_thread_context.loop_thread = true;
     KJ_DEFER(g_thread_context.loop_thread = false);
 
+    {
+        const Lock lock(m_mutex);
+        assert(!m_async_fns);
+        m_async_fns.emplace();
+    }
+
     kj::Own<kj::AsyncIoStream> wait_stream{
         m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
     int post_fd{m_post_fd};
@@ -248,6 +256,8 @@ void EventLoop::loop()
     const Lock lock(m_mutex);
     m_wait_fd = -1;
     m_post_fd = -1;
+    m_async_fns.reset();
+    m_cv.notify_all();
 }
 
 void EventLoop::post(kj::Function<void()> fn)
@@ -270,24 +280,21 @@ void EventLoop::post(kj::Function<void()> fn)
 
 void EventLoop::startAsyncThread()
 {
+    assert(std::this_thread::get_id() == m_thread_id);
     if (m_async_thread.joinable()) {
         // Notify to wake up the async thread if it is already running.
         m_cv.notify_all();
-    } else if (!m_async_fns.empty()) {
+    } else if (!m_async_fns->empty()) {
         m_async_thread = std::thread([this] {
             Lock lock(m_mutex);
-            while (!done()) {
-                if (!m_async_fns.empty()) {
+            while (m_async_fns) {
+                if (!m_async_fns->empty()) {
                     EventLoopRef ref{*this, &lock};
-                    const std::function<void()> fn = std::move(m_async_fns.front());
-                    m_async_fns.pop_front();
+                    const std::function<void()> fn = std::move(m_async_fns->front());
+                    m_async_fns->pop_front();
                     Unlock(lock, fn);
-                    // Reset ref and break if that returns true instead of
-                    // passively letting ref go out of scope. This is important
-                    // because the ref destructor would leave m_mutex unlocked
-                    // when done() returns true, causing undefined behavior if
-                    // the loop continued to execute.
-                    if (ref.reset()) break;
+                    // Important to relock because of the wait() call below.
+                    ref.reset(/*relock=*/true);
                     // Continue without waiting in case there are more async_fns
                     continue;
                 }
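The "relock because of the wait()" comment reflects a standard condition-variable precondition: wait() must be entered with the mutex held, so any branch that released the lock has to reacquire it before looping back around. The same shape in plain standard-library terms (hypothetical drain example, not the mp Lock/Unlock helpers):

    #include <condition_variable>
    #include <functional>
    #include <mutex>
    #include <queue>

    std::mutex m;
    std::condition_variable cv;
    std::queue<std::function<void()>> work;
    bool stopped = false;

    void drain()
    {
        std::unique_lock<std::mutex> lock{m};
        while (!stopped) {
            if (!work.empty()) {
                auto fn = std::move(work.front());
                work.pop();
                lock.unlock(); // run the task without holding the mutex
                fn();
                lock.lock();   // must reacquire: wait() below requires it
                continue;
            }
            cv.wait(lock);     // precondition: `lock` owns the mutex
        }
    }

This is why the async thread passes relock=true: after reset() returns, m_mutex is held again on every path, so the subsequent m_cv.wait(lock) call is safe, where the old bool-returning reset() instead had to break out of the loop whenever it could not safely relock.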
@@ -300,7 +307,7 @@
 bool EventLoop::done() const
 {
     assert(m_num_clients >= 0);
-    return m_num_clients == 0 && m_async_fns.empty();
+    return m_num_clients == 0 && m_async_fns->empty();
 }
 
 std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
