Commit 2b830e5

refactor: Use EventLoopRef instead of addClient/removeClient
Use EventLoopRef to avoid reference counting bugs and be more exception safe
1 parent 315ff53 commit 2b830e5
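
The core of this change is replacing manually paired addClient()/removeClient() calls with a scoped reference object, so the EventLoop client count is decremented exactly once even when a scope exits early or throws. Below is a minimal, hypothetical sketch of that RAII pattern, for illustration only, reusing the addClient/removeClient/m_num_clients/m_mutex/reset() names that appear in this diff; it is not the actual EventLoopRef declared in include/mp/proxy.h, which differs in detail.

// Hypothetical, simplified sketch -- not the actual classes in this repo.
#include <cassert>
#include <mutex>

struct EventLoop
{
    std::mutex m_mutex;
    int m_num_clients{0};
    void addClient(std::unique_lock<std::mutex>&) { m_num_clients += 1; }
    bool removeClient(std::unique_lock<std::mutex>&)
    {
        assert(m_num_clients > 0);
        m_num_clients -= 1;
        return m_num_clients == 0; // true when the last client is gone
    }
};

class EventLoopRef
{
public:
    //! Optionally reuse a lock on loop.m_mutex that the caller already holds,
    //! mirroring the `EventLoopRef ref{*this, &lock}` call sites in this commit.
    explicit EventLoopRef(EventLoop& loop, std::unique_lock<std::mutex>* lock = nullptr)
        : m_loop(&loop), m_lock(lock)
    {
        if (m_lock) {
            m_loop->addClient(*m_lock);
        } else {
            std::unique_lock<std::mutex> local{m_loop->m_mutex};
            m_loop->addClient(local);
        }
    }
    EventLoopRef(const EventLoopRef&) = delete;
    EventLoopRef& operator=(const EventLoopRef&) = delete;
    ~EventLoopRef() { reset(); }

    //! Drop the reference early; returns true if this was the last client.
    bool reset()
    {
        bool done = false;
        if (EventLoop* loop = m_loop) {
            m_loop = nullptr;
            if (m_lock) {
                done = loop->removeClient(*m_lock);
            } else {
                std::unique_lock<std::mutex> local{loop->m_mutex};
                done = loop->removeClient(local);
            }
        }
        return done;
    }

    EventLoop& operator*() const { return *m_loop; }
    EventLoop* operator->() const { return m_loop; }

private:
    EventLoop* m_loop;
    std::unique_lock<std::mutex>* m_lock;
};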

3 files changed: +29 -49 lines changed

include/mp/proxy-io.h

Lines changed: 5 additions & 31 deletions
@@ -313,21 +313,13 @@ class Connection
     Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
         : m_loop(loop), m_stream(kj::mv(stream_)),
           m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
-          m_rpc_system(::capnp::makeRpcClient(m_network))
-    {
-        std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-        m_loop.addClient(lock);
-    }
+          m_rpc_system(::capnp::makeRpcClient(m_network)) {}
     Connection(EventLoop& loop,
         kj::Own<kj::AsyncIoStream>&& stream_,
         const std::function<::capnp::Capability::Client(Connection&)>& make_client)
         : m_loop(loop), m_stream(kj::mv(stream_)),
           m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
-          m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this)))
-    {
-        std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-        m_loop.addClient(lock);
-    }
+          m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
 
     //! Run cleanup functions. Must be called from the event loop thread. First
     //! calls synchronous cleanup functions while blocked (to free capnp
@@ -356,12 +348,12 @@ class Connection
         // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
         // error in cases where f deletes this Connection object.
         m_on_disconnect.add(m_network.onDisconnect().then(
-            [f = std::forward<F>(f), this]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
+            [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
     }
 
-    EventLoop& m_loop;
+    EventLoopRef m_loop;
     kj::Own<kj::AsyncIoStream> m_stream;
-    LoggingErrorHandler m_error_handler{m_loop};
+    LoggingErrorHandler m_error_handler{*m_loop};
     kj::TaskSet m_on_disconnect{m_error_handler};
     ::capnp::TwoPartyVatNetwork m_network;
     std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@@ -404,21 +396,12 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
     : m_client(std::move(client)), m_context(connection)
 
 {
-    {
-        std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-        m_context.connection->m_loop.addClient(lock);
-    }
-
     // Handler for the connection getting destroyed before this client object.
     auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
         // Release client capability by move-assigning to temporary.
         {
             typename Interface::Client(std::move(m_client));
         }
-        {
-            std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-            m_context.connection->m_loop.removeClient(lock);
-        }
         m_context.connection = nullptr;
     });
 
@@ -451,11 +434,6 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
     {
         typename Interface::Client(std::move(m_client));
     }
-    {
-        std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-        m_context.connection->m_loop.removeClient(lock);
-    }
-
     if (destroy_connection) {
         delete m_context.connection;
         m_context.connection = nullptr;
@@ -477,8 +455,6 @@ ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Co
     : m_impl(std::move(impl)), m_context(&connection)
 {
     assert(m_impl);
-    std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-    m_context.connection->m_loop.addClient(lock);
 }
 
 //! ProxyServer destructor, called from the EventLoop thread by Cap'n Proto
@@ -512,8 +488,6 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
         });
     }
     assert(m_context.cleanup_fns.empty());
-    std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
-    m_context.connection->m_loop.removeClient(lock);
 }
 
 //! If the capnp interface defined a special "destroy" method, as described the

include/mp/proxy.h

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ class EventLoopRef
 struct ProxyContext
 {
     Connection* connection;
-    EventLoop* loop;
+    EventLoopRef loop;
     CleanupList cleanup_fns;
 
     ProxyContext(Connection* connection);

src/mp/proxy.cpp

Lines changed: 23 additions & 17 deletions
@@ -65,7 +65,7 @@ bool EventLoopRef::reset()
     return done;
 }
 
-ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{&connection->m_loop} {}
+ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}
 
 Connection::~Connection()
 {
@@ -122,18 +122,17 @@ Connection::~Connection()
         m_sync_cleanup_fns.pop_front();
     }
     while (!m_async_cleanup_fns.empty()) {
-        const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-        m_loop.m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
+        const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+        m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
         m_async_cleanup_fns.pop_front();
     }
-    std::unique_lock<std::mutex> lock(m_loop.m_mutex);
-    m_loop.startAsyncThread(lock);
-    m_loop.removeClient(lock);
+    std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    m_loop->startAsyncThread(lock);
 }
 
 CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
+    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
     // Add cleanup callbacks to the front of list, so sync cleanup functions run
     // in LIFO order. This is a good approach because sync cleanup functions are
     // added as client objects are created, and it is natural to clean up
@@ -147,13 +146,13 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 
 void Connection::removeSyncCleanup(CleanupIt it)
 {
-    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
+    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
     m_sync_cleanup_fns.erase(it);
 }
 
 void Connection::addAsyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
+    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
     // Add async cleanup callbacks to the back of the list. Unlike the sync
     // cleanup list, this list order is more significant because it determines
     // the order server objects are destroyed when there is a sudden disconnect,
@@ -244,7 +243,7 @@ void EventLoop::post(const std::function<void()>& fn)
         return;
     }
     std::unique_lock<std::mutex> lock(m_mutex);
-    addClient(lock);
+    EventLoopRef ref(*this, &lock);
     m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
     m_post_fn = &fn;
     int post_fd{m_post_fd};
@@ -253,20 +252,22 @@ void EventLoop::post(const std::function<void()>& fn)
         KJ_SYSCALL(write(post_fd, &buffer, 1));
     });
     m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
-    removeClient(lock);
 }
 
 void EventLoop::addClient(std::unique_lock<std::mutex>& lock) { m_num_clients += 1; }
 
 bool EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
 {
+    assert(m_num_clients > 0);
     m_num_clients -= 1;
     if (done(lock)) {
         m_cv.notify_all();
         int post_fd{m_post_fd};
         lock.unlock();
         char buffer = 0;
         KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
+        // Do not try to relock `lock` after writing, because the event loop
+        // could wake up and destroy itself and the mutex might no longer exist.
         return true;
     }
     return false;
@@ -275,20 +276,25 @@ bool EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
 void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
 {
     if (m_async_thread.joinable()) {
+        // Notify to wake up the async thread if it is already running.
         m_cv.notify_all();
     } else if (!m_async_fns.empty()) {
         m_async_thread = std::thread([this] {
             std::unique_lock<std::mutex> lock(m_mutex);
-            while (true) {
+            while (!done(lock)) {
                 if (!m_async_fns.empty()) {
-                    addClient(lock);
+                    EventLoopRef ref{*this, &lock};
                     const std::function<void()> fn = std::move(m_async_fns.front());
                     m_async_fns.pop_front();
                     Unlock(lock, fn);
-                    if (removeClient(lock)) break;
+                    // Reset ref and break if that returns true instead of
+                    // passively letting ref go out of scope. This is important
+                    // because the ref destructor would leave m_mutex unlocked
+                    // when done() returns true, causing undefined behavior if
+                    // the loop continued to execute.
+                    if (ref.reset()) break;
+                    // Continue without waiting in case there are more async_fns
                     continue;
-                } else if (m_num_clients == 0) {
-                    break;
                 }
                 m_cv.wait(lock);
             }
@@ -394,7 +400,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
     const std::string from = context.getParams().getName();
     std::promise<ThreadContext*> thread_context;
     std::thread thread([&thread_context, from, this]() {
-        g_thread_context.thread_name = ThreadName(m_connection.m_loop.m_exe_name) + " (from " + from + ")";
+        g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
         g_thread_context.waiter = std::make_unique<Waiter>();
         thread_context.set_value(&g_thread_context);
         std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);
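
As a usage note, the call-site change in EventLoop::post() above reduces to the following before/after comparison. This is a standalone, hypothetical sketch that reuses the simplified EventLoop/EventLoopRef types from the sketch near the top of this page; the real post() also coordinates with the event loop through m_cv and m_post_fd, which is elided here.

#include <functional>

// Before: manual pairing. If fn() throws or the function returns early,
// removeClient() is never called and the client count stays raised forever.
void post_manual(EventLoop& loop, const std::function<void()>& fn)
{
    std::unique_lock<std::mutex> lock(loop.m_mutex);
    loop.addClient(lock);
    fn();
    loop.removeClient(lock);
}

// After: RAII. `ref` releases the count when it goes out of scope, on the
// normal path and on the exceptional path alike.
void post_raii(EventLoop& loop, const std::function<void()>& fn)
{
    std::unique_lock<std::mutex> lock(loop.m_mutex);
    EventLoopRef ref(loop, &lock);
    fn();
}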
