diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 4933588c..2e751c5f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -11,7 +11,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        config: [default, llvm, gnu32]
+        config: [default, llvm, gnu32, sanitize]

     name: build • ${{ matrix.config }}
diff --git a/ci/README.md b/ci/README.md
index 72eebba8..85eb467c 100644
--- a/ci/README.md
+++ b/ci/README.md
@@ -16,9 +16,10 @@ All CI is just bash and nix. To run jobs locally:

 ```bash
-CI_CONFIG=ci/configs/default.bash ci/scripts/run.sh
-CI_CONFIG=ci/configs/llvm.bash ci/scripts/run.sh
-CI_CONFIG=ci/configs/gnu32.bash ci/scripts/run.sh
+CI_CONFIG=ci/configs/default.bash  ci/scripts/run.sh
+CI_CONFIG=ci/configs/llvm.bash     ci/scripts/run.sh
+CI_CONFIG=ci/configs/gnu32.bash    ci/scripts/run.sh
+CI_CONFIG=ci/configs/sanitize.bash ci/scripts/run.sh
 ```

 By default CI jobs will reuse their build directories. `CI_CLEAN=1` can be specified to delete them before running instead.
diff --git a/ci/configs/sanitize.bash b/ci/configs/sanitize.bash
new file mode 100644
index 00000000..ce920f44
--- /dev/null
+++ b/ci/configs/sanitize.bash
@@ -0,0 +1,7 @@
+CI_DESC="CI job running ThreadSanitizer"
+CI_DIR=build-sanitize
+export CXX=clang++
+export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter -fsanitize=thread"
+CMAKE_ARGS=()
+BUILD_ARGS=(-k -j4)
+BUILD_TARGETS=(mptest)
diff --git a/ci/scripts/ci.sh b/ci/scripts/ci.sh
index 23c4cc13..baf21700 100755
--- a/ci/scripts/ci.sh
+++ b/ci/scripts/ci.sh
@@ -11,9 +11,12 @@ set -o errexit -o nounset -o pipefail -o xtrace
 [ "${CI_CONFIG+x}" ] && source "$CI_CONFIG"
 : "${CI_DIR:=build}"
+if ! [ -v BUILD_TARGETS ]; then
+  BUILD_TARGETS=(all tests mpexamples)
+fi
 [ -n "${CI_CLEAN-}" ] && rm -rf "${CI_DIR}"

 cmake -B "$CI_DIR" "${CMAKE_ARGS[@]+"${CMAKE_ARGS[@]}"}"
-cmake --build "$CI_DIR" -t all tests mpexamples -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
+cmake --build "$CI_DIR" -t "${BUILD_TARGETS[@]}" -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
 ctest --test-dir "$CI_DIR" --output-on-failure
diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h
index 3ed7bad7..367a9beb 100644
--- a/include/mp/proxy-io.h
+++ b/include/mp/proxy-io.h
@@ -66,16 +66,18 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
     ProxyClient(const ProxyClient&) = delete;
     ~ProxyClient();

-    void setCleanup(const std::function<void()>& fn);
-
-    //! Cleanup function to run when the connection is closed. If the Connection
-    //! gets destroyed before this ProxyClient object, this cleanup
-    //! callback lets it destroy this object and remove its entry in the
-    //! thread's request_threads or callback_threads map (after resetting
-    //! m_cleanup_it so the destructor does not try to access it). But if this
-    //! object gets destroyed before the Connection, there's no need to run the
-    //! cleanup function and the destructor will unregister it.
-    std::optional<CleanupIt> m_cleanup_it;
+    void setDisconnectCallback(const std::function<void()>& fn);
+
+    //! Reference to callback function that is run if there is a sudden
+    //! disconnect and the Connection object is destroyed before this
+    //! ProxyClient object. The callback will destroy this object and
+    //! remove its entry from the thread's request_threads or callback_threads
+    //! map. It will also reset m_disconnect_cb so the destructor does not
+    //! access it. In the normal case where there is no sudden disconnect, the
+    //! destructor will unregister m_disconnect_cb so the callback is never run.
+    //!
+    //! Since this variable is accessed from multiple threads, accesses should
+    //! be guarded with the associated Waiter::m_mutex.
+    std::optional<CleanupIt> m_disconnect_cb;
 };

 template <>
@@ -298,6 +300,13 @@ struct Waiter
         });
     }

+    //! Mutex mainly used internally by waiter class, but also used externally
+    //! to guard access to related state. Specifically, since the thread_local
+    //! ThreadContext struct owns a Waiter, the Waiter::m_mutex is used to guard
+    //! access to other parts of the struct to avoid needing to deal with more
+    //! mutexes than necessary. This mutex can be held at the same time as
+    //! EventLoop::m_mutex as long as Waiter::m_mutex is locked first and
+    //! EventLoop::m_mutex is locked second.
     std::mutex m_mutex;
     std::condition_variable m_cv;
     std::optional<kj::Function<void()>> m_fn;
@@ -393,11 +402,12 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
 {
     // Handler for the connection getting destroyed before this client object.
-    auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
+    auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
         // Release client capability by move-assigning to temporary.
         { typename Interface::Client(std::move(m_client)); }
+        Lock lock{m_context.loop->m_mutex};
         m_context.connection = nullptr;
     });
@@ -410,14 +420,10 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
     // down while external code is still holding client references.
     //
     // The first case is handled here when m_context.connection is not null. The
-    // second case is handled by the cleanup function, which sets m_context.connection to
-    // null so nothing happens here.
-    m_context.cleanup_fns.emplace_front([this, destroy_connection, cleanup_it]{
-        if (m_context.connection) {
-            // Remove cleanup callback so it doesn't run and try to access
-            // this object after it's already destroyed.
-            m_context.connection->removeSyncCleanup(cleanup_it);
-
+    // second case is handled by the disconnect_cb function, which sets
+    // m_context.connection to null so nothing happens here.
+    m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
+        {
             // If the capnp interface defines a destroy method, call it to destroy
             // the remote object, waiting for it to be deleted server side. If the
             // capnp interface does not define a destroy method, this will just call
@@ -426,6 +432,14 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
             // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
             m_context.loop->sync([&]() {
+                // Remove disconnect callback on cleanup so it doesn't run and try
+                // to access this object after it's destroyed. This call needs to
+                // run inside loop->sync() on the event loop thread because
+                // otherwise, if there were an ill-timed disconnect, the
+                // onDisconnect handler could fire and delete the Connection object
+                // before the removeSyncCleanup call.
+                if (m_context.connection) m_context.connection->removeSyncCleanup(disconnect_cb);
+
                 // Release client capability by move-assigning to temporary.
                 {
                     typename Interface::Client(std::move(m_client));
diff --git a/include/mp/proxy-types.h b/include/mp/proxy-types.h
index 597bdfda..de96d134 100644
--- a/include/mp/proxy-types.h
+++ b/include/mp/proxy-types.h
@@ -609,42 +609,44 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
             << "{" << g_thread_context.thread_name
             << "} IPC client first request from current thread, constructing waiter";
     }
-    ClientInvokeContext invoke_context{*proxy_client.m_context.connection, g_thread_context};
+    ThreadContext& thread_context{g_thread_context};
+    std::optional<ClientInvokeContext> invoke_context; // Must outlive waiter->wait() call below
     std::exception_ptr exception;
     std::string kj_exception;
     bool done = false;
     const char* disconnected = nullptr;
     proxy_client.m_context.loop->sync([&]() {
         if (!proxy_client.m_context.connection) {
-            const std::unique_lock lock(invoke_context.thread_context.waiter->m_mutex);
+            const std::unique_lock lock(thread_context.waiter->m_mutex);
             done = true;
             disconnected = "IPC client method called after disconnect.";
-            invoke_context.thread_context.waiter->m_cv.notify_all();
+            thread_context.waiter->m_cv.notify_all();
             return;
         }
         auto request = (proxy_client.m_client.*get_request)(nullptr);
         using Request = CapRequestTraits<decltype(request)>;
         using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
-        IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
+        invoke_context.emplace(*proxy_client.m_context.connection, thread_context);
+        IterateFields().handleChain(*invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
         proxy_client.m_context.loop->logPlain()
-            << "{" << invoke_context.thread_context.thread_name << "} IPC client send "
+            << "{" << thread_context.thread_name << "} IPC client send "
             << TypeName<typename Request::Params>() << " " << LogEscape(request.toString());
         proxy_client.m_context.loop->m_task_set->add(request.send().then(
             [&](::capnp::Response<typename Request::Results>&& response) {
                 proxy_client.m_context.loop->logPlain()
-                    << "{" << invoke_context.thread_context.thread_name << "} IPC client recv "
+                    << "{" << thread_context.thread_name << "} IPC client recv "
                     << TypeName<typename Request::Results>() << " " << LogEscape(response.toString());
                 try {
                     IterateFields().handleChain(
-                        invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...);
+                        *invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...);
                 } catch (...) {
                     exception = std::current_exception();
                 }
-                const std::unique_lock lock(invoke_context.thread_context.waiter->m_mutex);
+                const std::unique_lock lock(thread_context.waiter->m_mutex);
                 done = true;
-                invoke_context.thread_context.waiter->m_cv.notify_all();
+                thread_context.waiter->m_cv.notify_all();
             },
             [&](const ::kj::Exception& e) {
                 if (e.getType() == ::kj::Exception::Type::DISCONNECTED) {
@@ -652,16 +654,16 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
                 } else {
                     kj_exception = kj::str("kj::Exception: ", e).cStr();
                     proxy_client.m_context.loop->logPlain()
-                        << "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
+                        << "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
                 }
-                const std::unique_lock lock(invoke_context.thread_context.waiter->m_mutex);
+                const std::unique_lock lock(thread_context.waiter->m_mutex);
                 done = true;
-                invoke_context.thread_context.waiter->m_cv.notify_all();
+                thread_context.waiter->m_cv.notify_all();
             }));
     });
-    std::unique_lock lock(invoke_context.thread_context.waiter->m_mutex);
-    invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; });
+    std::unique_lock lock(thread_context.waiter->m_mutex);
+    thread_context.waiter->wait(lock, [&done]() { return done; });
     if (exception) std::rethrow_exception(exception);
     if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
     if (disconnected) proxy_client.m_context.loop->raise() << disconnected;
diff --git a/include/mp/type-context.h b/include/mp/type-context.h
index 952734f3..894daadb 100644
--- a/include/mp/type-context.h
+++ b/include/mp/type-context.h
@@ -69,7 +69,6 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
             const auto& params = call_context.getParams();
             Context::Reader context_arg = Accessor::get(params);
             ServerContext server_context{server, call_context, req};
-            bool disconnected{false};
             {
                 // Before invoking the function, store a reference to the
                 // callbackThread provided by the client in the
@@ -101,7 +100,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
                 // recursive call (IPC call calling back to the caller which
                 // makes another IPC call), so avoid modifying the map.
                 const bool erase_thread{inserted};
-                KJ_DEFER({
+                KJ_DEFER(if (erase_thread) {
                     std::unique_lock lock(thread_context.waiter->m_mutex);
                     // Call erase here with a Connection* argument instead
                     // of an iterator argument, because the `request_thread`
@@ -112,24 +111,10 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
                     // erases the thread from the map, and also because the
                     // ProxyServer destructor calls
                     // request_threads.clear().
-                    if (erase_thread) {
-                        disconnected = !request_threads.erase(server.m_context.connection);
-                    } else {
-                        disconnected = !request_threads.count(server.m_context.connection);
-                    }
+                    request_threads.erase(server.m_context.connection);
                 });
                 fn.invoke(server_context, args...);
             }
-            if (disconnected) {
-                // If disconnected is true, the Connection object was
-                // destroyed during the method call. Deal with this by
-                // returning without ever fulfilling the promise, which will
-                // cause the ProxyServer object to leak. This is not ideal,
-                // but fixing the leak will require nontrivial code changes
-                // because there is a lot of code assuming ProxyServer
-                // objects are destroyed before Connection objects.
-                return;
-            }
             KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
                 server.m_context.loop->sync([&] {
                     auto fulfiller_dispose = kj::mv(fulfiller);
diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp
index 579f7995..c9fecf5c 100644
--- a/src/mp/proxy.cpp
+++ b/src/mp/proxy.cpp
@@ -132,9 +132,11 @@ Connection::~Connection()
     // on clean and unclean shutdowns. In unclean shutdown case when the
     // connection is broken, sync and async cleanup lists will filled with
     // callbacks. In the clean shutdown case both lists will be empty.
+    Lock lock{m_loop->m_mutex};
     while (!m_sync_cleanup_fns.empty()) {
-        m_sync_cleanup_fns.front()();
-        m_sync_cleanup_fns.pop_front();
+        CleanupList fn;
+        fn.splice(fn.begin(), m_sync_cleanup_fns, m_sync_cleanup_fns.begin());
+        Unlock(lock, fn.front());
     }
 }

@@ -311,18 +313,18 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
         thread = threads.emplace(
             std::piecewise_construct, std::forward_as_tuple(connection),
             std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
-        thread->second.setCleanup([&threads, &mutex, thread] {
+        thread->second.setDisconnectCallback([&threads, &mutex, thread] {
             // Note: it is safe to use the `thread` iterator in this cleanup
             // function, because the iterator would only be invalid if the map entry
             // was removed, and if the map entry is removed the ProxyClient
             // destructor unregisters the cleanup.

             // Connection is being destroyed before thread client is, so reset
-            // thread client m_cleanup_it member so thread client destructor does not
-            // try unregister this callback after connection is destroyed.
-            thread->second.m_cleanup_it.reset();
+            // thread client m_disconnect_cb member so thread client destructor does not
+            // try to unregister this callback after connection is destroyed.
             // Remove connection pointer about to be destroyed from the map
             const std::unique_lock lock(mutex);
+            thread->second.m_disconnect_cb.reset();
             threads.erase(thread);
         });
         return {thread, true};
@@ -333,16 +335,16 @@ ProxyClient<Thread>::~ProxyClient()
     // If thread is being destroyed before connection is destroyed, remove the
     // cleanup callback that was registered to handle the connection being
     // destroyed before the thread being destroyed.
-    if (m_cleanup_it) {
-        m_context.connection->removeSyncCleanup(*m_cleanup_it);
+    if (m_disconnect_cb) {
+        m_context.connection->removeSyncCleanup(*m_disconnect_cb);
     }
 }

-void ProxyClient<Thread>::setCleanup(const std::function<void()>& fn)
+void ProxyClient<Thread>::setDisconnectCallback(const std::function<void()>& fn)
 {
     assert(fn);
-    assert(!m_cleanup_it);
-    m_cleanup_it = m_context.connection->addSyncCleanup(fn);
+    assert(!m_disconnect_cb);
+    m_disconnect_cb = m_context.connection->addSyncCleanup(fn);
 }

 ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp
index fa5a6f4c..1793b34c 100644
--- a/test/mp/test/test.cpp
+++ b/test/mp/test/test.cpp
@@ -49,12 +49,14 @@ namespace test {
 class TestSetup
 {
 public:
-    std::thread thread;
     std::function<void()> server_disconnect;
     std::function<void()> client_disconnect;
     std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
     std::unique_ptr<ProxyClient<messages::FooInterface>> client;
     ProxyServer<messages::FooInterface>* server{nullptr};
+    //! Thread variable should be after other struct members so the thread does
+    //! not start until the other members are initialized.
+    std::thread thread;

     TestSetup(bool client_owns_connection = true)
         : thread{[&] {
@@ -266,12 +268,12 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
     // ProxyServer objects associated with the connection. Having an in-progress
     // RPC call requires keeping the ProxyServer longer.

+    std::promise<void> signal;
     TestSetup setup{/*client_owns_connection=*/false};
     ProxyClient<messages::FooInterface>* foo = setup.client.get();
     KJ_EXPECT(foo->add(1, 2) == 3);
     foo->initThreadMap();
-    std::promise<void> signal;
     setup.server->m_impl->m_fn = [&] {
         EventLoopRef loop{*setup.server->m_context.loop};
         setup.client_disconnect();
@@ -287,6 +289,11 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
     }
     KJ_EXPECT(disconnected);

+    // Now that the disconnect has been detected, set signal allowing the
+    // callFnAsync() IPC call to return. Since signalling may not wake up the
+    // thread right away, it is important for the signal variable to be declared
+    // *before* the TestSetup variable so it is not destroyed while
+    // signal.get_future().get() is called.
     signal.set_value();
 }
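Reviewer note: the new Waiter::m_mutex comment in include/mp/proxy-io.h documents a fixed lock order for code that needs both locks: Waiter::m_mutex is always taken first and EventLoop::m_mutex second. The snippet below is only a minimal standalone sketch of that ordering rule; DemoWaiter, DemoLoop, and touch_guarded_state are hypothetical stand-ins and are not part of the mp API.

```cpp
// Sketch (hypothetical types, not the mp API) of the lock-ordering rule
// documented for Waiter::m_mutex: any thread that needs both the waiter
// mutex and the event loop mutex locks the waiter mutex first and the event
// loop mutex second, so no two threads can deadlock by acquiring them in
// opposite orders.
#include <mutex>

struct DemoWaiter { std::mutex m_mutex; }; // stand-in for mp::Waiter
struct DemoLoop   { std::mutex m_mutex; }; // stand-in for mp::EventLoop

void touch_guarded_state(DemoWaiter& waiter, DemoLoop& loop)
{
    std::unique_lock<std::mutex> waiter_lock{waiter.m_mutex}; // locked first
    std::unique_lock<std::mutex> loop_lock{loop.m_mutex};     // locked second
    // ...read or write state guarded by both mutexes here...
}

int main()
{
    DemoWaiter waiter;
    DemoLoop loop;
    touch_guarded_state(waiter, loop); // single-threaded call just to exercise the path
}
```

Keeping one global acquisition order like this is the standard way to rule out deadlock when two mutexes may be held together, which is what the comment codifies.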