2 changes: 1 addition & 1 deletion ci/configs/llvm.bash
@@ -2,7 +2,7 @@ CI_DESC="CI job using LLVM-based libraries and tools (clang, libc++, clang-tidy,
CI_DIR=build-llvm
NIX_ARGS=(--arg enableLibcxx true)
export CXX=clang++
export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter"
export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wthread-safety -Wno-unused-parameter"
CMAKE_ARGS=(
-G Ninja
-DMP_ENABLE_CLANG_TIDY=ON
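Context on the flag change above (the same change appears in sanitize.bash below): -Wthread-safety is Clang's umbrella flag for its thread safety analysis, implying -Wthread-safety-analysis plus the attribute and precision checks, and it is what validates annotations like the MP_GUARDED_BY ones added later in this PR. A minimal sketch of what it catches, using raw Clang attributes rather than this repo's MP_* macros:

// clang++ -Wthread-safety -c example.cpp
#define CAPABILITY(x) __attribute__((capability(x)))
#define GUARDED_BY(x) __attribute__((guarded_by(x)))
#define ACQUIRE(...) __attribute__((acquire_capability(__VA_ARGS__)))
#define RELEASE(...) __attribute__((release_capability(__VA_ARGS__)))

class CAPABILITY("mutex") Mutex
{
public:
    void lock() ACQUIRE();
    void unlock() RELEASE();
};

class Counter
{
    Mutex m_mutex;
    int m_value GUARDED_BY(m_mutex) = 0;

public:
    void increment()
    {
        m_mutex.lock();
        ++m_value; // OK: the analysis sees the lock is held
        m_mutex.unlock();
    }
    int unsafeRead() { return m_value; } // warning: requires holding 'm_mutex'
};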
2 changes: 1 addition & 1 deletion ci/configs/sanitize.bash
@@ -1,7 +1,7 @@
CI_DESC="CI job running ThreadSanitizer"
CI_DIR=build-sanitize
export CXX=clang++
export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter -fsanitize=thread"
export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety -Wno-unused-parameter -fsanitize=thread"
Contributor
4e365b0:
This is unrelated to the PR, but why generate debugging information here? I'm referring to the use of the -ggdb flag.

Collaborator Author
re: #201 (comment)

I haven't tried without -ggdb, but sanitizers like ThreadSanitizer print stack traces that I think depend on having this debug information. Cap'n Proto can also use addr2line to print stack traces internally when there are errors (though I don't think we have this option enabled).

CMAKE_ARGS=()
BUILD_ARGS=(-k -j4)
BUILD_TARGETS=(mptest)
72 changes: 58 additions & 14 deletions include/mp/proxy-io.h
Contributor
Tangent: Why does EventLoop use Unix sockets within the same process to signal when there's a new m_post_fn to execute? Seems like there should be thread synchronization primitives with marginally less overhead?

Collaborator Author
re: https://github.com/bitcoin-core/libmultiprocess/pull/201/files#r2359060452

Tangent: Why does EventLoop use Unix sockets within the same process to signal when there's a new m_post_fn to execute? Seems like there should be thread synchronization primitives with marginally less overhead?

The EventLoop documentation is meant to provide some background on this, and the link at the bottom, https://groups.google.com/d/msg/capnproto/TuQFF1eH2-M/g81sHaTAAQAJ, specifically addresses this question.

Cap'n Proto doesn't use threads and assumes all code calling it runs on a single thread. There are benefits and costs that come from this, but one of the costs is that external threads need to use I/O to communicate with Cap'n Proto's event loop. There isn't another way for threads to send signals. (Or at least there wasn't at the time I wrote this; maybe newer versions of Cap'n Proto provide something.)
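To sketch the pattern being described (hypothetical code, not the actual EventLoop; names like g_post_fn only mirror the m_post_fn discussion): an external thread stashes the callable under a mutex and writes one byte to a socket, and the event loop, which can only block on I/O, wakes from its pending read and runs it.

#include <kj/async-io.h>
#include <sys/socket.h>
#include <unistd.h>
#include <functional>
#include <mutex>
#include <utility>

std::mutex g_mutex;
std::function<void()>* g_post_fn = nullptr;

// Called from any external thread.
void post(int post_fd, std::function<void()>& fn)
{
    const std::lock_guard<std::mutex> lock(g_mutex);
    g_post_fn = &fn;
    const char wakeup = 0;
    (void)write(post_fd, &wakeup, 1); // I/O is the only way to wake the kj loop
}

// Runs on the kj event loop thread: wait for a byte, run the posted function.
kj::Promise<void> waitForPosts(kj::AsyncIoStream& wait_stream, char& buf)
{
    return wait_stream.read(&buf, 1).then([&]() {
        std::function<void()>* fn = nullptr;
        {
            const std::lock_guard<std::mutex> lock(g_mutex);
            std::swap(fn, g_post_fn);
        }
        if (fn) (*fn)();
        return waitForPosts(wait_stream, buf); // keep listening
    });
}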

Contributor
I think the main thing I was missing is the need for pumping the async I/O. I assume waiting on an async I/O-wrapped socket serves that purpose?

I was curious why you didn't use m_io_context->provider->newOneWayPipe(), but when writing to the AsyncOutputStream a promise is returned with KJ_WARN_UNUSED_RESULT (https://github.com/capnproto/capnproto/blob/7db701e94ad00b8db06ede2bea5f527e2a6fa3de/c%2B%2B/src/kj/async-io.h#L121), which doesn't mirror the example in the Google Groups thread.

Q: I can see there's a ::close() call for [m_]post_fd, but none for m_wait_fd. Is it just cleaned up once the process dies, or does wrapSocketFd() assume responsibility for closing?

Collaborator Author
re: #201 (comment)

Yes, IIRC wrapSocketFd does take ownership. A lot of this code is also updated and made more generic in the Windows PR bitcoin/bitcoin#32387, in case you are curious.
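A sketch of the ownership behavior being described, assuming kj's LowLevelAsyncIoProvider API (the TAKE_OWNERSHIP flag is from kj/async-io.h; the fd names are illustrative, not the actual EventLoop members):

#include <kj/async-io.h>
#include <sys/socket.h>
#include <unistd.h>

int main()
{
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return 1;
    const int wait_fd = fds[0]; // event loop reads wakeups from this end
    const int post_fd = fds[1]; // external threads write wakeups to this end

    kj::AsyncIoContext io = kj::setupAsyncIo();

    // With TAKE_OWNERSHIP, the returned stream closes wait_fd when it is
    // destroyed, which would explain the absence of an explicit ::close().
    kj::Own<kj::AsyncIoStream> wait_stream = io.lowLevelProvider->wrapSocketFd(
        wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);

    // post_fd is never wrapped, so it still needs a manual ::close().
    ::close(post_fd);
}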

Contributor
Tangent in an unrelated file, regarding how easy it is to understand libmultiprocess:
TestSetup (in test/mp/test/test.cpp) is hard to read; maybe it would benefit from comments giving the argument names of anonymous lambdas, and from explicit captures of variables. I also feel some resistance at having to learn that kj::heap returns something std::unique_ptr-like that implicitly casts to a raw pointer. Are the kj types only used when necessary to interface with Cap'n Proto, and std everywhere else?

Collaborator Author
re: #201 (comment)

Tangent in an unrelated file, regarding how easy it is to understand libmultiprocess: TestSetup (in test/mp/test/test.cpp) is hard to read; maybe it would benefit from comments giving the argument names of anonymous lambdas, and from explicit captures of variables.

This is probably true. I'm currently working on test improvements where I'm adding more members to the TestSetup class, and I could add more comments there.

I'm not sure I would always prefer explicit variable captures, though. I think [&] is not just a way of capturing state and cutting down noise, but also an important way of indicating that the lambda will only be called in the current scope and is not some event handler that can be called at a later time. So I tend to like [&] for synchronous inline callbacks, and [x, y, z] for asynchronous event handlers.
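A small illustration of that convention (hypothetical code, not from the test suite):

#include <algorithm>
#include <functional>
#include <vector>

void example(std::vector<int>& values, std::vector<std::function<void()>>& handlers)
{
    int total = 0;
    // Synchronous inline callback: [&] signals it only runs inside this call.
    std::for_each(values.begin(), values.end(), [&](int v) { total += v; });

    // Asynchronous event handler: explicit captures signal it may run after
    // this function returns, so it must own copies of what it needs.
    handlers.push_back([total] { /* use the snapshot of total */ });
}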

I also feel some resistance at having to learn that kj::heap returns something std::unique_ptr-like that implicitly casts to a raw pointer. Are the kj types only used when necessary to interface with Cap'n Proto, and std everywhere else?

I think basically yes, but the word "only" is too strong. The code does tend to use the kj library when Cap'n Proto types are involved and the std library otherwise. But there are probably exceptions, and I haven't found kj types and macros to cause problems in practice. In general, kj types tend to have nice debugging and safety features and a real rationale behind them; they are not just NIH. I think kj::Own in particular has capabilities std::unique_ptr doesn't, because it doesn't bake the deleter into the type definition, making it easier to support things like memory pools and to implement the zero-copy techniques Cap'n Proto uses internally.
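A sketch of the type-level difference being described (the pool is hypothetical; the kj calls assume kj/memory.h):

#include <kj/memory.h>
#include <memory>

struct PoolDeleter { void operator()(int*) const { /* return to pool */ } };

int main()
{
    // With std::unique_ptr the deleter is baked into the type, so these two
    // are different, incompatible types:
    std::unique_ptr<int> a(new int(1));
    std::unique_ptr<int, PoolDeleter> b; // cannot be assigned to `a`

    // kj::Own<int> carries its disposer at runtime instead, so the same type
    // can hold heap objects, pool objects, etc.:
    kj::Own<int> c = kj::heap<int>(2);
    // kj::Own<int> d = pool.allocate(); // hypothetical pool, same type as c
}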

Contributor
Your policy of only using [&] to indicate that the lambda will be called in the current scope is nice when it comes to function scope, as when creating server_connection, but less so in the case of TestSetup::server_disconnect, where this is captured; that can perhaps be somewhat forgiven for a relatively simple lambda.


I agree that kj has some clever quirks that are missing from std. But it is not a one-way street (the implicit cast from Own to a raw pointer, for example). I would still advise using std when possible to make review more approachable. This passed mptest, for example:

diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h
index a0bdf13..6358c86 100644
--- a/include/mp/proxy-io.h
+++ b/include/mp/proxy-io.h
@@ -179,7 +179,7 @@ public:
 
     //! Run function on event loop thread. Does not return until function completes.
     //! Must be called while the loop() function is active.
-    void post(kj::Function<void()> fn);
+    void post(std::function<void()> fn);
 
     //! Wrapper around EventLoop::post that takes advantage of the
     //! fact that callable will not go out of scope to avoid requirement that it
@@ -231,7 +231,7 @@ public:
     std::thread m_async_thread;
 
     //! Callback function to run on event loop thread during post() or sync() call.
-    kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
+    std::function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
 
     //! Callback functions to run on async thread.
     std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp
index 06825c9..6611cfb 100644
--- a/src/mp/proxy.cpp
+++ b/src/mp/proxy.cpp
@@ -263,7 +263,7 @@ void EventLoop::loop()
     m_cv.notify_all();
 }
 
-void EventLoop::post(kj::Function<void()> fn)
+void EventLoop::post(std::function<void()> fn)
 {
     if (std::this_thread::get_id() == m_thread_id) {
         fn();

Collaborator Author
re: #201 (comment)

Nice find on the lambda inconsistencies; that would make sense to clean up. I would be happy to review a PR with any cleanups and improvements like this.

On kj::Function vs std::function, this was actually changed recently in 52256e7, because std::function, unlike kj::Function, requires function objects to be copyable, which prevents writing lambdas that capture move-only objects. So I'm actually surprised that diff compiles. Maybe it compiles because it only changes EventLoop::post, not Waiter::post? Happy to change to whatever types work, though.
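For reference, the copyability difference in a nutshell (minimal sketch; compile against Cap'n Proto's headers):

#include <kj/function.h>
#include <functional>
#include <memory>

int main()
{
    auto p = std::make_unique<int>(42);

    // kj::Function only requires the callable to be movable, so a lambda
    // capturing a move-only object is fine:
    kj::Function<int()> kf = [p = std::move(p)]() { return *p; };

    // std::function requires copyable callables, so the equivalent does not
    // compile (uncommenting this line is a build error):
    // std::function<int()> sf = [q = std::make_unique<int>(1)]() { return *q; };

    return kf();
}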

@@ -66,8 +66,6 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
ProxyClient(const ProxyClient&) = delete;
~ProxyClient();

-void setDisconnectCallback(const std::function<void()>& fn);
-
//! Reference to callback function that is run if there is a sudden
//! disconnect and the Connection object is destroyed before this
//! ProxyClient<Thread> object. The callback will destroy this object and
@@ -275,16 +273,16 @@ struct Waiter
template <typename Fn>
void post(Fn&& fn)
{
-const std::unique_lock<std::mutex> lock(m_mutex);
+const Lock lock(m_mutex);
assert(!m_fn);
m_fn = std::forward<Fn>(fn);
m_cv.notify_all();
}

template <class Predicate>
-void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
+void wait(Lock& lock, Predicate pred)
{
-m_cv.wait(lock, [&] {
+m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) {
// Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
// a lost-wakeup bug. A new m_fn and m_cv notification might be sent
// after the fn() call and before the lock.lock() call in this loop
@@ -307,9 +305,9 @@ struct Waiter
//! mutexes than necessary. This mutex can be held at the same time as
//! EventLoop::m_mutex as long as Waiter::mutex is locked first and
//! EventLoop::m_mutex is locked second.
-std::mutex m_mutex;
+Mutex m_mutex;
std::condition_variable m_cv;
-std::optional<kj::Function<void()>> m_fn;
+std::optional<kj::Function<void()>> m_fn MP_GUARDED_BY(m_mutex);
};

//! Object holding network & rpc state associated with either an incoming server
@@ -534,29 +532,73 @@ void ProxyServerBase<Interface, Impl>::invokeDestroy()
CleanupRun(m_context.cleanup_fns);
}

-using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
+//! Map from Connection to local or remote thread handle which will be used over
+//! that connection. This map will typically only contain one entry, but can
+//! contain multiple if a single thread makes IPC calls over multiple
+//! connections. A std::optional value type is used to avoid the map needing to
+//! be locked while ProxyClient<Thread> objects are constructed, see
+//! ThreadContext "Synchronization note" below.
+using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
using ConnThread = ConnThreads::iterator;

// Retrieve ProxyClient<Thread> object associated with this connection from a
// map, or create a new one and insert it into the map. Return map iterator and
// inserted bool.
-std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
+std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread);

+//! The thread_local ThreadContext g_thread_context struct provides information
+//! about individual threads and a way of communicating between them. Because
+//! it's a thread local struct, each ThreadContext instance is initialized by
+//! the thread that owns it.
+//!
+//! ThreadContext is used for any client threads created externally which make
+//! IPC calls, and for server threads created by
+//! ProxyServer<ThreadMap>::makeThread() which execute IPC calls for clients.
+//!
+//! In both cases, the struct holds information like the thread name, and a
+//! Waiter object where the EventLoop can post incoming IPC requests to execute
+//! on the thread. The struct also holds ConnThread maps associating the thread
+//! with local and remote ProxyClient<Thread> objects.
struct ThreadContext
{
//! Identifying string for debug.
std::string thread_name;

-//! Waiter object used to allow client threads blocked waiting for a server
-//! response to execute callbacks made from the client's corresponding
-//! server thread.
+//! Waiter object used to allow remote clients to execute code on this
+//! thread. For server threads created by
+//! ProxyServer<ThreadMap>::makeThread(), this is initialized in that
+//! function. Otherwise, for client threads created externally, this is
+//! initialized the first time the thread tries to make an IPC call. Having
+//! a waiter is necessary for threads making IPC calls in case a server they
+//! are calling expects them to execute a callback during the call, before
+//! it sends a response.
+//!
+//! For IPC client threads, the Waiter pointer is never cleared and the Waiter
+//! just gets destroyed when the thread does. For server threads created by
+//! makeThread(), this pointer is set to null in the ~ProxyServer<Thread> as
+//! a signal for the thread to exit and destroy itself. In both cases, the
+//! same Waiter object is used across different calls and only created and
+//! destroyed once for the lifetime of the thread.
std::unique_ptr<Waiter> waiter = nullptr;

//! When client is making a request to a server, this is the
//! `callbackThread` argument it passes in the request, used by the server
//! in case it needs to make callbacks into the client that need to execute
//! while the client is waiting. This will be set to a local thread object.
-ConnThreads callback_threads;
+//!
+//! Synchronization note: The callback_thread and request_thread maps are
+//! only ever accessed internally by this thread's destructor and externally
+//! by Cap'n Proto event loop threads. Since it's possible for IPC client
+//! threads to make calls over different connections that could have
+//! different event loops, these maps are guarded by Waiter::m_mutex in case
+//! different event loop threads add or remove map entries simultaneously.
+//! However, individual ProxyClient<Thread> objects in the maps will only be
+//! associated with one event loop and guarded by EventLoop::m_mutex. So
+//! Waiter::m_mutex does not need to be held while accessing individual
+//! ProxyClient<Thread> instances, and may even need to be released to
+//! respect lock order and avoid locking Waiter::m_mutex before
+//! EventLoop::m_mutex.
+ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);

//! When client is making a request to a server, this is the `thread`
//! argument it passes in the request, used to control which thread on
@@ -565,7 +607,9 @@ struct ThreadContext
//! by makeThread. If a client call is being made from a thread currently
//! handling a server request, this will be set to the `callbackThread`
//! request thread argument passed in that request.
-ConnThreads request_threads;
+//!
+//! Synchronization note: \ref callback_threads note applies here as well.
+ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);

//! Whether this thread is a capnp event loop thread. Not really used except
//! to assert false if there's an attempt to execute a blocking operation
8 changes: 4 additions & 4 deletions include/mp/proxy-types.h
@@ -617,7 +617,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
const char* disconnected = nullptr;
proxy_client.m_context.loop->sync([&]() {
if (!proxy_client.m_context.connection) {
-const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
+const Lock lock(thread_context.waiter->m_mutex);
done = true;
disconnected = "IPC client method called after disconnect.";
thread_context.waiter->m_cv.notify_all();
@@ -644,7 +644,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
} catch (...) {
exception = std::current_exception();
}
-const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
+const Lock lock(thread_context.waiter->m_mutex);
done = true;
thread_context.waiter->m_cv.notify_all();
},
@@ -656,13 +656,13 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
proxy_client.m_context.loop->logPlain()
<< "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
}
-const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
+const Lock lock(thread_context.waiter->m_mutex);
done = true;
thread_context.waiter->m_cv.notify_all();
}));
});

-std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
+Lock lock(thread_context.waiter->m_mutex);
thread_context.waiter->wait(lock, [&done]() { return done; });
if (exception) std::rethrow_exception(exception);
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
50 changes: 30 additions & 20 deletions include/mp/type-context.h
@@ -25,7 +25,7 @@ void CustomBuildField(TypeList<>,
// Also store the Thread::Client reference in the callback_threads map so
// future calls over this connection can reuse it.
auto [callback_thread, _]{SetThread(
-thread_context.callback_threads, thread_context.waiter->m_mutex, &connection,
+GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
[&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};

// Call remote ThreadMap.makeThread function so server will create a
@@ -43,12 +43,12 @@ void CustomBuildField(TypeList<>,
return request.send().getResult(); // Nonblocking due to capnp request pipelining.
}};
auto [request_thread, _1]{SetThread(
-thread_context.request_threads, thread_context.waiter->m_mutex,
+GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads},
&connection, make_request_thread)};

auto context = output.init();
-context.setThread(request_thread->second.m_client);
-context.setCallbackThread(callback_thread->second.m_client);
+context.setThread(request_thread->second->m_client);
+context.setCallbackThread(callback_thread->second->m_client);
}

//! PassField override for mp.Context arguments. Return asynchronously and call
@@ -89,29 +89,39 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// need to update the map.
auto& thread_context = g_thread_context;
auto& request_threads = thread_context.request_threads;
-auto [request_thread, inserted]{SetThread(
-    request_threads, thread_context.waiter->m_mutex,
-    server.m_context.connection,
-    [&] { return context_arg.getCallbackThread(); })};
+ConnThread request_thread;
+bool inserted;
+server.m_context.loop->sync([&] {
+    std::tie(request_thread, inserted) = SetThread(
+        GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
+        [&] { return context_arg.getCallbackThread(); });
+});

-// If an entry was inserted into the requests_threads map,
+// If an entry was inserted into the request_threads map,
// remove it after calling fn.invoke. If an entry was not
// inserted, one already existed, meaning this must be a
// recursive call (IPC call calling back to the caller which
// makes another IPC call), so avoid modifying the map.
const bool erase_thread{inserted};
KJ_DEFER(if (erase_thread) {
-std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
-// Call erase here with a Connection* argument instead
-// of an iterator argument, because the `request_thread`
-// iterator may be invalid if the connection is closed
-// during this function call. More specifically, the
-// iterator may be invalid because SetThread adds a
-// cleanup callback to the Connection destructor that
-// erases the thread from the map, and also because the
-// ProxyServer<Thread> destructor calls
-// request_threads.clear().
-request_threads.erase(server.m_context.connection);
+// Erase the request_threads entry on the event loop
+// thread with loop->sync(), so if the connection is
+// broken there is not a race between this thread and
+// the disconnect handler trying to destroy the thread
+// client object.
+server.m_context.loop->sync([&] {
+    // Look up the thread again without using existing
+    // iterator since entry may no longer be there after
+    // a disconnect. Destroy node after releasing
+    // Waiter::m_mutex, so the ProxyClient<Thread>
+    // destructor is able to use EventLoop::mutex
+    // without violating lock order.
+    ConnThreads::node_type removed;
+    {
+        Lock lock(thread_context.waiter->m_mutex);
+        removed = request_threads.extract(server.m_context.connection);
+    }
+});
});
fn.invoke(server_context, args...);
}
11 changes: 11 additions & 0 deletions include/mp/util.h
@@ -182,6 +182,17 @@ class MP_SCOPED_CAPABILITY Lock {
std::unique_lock<std::mutex> m_lock;
};

+template<typename T>
Contributor
meganit: Pretty sure clang-format in the main project prefers:

Suggested change:
-template<typename T>
+template <typename T>

+struct GuardedRef
+{
+    Mutex& mutex;
+    T& ref MP_GUARDED_BY(mutex);
+};
+
+// CTAD for Clang 16: GuardedRef{mutex, x} -> GuardedRef<decltype(x)>
+template <class U>
+GuardedRef(Mutex&, U&) -> GuardedRef<U>;

//! Analog to std::lock_guard that unlocks instead of locks.
template <typename Lock>
struct UnlockGuard
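A hypothetical usage sketch of the new GuardedRef helper, showing how it bundles a mutex with the data it guards (names and the mp:: qualifications below are illustrative, assuming the Mutex/Lock/MP_GUARDED_BY definitions from this PR's mp/util.h):

#include <mp/util.h>
#include <map>

mp::Mutex g_mutex;
std::map<int, int> g_state MP_GUARDED_BY(g_mutex);

// The callee receives the mutex and the reference it guards as one argument,
// and the MP_GUARDED_BY annotation on GuardedRef::ref lets -Wthread-safety
// check that the mutex is held before ref is touched.
void update(mp::GuardedRef<std::map<int, int>> state)
{
    mp::Lock lock(state.mutex);
    state.ref[0] = 1;
}

int main()
{
    update(mp::GuardedRef{g_mutex, g_state}); // CTAD deduces GuardedRef<std::map<int, int>>
}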