Commit c1b4ab4
Merge #106: Bugfix: Clean up ThreadContext pointers when Connection is destroyed
8ba0d03 Bugfix: Clean up ThreadContext pointers when Connection is destroyed (Ryan Ofsky)

Pull request description:

  Currently, ThreadContext Connection* pointers are not cleaned up when a connection is destroyed. This is only a problem if a Connection instance is destroyed and a new Connection is allocated at the same address, because the code assumes pointers uniquely identify connections. This causes a bug in a bitcoin IPC test which creates multiple connections in a loop, described in bitcoin/bitcoin#30509 (comment): depending on how the heap allocator behaves, a new Connection can receive the same address as a previously destroyed connection, the code then tries to use a thread reference associated with the previous connection when making a new call, and there is a segfault because the thread no longer exists.

  Fix this problem by adding Connection cleanup callbacks that remove Connection* pointers from the ThreadContext struct if the connection is destroyed before the thread is.

Top commit has no ACKs.

Tree-SHA512: c715f15a2218e5c8073cf6e2852285b8ad295ed331187d7d4d454df28512621ef76067c7cb11ddf17065efdfa0f7096190ae9033466d252eea6ddae51ea44d64
2 parents a9e16da + 8ba0d03 commit c1b4ab4
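
To make the failure mode concrete, here is a minimal standalone sketch (hypothetical names, not code from this repository) of how keying a map by raw Connection pointers goes wrong once the allocator reuses an address:

    #include <map>
    #include <memory>
    #include <string>

    // Hypothetical stand-ins for Connection and the per-thread state keyed by it.
    struct Connection {};
    std::map<Connection*, std::string> g_request_threads;

    int main()
    {
        auto conn1 = std::make_unique<Connection>();
        g_request_threads.emplace(conn1.get(), "thread for conn1");
        conn1.reset(); // Connection destroyed, but its map entry is never erased.

        // The heap allocator is free to hand out the same address again.
        auto conn2 = std::make_unique<Connection>();
        if (g_request_threads.count(conn2.get())) {
            // Addresses collided: the lookup finds state belonging to the dead
            // connection. In the real code this is a reference to a worker
            // thread that no longer exists, hence the segfault. The fix is to
            // erase the entry via a cleanup callback when the Connection dies.
        }
    }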

3 files changed: +94 -43 lines

include/mp/proxy-io.h

Lines changed: 23 additions & 2 deletions
@@ -14,6 +14,7 @@
 
 #include <assert.h>
 #include <functional>
+#include <optional>
 #include <map>
 #include <memory>
 #include <sstream>
@@ -60,6 +61,18 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
     using ProxyClientBase::ProxyClientBase;
     // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
     ProxyClient(const ProxyClient&) = delete;
+    ~ProxyClient();
+
+    void setCleanup(std::function<void()> cleanup);
+
+    //! Cleanup function to run when the connection is closed. If the Connection
+    //! gets destroyed before this ProxyClient<Thread> object, this cleanup
+    //! callback lets it destroy this object and remove its entry in the
+    //! thread's request_threads or callback_threads map (after resetting
+    //! m_cleanup so the destructor does not try to access it). But if this
+    //! object gets destroyed before the Connection, there's no need to run the
+    //! cleanup function and the destructor will unregister it.
+    std::optional<CleanupIt> m_cleanup;
 };
 
 template <>
@@ -503,6 +516,14 @@ void ProxyServerBase<Interface, Impl>::invokeDestroy()
     m_context.cleanup.clear();
 }
 
+using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
+using ConnThread = ConnThreads::iterator;
+
+// Retrieve ProxyClient<Thread> object associated with this connection from a
+// map, or create a new one and insert it into the map. Return map iterator and
+// inserted bool.
+std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, std::function<Thread::Client()> make_thread);
+
 struct ThreadContext
 {
     //! Identifying string for debug.
@@ -517,7 +538,7 @@ struct ThreadContext
     //! `callbackThread` argument it passes in the request, used by the server
     //! in case it needs to make callbacks into the client that need to execute
     //! while the client is waiting. This will be set to a local thread object.
-    std::map<Connection*, ProxyClient<Thread>> callback_threads;
+    ConnThreads callback_threads;
 
     //! When client is making a request to a server, this is the `thread`
     //! argument it passes in the request, used to control which thread on
@@ -526,7 +547,7 @@ struct ThreadContext
     //! by makeThread. If a client call is being made from a thread currently
     //! handling a server request, this will be set to the `callbackThread`
     //! request thread argument passed in that request.
-    std::map<Connection*, ProxyClient<Thread>> request_threads;
+    ConnThreads request_threads;
 
     //! Whether this thread is a capnp event loop thread. Not really used except
     //! to assert false if there's an attempt to execute a blocking operation
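
For reference, the SetThread helper declared above replaces the hand-written find-or-emplace logic at both call sites. Below is a simplified sketch of the same locked get-or-create pattern, with plain stand-in types instead of the library's Connection* and ProxyClient<Thread>:

    #include <functional>
    #include <map>
    #include <mutex>
    #include <string>
    #include <tuple>

    using Threads = std::map<int, std::string>; // stand-in for ConnThreads

    // Find an entry for key, or create one by calling make_value, all under the
    // lock so concurrent callers cannot insert duplicates. Returns the iterator
    // and whether a new entry was inserted, mirroring SetThread's return value.
    std::tuple<Threads::iterator, bool> GetOrCreate(
        Threads& threads, std::mutex& mutex, int key,
        std::function<std::string()> make_value)
    {
        std::unique_lock<std::mutex> lock(mutex);
        auto it = threads.find(key);
        if (it != threads.end()) return {it, false};
        // make_value only runs on first use, just as SetThread only creates a
        // Thread::Client the first time a thread calls over a given connection.
        it = threads.emplace(key, make_value()).first;
        return {it, true};
    }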

include/mp/proxy-types.h

Lines changed: 37 additions & 41 deletions
@@ -62,36 +62,33 @@ void CustomBuildField(TypeList<>,
 {
     auto& connection = invoke_context.connection;
     auto& thread_context = invoke_context.thread_context;
-    auto& request_threads = thread_context.request_threads;
-    auto& callback_threads = thread_context.callback_threads;
-
-    auto callback_thread = callback_threads.find(&connection);
-    if (callback_thread == callback_threads.end()) {
-        callback_thread =
-            callback_threads
-                .emplace(std::piecewise_construct, std::forward_as_tuple(&connection),
-                    std::forward_as_tuple(
-                        connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})),
-                        &connection, /* destroy_connection= */ false))
-                .first;
-    }
 
-    auto request_thread = request_threads.find(&connection);
-    if (request_thread == request_threads.end()) {
-        // This code will only run if IPC client call is being made for the
-        // first time on a new thread. After the first call, subsequent calls
+    // Create local Thread::Server object corresponding to the current thread
+    // and pass a Thread::Client reference to it in the Context.callbackThread
+    // field so the function being called can make callbacks to this thread.
+    // Also store the Thread::Client reference in the callback_threads map so
+    // future calls over this connection can reuse it.
+    auto [callback_thread, _]{SetThread(
+        thread_context.callback_threads, thread_context.waiter->m_mutex, &connection,
+        [&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
+
+    // Call remote ThreadMap.makeThread function so server will create a
+    // dedicated worker thread to run function calls from this thread. Store the
+    // Thread::Client reference it returns in the request_threads map.
+    auto make_request_thread{[&]{
+        // This code will only run if an IPC client call is being made for the
+        // first time on this thread. After the first call, subsequent calls
         // will use the existing request thread. This code will also never run at
        // all if the current thread is a request thread created for a different
         // IPC client, because in that case PassField code (below) will have set
         // request_thread to point to the calling thread.
         auto request = connection.m_thread_map.makeThreadRequest();
         request.setName(thread_context.thread_name);
-        request_thread =
-            request_threads
-                .emplace(std::piecewise_construct, std::forward_as_tuple(&connection),
-                    std::forward_as_tuple(request.send().getResult(), &connection, /* destroy_connection= */ false))
-                .first; // Nonblocking due to capnp request pipelining.
-    }
+        return request.send().getResult(); // Nonblocking due to capnp request pipelining.
+    }};
+    auto [request_thread, _1]{SetThread(
+        thread_context.request_threads, thread_context.waiter->m_mutex,
+        &connection, make_request_thread)};
 
     auto context = output.init();
     context.setThread(request_thread->second.m_client);
@@ -143,24 +140,23 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
             // call. In this case, the callbackThread value should point
             // to the same thread already in the map, so there is no
             // need to update the map.
-            auto& request_threads = g_thread_context.request_threads;
-            auto request_thread = request_threads.find(server.m_context.connection);
-            if (request_thread == request_threads.end()) {
-                request_thread =
-                    g_thread_context.request_threads
-                        .emplace(std::piecewise_construct, std::forward_as_tuple(server.m_context.connection),
-                            std::forward_as_tuple(context_arg.getCallbackThread(), server.m_context.connection,
-                                /* destroy_connection= */ false))
-                        .first;
-            } else {
-                // The requests_threads map already has an entry for
-                // this connection, so this must be a recursive call.
-                // Avoid modifying the map in this case by resetting the
-                // request_thread iterator, so the KJ_DEFER statement
-                // below doesn't do anything.
-                request_thread = request_threads.end();
-            }
-            KJ_DEFER(if (request_thread != request_threads.end()) request_threads.erase(request_thread));
+            auto& thread_context = g_thread_context;
+            auto& request_threads = thread_context.request_threads;
+            auto [request_thread, inserted]{SetThread(
+                request_threads, thread_context.waiter->m_mutex,
+                server.m_context.connection,
+                [&] { return context_arg.getCallbackThread(); })};
+
+            // If an entry was inserted into the request_threads map,
+            // remove it after calling fn.invoke. If an entry was not
+            // inserted, one already existed, meaning this must be a
+            // recursive call (IPC call calling back to the caller which
+            // makes another IPC call), so avoid modifying the map.
+            auto erase_thread{inserted ? request_thread : request_threads.end()};
+            KJ_DEFER(if (erase_thread != request_threads.end()) {
+                std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
+                request_threads.erase(erase_thread);
+            });
             fn.invoke(server_context, args...);
         }
         KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
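
The `inserted` flag returned by SetThread is what drives the KJ_DEFER above: only the call that actually created the map entry erases it on scope exit, so a recursive call leaves the outer call's entry intact. A minimal sketch of this guard pattern (KJ_DEFER belongs to the kj library; the ScopeGuard below is a hypothetical stand-in):

    #include <map>
    #include <string>
    #include <utility>

    // Hypothetical stand-in for KJ_DEFER: runs fn when the scope exits.
    template <typename F>
    struct ScopeGuard
    {
        F fn;
        ~ScopeGuard() { fn(); }
    };
    template <typename F> ScopeGuard(F) -> ScopeGuard<F>;

    void HandleRequest(std::map<int, std::string>& threads, int conn, int depth)
    {
        auto [it, inserted] = threads.emplace(conn, "request thread");
        // Only the outermost call, which inserted the entry, removes it; a
        // recursive call sees inserted == false and leaves the map untouched.
        auto erase_it = inserted ? it : threads.end();
        ScopeGuard guard{[&] {
            if (erase_it != threads.end()) threads.erase(erase_it);
        }};
        if (depth > 0) HandleRequest(threads, conn, depth - 1); // recursive call
    }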

src/mp/proxy.cpp

Lines changed: 34 additions & 0 deletions
@@ -262,6 +262,40 @@ void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
     }
 }
 
+std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, std::function<Thread::Client()> make_thread)
+{
+    std::unique_lock<std::mutex> lock(mutex);
+    auto thread = threads.find(connection);
+    if (thread != threads.end()) return {thread, false};
+    thread = threads.emplace(
+        std::piecewise_construct, std::forward_as_tuple(connection),
+        std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
+    thread->second.setCleanup([&threads, &mutex, thread] {
+        // Connection is being destroyed before thread client is, so reset
+        // thread client m_cleanup member so thread client destructor does not
+        // try to unregister this callback after connection is destroyed.
+        thread->second.m_cleanup.reset();
+        // Remove connection pointer about to be destroyed from the map
+        std::unique_lock<std::mutex> lock(mutex);
+        threads.erase(thread);
+    });
+    return {thread, true};
+}
+
+ProxyClient<Thread>::~ProxyClient()
+{
+    if (m_cleanup) {
+        m_context.connection->removeSyncCleanup(*m_cleanup);
+    }
+}
+
+void ProxyClient<Thread>::setCleanup(std::function<void()> cleanup)
+{
+    assert(cleanup);
+    assert(!m_cleanup);
+    m_cleanup = m_context.connection->addSyncCleanup(cleanup);
+}
+
 ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
     : m_thread_context(thread_context), m_thread(std::move(thread))
 {
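
Taken together, SetThread, setCleanup, and ~ProxyClient form a two-way unregistration handshake: whichever of the Connection and the ProxyClient<Thread> dies first detaches itself from the other, so neither destructor touches freed state. A simplified standalone sketch of that protocol (hypothetical names; the real code uses Connection's addSyncCleanup/removeSyncCleanup list):

    #include <functional>
    #include <list>
    #include <memory>
    #include <optional>

    using CleanupList = std::list<std::function<void()>>;
    using CleanupIt = CleanupList::iterator;

    // Stand-in for Connection: runs any still-registered callbacks on destruction.
    struct Conn
    {
        CleanupList cleanups;
        CleanupIt addCleanup(std::function<void()> fn) { return cleanups.insert(cleanups.end(), std::move(fn)); }
        void removeCleanup(CleanupIt it) { cleanups.erase(it); }
        ~Conn() { for (auto& fn : cleanups) fn(); }
    };

    // Stand-in for ProxyClient<Thread>: if it dies first, it unregisters its
    // callback so the connection never invokes it with a dangling pointer.
    struct ThreadClient
    {
        Conn* conn = nullptr;
        std::optional<CleanupIt> m_cleanup;
        ~ThreadClient() { if (m_cleanup) conn->removeCleanup(*m_cleanup); }
    };

    int main()
    {
        Conn conn;
        auto client = std::make_unique<ThreadClient>();
        client->conn = &conn;
        client->m_cleanup = conn.addCleanup([&] {
            client->m_cleanup.reset(); // connection dying first: disarm destructor
            client.reset();            // then destroy the thread client safely
        });
        // Here client is destroyed first, so its destructor removes the callback
        // and ~Conn finds an empty list. If conn died first instead, the callback
        // would reset m_cleanup and delete the client.
    }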