Skip to content

Commit c6ad605

Browse files
committed
fix: EventLoop::post() deadlocks when posted function throws
1 parent 22bec91 commit c6ad605

File tree

1 file changed

+20
-4
lines changed

1 file changed

+20
-4
lines changed

src/mp/proxy.cpp

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,12 @@ EventLoop::~EventLoop()
213213
{
214214
if (m_async_thread.joinable()) m_async_thread.join();
215215
const Lock lock(m_mutex);
216+
217+
if (m_post_fd != -1) {
218+
KJ_SYSCALL(::close(m_post_fd));
219+
m_post_fd = -1;
220+
}
221+
216222
KJ_ASSERT(m_post_fn == nullptr);
217223
KJ_ASSERT(!m_async_fns);
218224
KJ_ASSERT(m_wait_fd == -1);
@@ -239,13 +245,26 @@ void EventLoop::loop()
239245
kj::Own<kj::AsyncIoStream> wait_stream{
240246
m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
241247
int post_fd{m_post_fd};
248+
KJ_DEFER({
249+
Lock lock(m_mutex);
250+
m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_num_clients == 0; });
251+
m_wait_fd = -1;
252+
m_async_fns.reset();
253+
m_cv.notify_all();
254+
});
242255
char buffer = 0;
243256
for (;;) {
244257
const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
245258
if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
246259
Lock lock(m_mutex);
247260
if (m_post_fn) {
248-
Unlock(lock, *m_post_fn);
261+
try {
262+
Unlock(lock, *m_post_fn);
263+
} catch (...) {
264+
m_post_fn = nullptr;
265+
m_cv.notify_all();
266+
throw;
267+
}
249268
m_post_fn = nullptr;
250269
m_cv.notify_all();
251270
} else if (done()) {
@@ -262,10 +281,7 @@ void EventLoop::loop()
262281
wait_stream = nullptr;
263282
KJ_SYSCALL(::close(post_fd));
264283
const Lock lock(m_mutex);
265-
m_wait_fd = -1;
266284
m_post_fd = -1;
267-
m_async_fns.reset();
268-
m_cv.notify_all();
269285
}
270286

271287
void EventLoop::post(kj::Function<void()> fn)

0 commit comments

Comments
 (0)