Commit cd6d95a

bug: fix ProxyClient<Thread> deadlock if disconnected as IPC call is returning
This bug is currently causing the mptest "disconnecting and blocking" test to occasionally hang, as reported by maflcko in bitcoin/bitcoin#33244. The bug was first reported by Sjors in Sjors/bitcoin#90 (comment), and there are more details about it in #189.

The bug is caused by the "disconnecting and blocking" test triggering a disconnect right before a server IPC call returns. This results in a race between the IPC server thread and the onDisconnect handler in the event loop thread, both trying to destroy the server's request_threads ProxyClient<Thread> object when the IPC call is done. There was a lack of synchronization in this case, fixed here by adding loop->sync() calls in various places. There were also lock order issues where Waiter::m_mutex could be incorrectly locked before EventLoop::m_mutex, resulting in a deadlock.
1 parent dd86363 commit cd6d95a
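
The core of the fix is that teardown of per-connection thread objects now always happens on the event loop thread via loop->sync(), so the IPC server thread and the onDisconnect handler can no longer destroy the same object concurrently. The sketch below is a minimal, self-contained illustration of that pattern, not the library's actual EventLoop: only the sync() name and the "post work to the loop thread and block until it has run" behavior come from this commit, while MiniLoop, its queue, and main are invented for the example.

#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// MiniLoop is a stand-in for the library's EventLoop, reduced to the one
// feature the fix relies on: sync() hands a functor to the loop thread and
// blocks until it has run there.
class MiniLoop
{
public:
    void run()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        while (!m_done || !m_queue.empty()) {
            m_cv.wait(lock, [&] { return m_done || !m_queue.empty(); });
            while (!m_queue.empty()) {
                auto fn = std::move(m_queue.front());
                m_queue.pop();
                lock.unlock();
                fn(); // posted work always runs on the loop thread
                lock.lock();
            }
        }
    }
    // Post fn to the loop thread and wait until it has executed.
    void sync(std::function<void()> fn)
    {
        auto done = std::make_shared<std::promise<void>>();
        auto future = done->get_future();
        {
            const std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push([fn = std::move(fn), done] { fn(); done->set_value(); });
        }
        m_cv.notify_one();
        future.wait();
    }
    void stop()
    {
        const std::lock_guard<std::mutex> lock(m_mutex);
        m_done = true;
        m_cv.notify_one();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<std::function<void()>> m_queue;
    bool m_done{false};
};

int main()
{
    MiniLoop loop;
    std::thread loop_thread([&] { loop.run(); });

    // The IPC server thread and the "disconnect handler" both funnel their
    // cleanup through sync(), so the two pieces of work are serialized on the
    // loop thread instead of racing to destroy the same object.
    std::thread server_thread([&] {
        loop.sync([] { std::cout << "server-side cleanup ran on loop thread\n"; });
    });
    loop.sync([] { std::cout << "disconnect cleanup ran on loop thread\n"; });

    server_thread.join();
    loop.stop();
    loop_thread.join();
}

Because both cleanups run on the same thread, they execute one after the other regardless of which side notices the disconnect first.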

3 files changed, +68 -49 lines

include/mp/proxy-io.h

Lines changed: 4 additions & 4 deletions
@@ -66,8 +66,6 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
     ProxyClient(const ProxyClient&) = delete;
     ~ProxyClient();
 
-    void setDisconnectCallback(const std::function<void()>& fn);
-
     //! Reference to callback function that is run if there is a sudden
     //! disconnect and the Connection object is destroyed before this
     //! ProxyClient<Thread> object. The callback will destroy this object and
@@ -537,8 +535,10 @@ void ProxyServerBase<Interface, Impl>::invokeDestroy()
 //! Map from Connection to local or remote thread handle which will be used over
 //! that connection. This map will typically only contain one entry, but can
 //! contain multiple if a single thread makes IPC calls over multiple
-//! connections.
-using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
+//! connections. A std::optional value type is used to avoid the map needing to
+//! be locked while ProxyClient<Thread> objects are constructed, see
+//! ThreadContext "Synchronization note" below.
+using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
 using ConnThread = ConnThreads::iterator;
 
 // Retrieve ProxyClient<Thread> object associated with this connection from a
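
The new ConnThreads definition relies on a standard std::map property: try_emplace() can reserve a slot holding an empty std::optional while the mutex is held, and the real object can be constructed into that optional later with the mutex released, since map iterators are not invalidated by other insertions. A rough sketch of that idea follows; ThreadHandle and set_thread are invented placeholders, not library API.

#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <tuple>

struct Connection {};                 // stands in for mp::Connection

struct ThreadHandle {                 // stands in for ProxyClient<Thread>
    explicit ThreadHandle(std::string name) : m_name(std::move(name)) {}
    std::string m_name;
};

// Same shape as the new ConnThreads typedef: the mapped type is optional, so
// an entry can exist before the handle inside it has been constructed.
using Threads = std::map<Connection*, std::optional<ThreadHandle>>;

void set_thread(Threads& threads, std::mutex& mutex, Connection* connection)
{
    Threads::iterator it;
    bool inserted;
    {
        // Only the map structure is modified under the mutex; try_emplace()
        // default-constructs an empty optional, which is cheap.
        const std::lock_guard<std::mutex> lock(mutex);
        std::tie(it, inserted) = threads.try_emplace(connection);
    }
    if (inserted) {
        // The potentially expensive handle construction happens with the mutex
        // released. std::map iterators stay valid across other insertions, so
        // holding `it` here is safe as long as nothing erases this entry.
        it->second.emplace("request thread");
    }
}

int main()
{
    Connection connection;
    Threads threads;
    std::mutex mutex;
    set_thread(threads, mutex, &connection);
    return threads.begin()->second.has_value() ? 0 : 1;
}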

include/mp/type-context.h

Lines changed: 27 additions & 17 deletions
@@ -47,8 +47,8 @@ void CustomBuildField(TypeList<>,
         &connection, make_request_thread)};
 
     auto context = output.init();
-    context.setThread(request_thread->second.m_client);
-    context.setCallbackThread(callback_thread->second.m_client);
+    context.setThread(request_thread->second->m_client);
+    context.setCallbackThread(callback_thread->second->m_client);
 }
 
 //! PassField override for mp.Context arguments. Return asynchronously and call
@@ -89,10 +89,13 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
     // need to update the map.
     auto& thread_context = g_thread_context;
     auto& request_threads = thread_context.request_threads;
-    auto [request_thread, inserted]{SetThread(
-        request_threads, thread_context.waiter->m_mutex,
-        server.m_context.connection,
-        [&] { return context_arg.getCallbackThread(); })};
+    ConnThread request_thread;
+    bool inserted;
+    server.m_context.loop->sync([&] {
+        std::tie(request_thread, inserted) = SetThread(
+            request_threads, thread_context.waiter->m_mutex, server.m_context.connection,
+            [&] { return context_arg.getCallbackThread(); });
+    });
 
     // If an entry was inserted into the requests_threads map,
     // remove it after calling fn.invoke. If an entry was not
@@ -101,17 +104,24 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
     // makes another IPC call), so avoid modifying the map.
     const bool erase_thread{inserted};
     KJ_DEFER(if (erase_thread) {
-        std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
-        // Call erase here with a Connection* argument instead
-        // of an iterator argument, because the `request_thread`
-        // iterator may be invalid if the connection is closed
-        // during this function call. More specifically, the
-        // iterator may be invalid because SetThread adds a
-        // cleanup callback to the Connection destructor that
-        // erases the thread from the map, and also because the
-        // ProxyServer<Thread> destructor calls
-        // request_threads.clear().
-        request_threads.erase(server.m_context.connection);
+        // Erase the request_threads entry on the event loop
+        // thread with loop->sync(), so if the connection is
+        // broken there is not a race between this thread and
+        // the disconnect handler trying to destroy the thread
+        // client object.
+        server.m_context.loop->sync([&] {
+            // Look up the thread again without using existing
+            // iterator since entry may no longer be there after
+            // a disconnect. Destroy node after releasing
+            // Waiter::m_mutex, so the ProxyClient<Thread>
+            // destructor is able to use EventLoop::mutex
+            // without violating lock order.
+            ConnThreads::node_type removed;
+            {
+                std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
+                removed = request_threads.extract(server.m_context.connection);
+            }
+        });
     });
     fn.invoke(server_context, args...);
 }
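
The "Destroy node after releasing Waiter::m_mutex" comment in the KJ_DEFER block refers to std::map::extract(), which unlinks an entry without destroying it; the node handle can then be dropped after the mutex is released, so the value's destructor is free to take another lock without violating lock order. A small stand-alone sketch of the pattern, with ThreadHandle, g_loop_mutex, and erase_entry as placeholder names rather than library code:

#include <map>
#include <mutex>

std::mutex g_loop_mutex; // stands in for EventLoop::m_mutex

struct ThreadHandle {    // stands in for ProxyClient<Thread>
    ~ThreadHandle()
    {
        // The destructor needs the "loop" mutex, so it must never run while
        // the map ("waiter") mutex is still held, or lock order is violated.
        const std::lock_guard<std::mutex> lock(g_loop_mutex);
    }
};

using Threads = std::map<int, ThreadHandle>;

void erase_entry(Threads& threads, std::mutex& waiter_mutex, int key)
{
    Threads::node_type removed;
    {
        const std::lock_guard<std::mutex> lock(waiter_mutex);
        removed = threads.extract(key); // unlink the entry, but do not destroy it yet
    }
    // `removed` is destroyed here, after waiter_mutex has been released, so
    // ~ThreadHandle acquires g_loop_mutex with no other lock held.
}

int main()
{
    Threads threads;
    std::mutex waiter_mutex;
    threads.try_emplace(1);
    erase_entry(threads, waiter_mutex, 1);
    return static_cast<int>(threads.size());
}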

src/mp/proxy.cpp

Lines changed: 37 additions & 28 deletions
@@ -156,6 +156,9 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 
 void Connection::removeSyncCleanup(CleanupIt it)
 {
+    // Require cleanup functions to be removed on the event loop thread to avoid
+    // needing to deal with them being removed in the middle of a disconnect.
+    assert(std::this_thread::get_id() == m_loop->m_thread_id);
     const Lock lock(m_loop->m_mutex);
     m_sync_cleanup_fns.erase(it);
 }
@@ -307,27 +310,32 @@ bool EventLoop::done() const
 
 std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
 {
-    const std::unique_lock<std::mutex> lock(mutex);
-    auto thread = threads.find(connection);
-    if (thread != threads.end()) return {thread, false};
-    thread = threads.emplace(
-        std::piecewise_construct, std::forward_as_tuple(connection),
-        std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
-    thread->second.setDisconnectCallback([&threads, &mutex, thread] {
-        // Note: it is safe to use the `thread` iterator in this cleanup
-        // function, because the iterator would only be invalid if the map entry
-        // was removed, and if the map entry is removed the ProxyClient<Thread>
-        // destructor unregisters the cleanup.
-
-        // Connection is being destroyed before thread client is, so reset
-        // thread client m_disconnect_cb member so thread client destructor does not
-        // try to unregister this callback after connection is destroyed.
-        // Remove connection pointer about to be destroyed from the map
+    assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
+    ConnThread thread;
+    bool inserted;
+    {
         const std::unique_lock<std::mutex> lock(mutex);
-        thread->second.m_disconnect_cb.reset();
-        threads.erase(thread);
-    });
-    return {thread, true};
+        std::tie(thread, inserted) = threads.try_emplace(connection);
+    }
+    if (inserted) {
+        thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false);
+        thread->second->m_disconnect_cb = connection->addSyncCleanup([&threads, &mutex, thread] {
+            // Note: it is safe to use the `thread` iterator in this cleanup
+            // function, because the iterator would only be invalid if the map entry
+            // was removed, and if the map entry is removed the ProxyClient<Thread>
+            // destructor unregisters the cleanup.
+
+            // Connection is being destroyed before thread client is, so reset
+            // thread client m_disconnect_cb member so thread client destructor does not
+            // try to unregister this callback after connection is destroyed.
+            thread->second->m_disconnect_cb.reset();
+
+            // Remove connection pointer about to be destroyed from the map
+            const std::unique_lock<std::mutex> lock(mutex);
+            threads.erase(thread);
+        });
+    }
+    return {thread, inserted};
 }
 
 ProxyClient<Thread>::~ProxyClient()
@@ -336,17 +344,18 @@ ProxyClient<Thread>::~ProxyClient()
     // cleanup callback that was registered to handle the connection being
     // destroyed before the thread being destroyed.
     if (m_disconnect_cb) {
-        m_context.connection->removeSyncCleanup(*m_disconnect_cb);
+        // Remove disconnect callback on the event loop thread with
+        // loop->sync(), so if the connection is broken there is not a race
+        // between this thread trying to remove the callback and the disconnect
+        // handler attempting to call it.
+        m_context.loop->sync([&]() {
+            if (m_disconnect_cb) {
+                m_context.connection->removeSyncCleanup(*m_disconnect_cb);
+            }
+        });
     }
 }
 
-void ProxyClient<Thread>::setDisconnectCallback(const std::function<void()>& fn)
-{
-    assert(fn);
-    assert(!m_disconnect_cb);
-    m_disconnect_cb = m_context.connection->addSyncCleanup(fn);
-}
-
 ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
     : m_thread_context(thread_context), m_thread(std::move(thread))
 {
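
The double check of m_disconnect_cb in the destructor above exists because the disconnect handler can fire between the first test and the moment the sync() lambda actually runs on the event loop thread, so the lambda has to re-test before unregistering. The single-threaded sketch below only imitates that timing window (run_on_loop_thread simply runs inline, and the handler's work is simulated in main); ThreadHandle and the other names are placeholders, not the library's code.

#include <functional>
#include <list>
#include <optional>

using CleanupList = std::list<std::function<void()>>;
using CleanupIt = CleanupList::iterator;

struct ThreadHandle {                      // stands in for ProxyClient<Thread>
    CleanupList* m_cleanups{nullptr};
    std::optional<CleanupIt> m_disconnect_cb;

    // Stand-in for m_context.loop->sync(); the sketch is single threaded, so
    // the functor simply runs inline.
    template <typename Fn>
    void run_on_loop_thread(Fn&& fn) { fn(); }

    ~ThreadHandle()
    {
        if (m_disconnect_cb) {
            run_on_loop_thread([&] {
                // Re-check: if the disconnect handler ran first, it already
                // consumed the registration and cleared m_disconnect_cb, and
                // unregistering a second time would use a dangling iterator.
                if (m_disconnect_cb) {
                    m_cleanups->erase(*m_disconnect_cb);
                    m_disconnect_cb.reset();
                }
            });
        }
    }
};

int main()
{
    CleanupList cleanups;
    {
        ThreadHandle handle;
        handle.m_cleanups = &cleanups;
        handle.m_disconnect_cb = cleanups.insert(cleanups.end(), [] {});

        // Simulate the disconnect handler winning the race: it runs and clears
        // the registration before ~ThreadHandle executes.
        (**handle.m_disconnect_cb)();
        cleanups.erase(*handle.m_disconnect_cb);
        handle.m_disconnect_cb.reset();
    } // ~ThreadHandle: the re-check prevents a second erase of the same entry
    return static_cast<int>(cleanups.size());
}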
