Commit 535fa0a

Squashed 'src/ipc/libmultiprocess/' changes from 13424cf2ecc1..47d79db8a552

47d79db8a552 Merge bitcoin-core/libmultiprocess#201: bug: fix mptest hang, ProxyClient<Thread> deadlock in disconnect handler
f15ae9c9b9fb Merge bitcoin-core/libmultiprocess#211: Add .gitignore
4a269b21b8c8 bug: fix ProxyClient<Thread> deadlock if disconnected as IPC call is returning
85df96482c49 Use try_emplace in SetThread instead of threads.find
ca9b380ea91a Use std::optional in ConnThreads to allow shortening locks
9b0799113557 doc: describe ThreadContext struct and synchronization requirements
d60db601ed9b proxy-io.h: add Waiter::m_mutex thread safety annotations
4e365b019a9f ci: Use -Wthread-safety not -Wthread-safety-analysis
15d7bafbb001 Add .gitignore
fe1cd8c76131 Merge bitcoin-core/libmultiprocess#208: ci: Test minimum cmake version in olddeps job
b713a0b7bfbc Merge bitcoin-core/libmultiprocess#207: ci: output CMake version in CI script
0f580397c913 ci: Test minimum cmake version in olddeps job
d603dcc0eef0 ci: output CMake version in CI script

git-subtree-dir: src/ipc/libmultiprocess
git-subtree-split: 47d79db8a5528097b408e18f7b0bae11a6702d26
1 parent a334bbe commit 535fa0a

File tree: 11 files changed (+190, -79 lines)

.gitignore

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+# CMake artifacts
+/*build*
+
+# Git artifacts
+*.patch

ci/configs/llvm.bash

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@ CI_DESC="CI job using LLVM-based libraries and tools (clang, libc++, clang-tidy,
 CI_DIR=build-llvm
 NIX_ARGS=(--arg enableLibcxx true)
 export CXX=clang++
-export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter"
+export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wthread-safety -Wno-unused-parameter"
 CMAKE_ARGS=(
 -G Ninja
 -DMP_ENABLE_CLANG_TIDY=ON

ci/configs/olddeps.bash

Lines changed: 2 additions & 2 deletions
@@ -1,5 +1,5 @@
-CI_DESC="CI job using old Cap'n Proto version"
+CI_DESC="CI job using old Cap'n Proto and cmake versions"
 CI_DIR=build-olddeps
 export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wno-unused-parameter -Wno-error=array-bounds"
-NIX_ARGS=(--argstr capnprotoVersion "0.7.1")
+NIX_ARGS=(--argstr capnprotoVersion "0.7.1" --argstr cmakeVersion "3.12.4")
 BUILD_ARGS=(-k)

ci/configs/sanitize.bash

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 CI_DESC="CI job running ThreadSanitizer"
 CI_DIR=build-sanitize
 export CXX=clang++
-export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter -fsanitize=thread"
+export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety -Wno-unused-parameter -fsanitize=thread"
 CMAKE_ARGS=()
 BUILD_ARGS=(-k -j4)
 BUILD_TARGETS=(mptest)

ci/scripts/ci.sh

Lines changed: 18 additions & 3 deletions
@@ -17,6 +17,21 @@ fi

 [ -n "${CI_CLEAN-}" ] && rm -rf "${CI_DIR}"

-cmake -B "$CI_DIR" "${CMAKE_ARGS[@]+"${CMAKE_ARGS[@]}"}"
-cmake --build "$CI_DIR" -t "${BUILD_TARGETS[@]}" -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
-ctest --test-dir "$CI_DIR" --output-on-failure
+cmake --version
+cmake_ver=$(cmake --version | awk '/version/{print $3; exit}')
+ver_ge() { [ "$(printf '%s\n' "$2" "$1" | sort -V | head -n1)" = "$2" ]; }
+
+src_dir=$PWD
+mkdir -p "$CI_DIR"
+cd "$CI_DIR"
+cmake "$src_dir" "${CMAKE_ARGS[@]+"${CMAKE_ARGS[@]}"}"
+if ver_ge "$cmake_ver" "3.15"; then
+cmake --build . -t "${BUILD_TARGETS[@]}" -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
+else
+# Older versions of cmake can only build one target at a time with --target,
+# and do not support -t shortcut
+for t in "${BUILD_TARGETS[@]}"; do
+cmake --build . --target "$t" -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
+done
+fi
+ctest --output-on-failure
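
The ver_ge helper above works by sorting the two versions with sort -V and checking that the required minimum sorts first. A quick sanity check, assuming a shell where sort -V is available (the helper is copied from the diff; the two example calls are illustrative):

ver_ge() { [ "$(printf '%s\n' "$2" "$1" | sort -V | head -n1)" = "$2" ]; }

ver_ge 3.16.3 3.15 && echo "3.16.3 >= 3.15, use the -t shortcut"
ver_ge 3.12.4 3.15 || echo "3.12.4 < 3.15, build targets one at a time"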

include/mp/proxy-io.h

Lines changed: 58 additions & 14 deletions
@@ -66,8 +66,6 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
 ProxyClient(const ProxyClient&) = delete;
 ~ProxyClient();

-void setDisconnectCallback(const std::function<void()>& fn);
-
 //! Reference to callback function that is run if there is a sudden
 //! disconnect and the Connection object is destroyed before this
 //! ProxyClient<Thread> object. The callback will destroy this object and
@@ -285,16 +283,16 @@ struct Waiter
 template <typename Fn>
 void post(Fn&& fn)
 {
-const std::unique_lock<std::mutex> lock(m_mutex);
+const Lock lock(m_mutex);
 assert(!m_fn);
 m_fn = std::forward<Fn>(fn);
 m_cv.notify_all();
 }

 template <class Predicate>
-void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
+void wait(Lock& lock, Predicate pred)
 {
-m_cv.wait(lock, [&] {
+m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) {
 // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
 // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
 // after the fn() call and before the lock.lock() call in this loop
@@ -317,9 +315,9 @@ struct Waiter
 //! mutexes than necessary. This mutex can be held at the same time as
 //! EventLoop::m_mutex as long as Waiter::mutex is locked first and
 //! EventLoop::m_mutex is locked second.
-std::mutex m_mutex;
+Mutex m_mutex;
 std::condition_variable m_cv;
-std::optional<kj::Function<void()>> m_fn;
+std::optional<kj::Function<void()>> m_fn MP_GUARDED_BY(m_mutex);
 };

 //! Object holding network & rpc state associated with either an incoming server
@@ -544,29 +542,73 @@ void ProxyServerBase<Interface, Impl>::invokeDestroy()
 CleanupRun(m_context.cleanup_fns);
 }

-using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
+//! Map from Connection to local or remote thread handle which will be used over
+//! that connection. This map will typically only contain one entry, but can
+//! contain multiple if a single thread makes IPC calls over multiple
+//! connections. A std::optional value type is used to avoid the map needing to
+//! be locked while ProxyClient<Thread> objects are constructed, see
+//! ThreadContext "Synchronization note" below.
+using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
 using ConnThread = ConnThreads::iterator;

 // Retrieve ProxyClient<Thread> object associated with this connection from a
 // map, or create a new one and insert it into the map. Return map iterator and
 // inserted bool.
-std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
+std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread);

+//! The thread_local ThreadContext g_thread_context struct provides information
+//! about individual threads and a way of communicating between them. Because
+//! it's a thread local struct, each ThreadContext instance is initialized by
+//! the thread that owns it.
+//!
+//! ThreadContext is used for any client threads created externally which make
+//! IPC calls, and for server threads created by
+//! ProxyServer<ThreadMap>::makeThread() which execute IPC calls for clients.
+//!
+//! In both cases, the struct holds information like the thread name, and a
+//! Waiter object where the EventLoop can post incoming IPC requests to execute
+//! on the thread. The struct also holds ConnThread maps associating the thread
+//! with local and remote ProxyClient<Thread> objects.
 struct ThreadContext
 {
 //! Identifying string for debug.
 std::string thread_name;

-//! Waiter object used to allow client threads blocked waiting for a server
-//! response to execute callbacks made from the client's corresponding
-//! server thread.
+//! Waiter object used to allow remote clients to execute code on this
+//! thread. For server threads created by
+//! ProxyServer<ThreadMap>::makeThread(), this is initialized in that
+//! function. Otherwise, for client threads created externally, this is
+//! initialized the first time the thread tries to make an IPC call. Having
+//! a waiter is necessary for threads making IPC calls in case a server they
+//! are calling expects them to execute a callback during the call, before
+//! it sends a response.
+//!
+//! For IPC client threads, the Waiter pointer is never cleared and the Waiter
+//! just gets destroyed when the thread does. For server threads created by
+//! makeThread(), this pointer is set to null in the ~ProxyServer<Thread> as
+//! a signal for the thread to exit and destroy itself. In both cases, the
+//! same Waiter object is used across different calls and only created and
+//! destroyed once for the lifetime of the thread.
 std::unique_ptr<Waiter> waiter = nullptr;

 //! When client is making a request to a server, this is the
 //! `callbackThread` argument it passes in the request, used by the server
 //! in case it needs to make callbacks into the client that need to execute
 //! while the client is waiting. This will be set to a local thread object.
-ConnThreads callback_threads;
+//!
+//! Synchronization note: The callback_thread and request_thread maps are
+//! only ever accessed internally by this thread's destructor and externally
+//! by Cap'n Proto event loop threads. Since it's possible for IPC client
+//! threads to make calls over different connections that could have
+//! different event loops, these maps are guarded by Waiter::m_mutex in case
+//! different event loop threads add or remove map entries simultaneously.
+//! However, individual ProxyClient<Thread> objects in the maps will only be
+//! associated with one event loop and guarded by EventLoop::m_mutex. So
+//! Waiter::m_mutex does not need to be held while accessing individual
+//! ProxyClient<Thread> instances, and may even need to be released to
+//! respect lock order and avoid locking Waiter::m_mutex before
+//! EventLoop::m_mutex.
+ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);

 //! When client is making a request to a server, this is the `thread`
 //! argument it passes in the request, used to control which thread on
@@ -575,7 +617,9 @@ struct ThreadContext
 //! by makeThread. If a client call is being made from a thread currently
 //! handling a server request, this will be set to the `callbackThread`
 //! request thread argument passed in that request.
-ConnThreads request_threads;
+//!
+//! Synchronization note: \ref callback_threads note applies here as well.
+ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);

 //! Whether this thread is a capnp event loop thread. Not really used except
 //! to assert false if there's an attempt to execute a blocking operation
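
As the new ConnThreads comment explains, the std::optional value type lets SetThread reserve a map slot under Waiter::m_mutex and construct the ProxyClient<Thread> after the lock is dropped. A minimal standalone sketch of that reserve-then-construct pattern, using plain standard-library stand-ins rather than the real Connection, ProxyClient<Thread>, or Waiter types:

#include <cassert>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <tuple>

// Stand-ins for Connection* keys and ProxyClient<Thread> values.
using Key = int;
struct ExpensiveValue { std::string name; };

std::mutex g_mutex;                                  // plays the role of Waiter::m_mutex
std::map<Key, std::optional<ExpensiveValue>> g_map;  // plays the role of ConnThreads

int main()
{
    std::map<Key, std::optional<ExpensiveValue>>::iterator it;
    bool inserted;
    {
        // Reserve the slot under the lock; the optional starts empty, so no
        // expensive construction happens while the mutex is held.
        const std::lock_guard<std::mutex> lock(g_mutex);
        std::tie(it, inserted) = g_map.try_emplace(1, std::nullopt);
    }
    if (inserted) {
        // Construct the value after releasing the map lock.
        it->second.emplace(ExpensiveValue{"constructed outside the lock"});
    }
    assert(it->second);
    return 0;
}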

include/mp/proxy-types.h

Lines changed: 4 additions & 4 deletions
@@ -617,7 +617,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
 const char* disconnected = nullptr;
 proxy_client.m_context.loop->sync([&]() {
 if (!proxy_client.m_context.connection) {
-const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
+const Lock lock(thread_context.waiter->m_mutex);
 done = true;
 disconnected = "IPC client method called after disconnect.";
 thread_context.waiter->m_cv.notify_all();
@@ -644,7 +644,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
 } catch (...) {
 exception = std::current_exception();
 }
-const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
+const Lock lock(thread_context.waiter->m_mutex);
 done = true;
 thread_context.waiter->m_cv.notify_all();
 },
@@ -656,13 +656,13 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
 proxy_client.m_context.loop->logPlain()
 << "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
 }
-const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
+const Lock lock(thread_context.waiter->m_mutex);
 done = true;
 thread_context.waiter->m_cv.notify_all();
 }));
 });

-std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
+Lock lock(thread_context.waiter->m_mutex);
 thread_context.waiter->wait(lock, [&done]() { return done; });
 if (exception) std::rethrow_exception(exception);
 if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
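
The clientInvoke changes above keep the same handshake as before, just through the annotated Lock wrapper: the event loop thread sets done and notifies under Waiter::m_mutex, and the calling thread waits on a predicate. A standalone sketch of that handshake with plain standard-library types (not the real Waiter, Lock, or EventLoop classes):

#include <condition_variable>
#include <mutex>
#include <thread>

int main()
{
    std::mutex mutex;            // stands in for Waiter::m_mutex
    std::condition_variable cv;  // stands in for Waiter::m_cv
    bool done = false;

    // "Event loop" side: finish the work, then set the flag and notify
    // while holding the mutex, mirroring the clientInvoke callbacks.
    std::thread worker([&] {
        const std::unique_lock<std::mutex> lock(mutex);
        done = true;
        cv.notify_all();
    });

    // Calling side: wait on the predicate so a notification that fires
    // before the wait starts is not lost.
    {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [&] { return done; });
    }
    worker.join();
    return 0;
}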

include/mp/type-context.h

Lines changed: 30 additions & 20 deletions
@@ -25,7 +25,7 @@ void CustomBuildField(TypeList<>,
 // Also store the Thread::Client reference in the callback_threads map so
 // future calls over this connection can reuse it.
 auto [callback_thread, _]{SetThread(
-thread_context.callback_threads, thread_context.waiter->m_mutex, &connection,
+GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
 [&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};

 // Call remote ThreadMap.makeThread function so server will create a
@@ -43,12 +43,12 @@ void CustomBuildField(TypeList<>,
 return request.send().getResult(); // Nonblocking due to capnp request pipelining.
 }};
 auto [request_thread, _1]{SetThread(
-thread_context.request_threads, thread_context.waiter->m_mutex,
+GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads},
 &connection, make_request_thread)};

 auto context = output.init();
-context.setThread(request_thread->second.m_client);
-context.setCallbackThread(callback_thread->second.m_client);
+context.setThread(request_thread->second->m_client);
+context.setCallbackThread(callback_thread->second->m_client);
 }

 //! PassField override for mp.Context arguments. Return asynchronously and call
@@ -89,29 +89,39 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
 // need to update the map.
 auto& thread_context = g_thread_context;
 auto& request_threads = thread_context.request_threads;
-auto [request_thread, inserted]{SetThread(
-request_threads, thread_context.waiter->m_mutex,
-server.m_context.connection,
-[&] { return context_arg.getCallbackThread(); })};
+ConnThread request_thread;
+bool inserted;
+server.m_context.loop->sync([&] {
+std::tie(request_thread, inserted) = SetThread(
+GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
+[&] { return context_arg.getCallbackThread(); });
+});

-// If an entry was inserted into the requests_threads map,
+// If an entry was inserted into the request_threads map,
 // remove it after calling fn.invoke. If an entry was not
 // inserted, one already existed, meaning this must be a
 // recursive call (IPC call calling back to the caller which
 // makes another IPC call), so avoid modifying the map.
 const bool erase_thread{inserted};
 KJ_DEFER(if (erase_thread) {
-std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
-// Call erase here with a Connection* argument instead
-// of an iterator argument, because the `request_thread`
-// iterator may be invalid if the connection is closed
-// during this function call. More specifically, the
-// iterator may be invalid because SetThread adds a
-// cleanup callback to the Connection destructor that
-// erases the thread from the map, and also because the
-// ProxyServer<Thread> destructor calls
-// request_threads.clear().
-request_threads.erase(server.m_context.connection);
+// Erase the request_threads entry on the event loop
+// thread with loop->sync(), so if the connection is
+// broken there is not a race between this thread and
+// the disconnect handler trying to destroy the thread
+// client object.
+server.m_context.loop->sync([&] {
+// Look up the thread again without using existing
+// iterator since entry may no longer be there after
+// a disconnect. Destroy node after releasing
+// Waiter::m_mutex, so the ProxyClient<Thread>
+// destructor is able to use EventLoop::mutex
+// without violating lock order.
+ConnThreads::node_type removed;
+{
+Lock lock(thread_context.waiter->m_mutex);
+removed = request_threads.extract(server.m_context.connection);
+}
+});
 });
 fn.invoke(server_context, args...);
 }
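
The new KJ_DEFER body uses std::map::extract so the map node is unlinked under Waiter::m_mutex but destroyed only after that lock is released, because destroying the contained ProxyClient<Thread> may need EventLoop::m_mutex. A standalone sketch of this extract-then-destroy-outside-the-lock idea with plain standard-library types:

#include <iostream>
#include <map>
#include <mutex>
#include <string>

// Stand-in for a value whose destructor must not run while the map's mutex is
// held (in the real code, destroying a ProxyClient<Thread> may need
// EventLoop::m_mutex, which must not be acquired after Waiter::m_mutex).
struct Handle {
    explicit Handle(std::string n) : name(std::move(n)) {}
    ~Handle() { std::cout << "destroying " << name << " with no lock held\n"; }
    std::string name;
};

int main()
{
    std::mutex mutex; // plays the role of Waiter::m_mutex
    std::map<int, Handle> handles;
    handles.try_emplace(1, "thread handle");

    std::map<int, Handle>::node_type removed;
    {
        // Only the unlink from the map happens under the lock.
        const std::lock_guard<std::mutex> lock(mutex);
        removed = handles.extract(1);
    }
    // The extracted node, and with it the Handle destructor, is released
    // here, after the mutex has been unlocked.
    return 0;
}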

include/mp/util.h

Lines changed: 11 additions & 0 deletions
@@ -182,6 +182,17 @@ class MP_SCOPED_CAPABILITY Lock {
 std::unique_lock<std::mutex> m_lock;
 };

+template<typename T>
+struct GuardedRef
+{
+Mutex& mutex;
+T& ref MP_GUARDED_BY(mutex);
+};
+
+// CTAD for Clang 16: GuardedRef{mutex, x} -> GuardedRef<decltype(x)>
+template <class U>
+GuardedRef(Mutex&, U&) -> GuardedRef<U>;
+
 //! Analog to std::lock_guard that unlocks instead of locks.
 template <typename Lock>
 struct UnlockGuard
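
GuardedRef bundles a reference with the Mutex that guards it, which is what lets the new SetThread signature take a single argument and keeps the MP_GUARDED_BY annotation attached to the data. A simplified standalone analog, using std::mutex and no annotation macros, just to show how such a bundle is passed and used (AddEntry and ThreadMap are illustrative names, not part of the library):

#include <iostream>
#include <map>
#include <mutex>
#include <string>

// Simplified analog of mp::GuardedRef: a mutex and the data it protects,
// passed together so the callee knows what to lock.
template <typename T>
struct GuardedRef {
    std::mutex& mutex;
    T& ref;
};

// Deduction guide mirroring the one added in util.h.
template <class U>
GuardedRef(std::mutex&, U&) -> GuardedRef<U>;

using ThreadMap = std::map<int, std::string>;

// Analog of SetThread: inserts into the guarded map while holding its mutex.
void AddEntry(GuardedRef<ThreadMap> threads, int id, const std::string& name)
{
    const std::lock_guard<std::mutex> lock(threads.mutex);
    threads.ref.try_emplace(id, name);
}

int main()
{
    std::mutex mutex;
    ThreadMap threads;
    AddEntry(GuardedRef{mutex, threads}, 1, "worker");
    std::cout << threads.size() << " entry\n";
    return 0;
}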

shell.nix

Lines changed: 13 additions & 1 deletion
@@ -3,6 +3,7 @@
 , enableLibcxx ? false # Whether to use libc++ toolchain and libraries instead of libstdc++
 , minimal ? false # Whether to create minimal shell without extra tools (faster when cross compiling)
 , capnprotoVersion ? null
+, cmakeVersion ? null
 }:

 let
@@ -37,12 +38,23 @@ let
 capnproto = capnprotoBase.override (lib.optionalAttrs enableLibcxx { clangStdenv = llvm.libcxxStdenv; });
 clang = if enableLibcxx then llvm.libcxxClang else llvm.clang;
 clang-tools = llvm.clang-tools.override { inherit enableLibcxx; };
+cmakeHashes = {
+"3.12.4" = "sha256-UlVYS/0EPrcXViz/iULUcvHA5GecSUHYS6raqbKOMZQ=";
+};
+cmakeBuild = if cmakeVersion == null then pkgs.cmake else (pkgs.cmake.overrideAttrs (old: {
+version = cmakeVersion;
+src = pkgs.fetchurl {
+url = "https://cmake.org/files/v${lib.versions.majorMinor cmakeVersion}/cmake-${cmakeVersion}.tar.gz";
+hash = lib.attrByPath [cmakeVersion] "" cmakeHashes;
+};
+patches = [];
+})).override { isMinimalBuild = true; };
 in crossPkgs.mkShell {
 buildInputs = [
 capnproto
 ];
 nativeBuildInputs = with pkgs; [
-cmake
+cmakeBuild
 include-what-you-use
 ninja
 ] ++ lib.optionals (!minimal) [
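
The new cmakeVersion argument is consumed the same way as capnprotoVersion, so a pinned-dependency shell matching the olddeps CI config can presumably be entered with a command like:

nix-shell --argstr capnprotoVersion "0.7.1" --argstr cmakeVersion "3.12.4"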
