Skip to content

Commit cbe5d32

Browse files
committed
proxy-io: fix race condition ProxyClientBase cleanup handler
This fixes a TSAN error in the "disconnecting and blocking during the call" unit test. The error output is below and problem it shows is that the ProxyClientBase `m_context.connection->removeSyncCleanup(cleanup_it);` call is dereferencing `Connection::m_loop.m_loop` pointer variable, shortly before the ~Connection destructor sets the pointer to null in another thread, without any synchronization in between. This could cause segfaults if the timing was different and the onDisconnect call happened at the same time as the removeSyncCleanup call. The fix for this error is to move the removeSyncCleanup call into a loop->sync() block so it runs on the event loop thread instead of running asynchronously. Moving this removeSyncCleanup to another thread required adding more synchronization around other removeSyncCleanup and addSyncCleanup cleanup calls, to prevent new races, so other places calling these were also updated to acquire and pass locks to these methods. Error output showing the details about the race condition is below. The test case that triggers it has 3 threads. There is the main client thread calling foo->callFnAsync() which is idle during this time. There is the corresponding server thread T11 in the stack trace below which is executing the callFnAsync() function, and calling setup.client_disconnect() to check for correct behavior when there is a disconnect during the IPC call. Finally there is the event loop thread T10 in the stack trace below whic is actually deleting the server Connection object in response to the client_disconnect() call (which deletes the client Connection object). During the callFnAsync() call, the server thread T11 responsible for executing that call creates a ProxyClient<mp::Thread> object and inserts it into the g_thread_context.request_threads std::map in the type-context.h CustomPassField function, in case the server needs to make an IPC call to the client (which it does not in this test). Before the CustomPassField funcion returns it deletes the entry in the request_threads map, deleting the ProxyClient<Thread> object, and leading to the ProxyClientBase<Thread> `m_context.connection->removeSyncCleanup(cleanup_it);` call referenced in the beginning which accesses the m_loop pointer variable without a lock. [ TEST ] test.cpp:249: Calling IPC method, disconnecting and blocking during the call ================== WARNING: ThreadSanitizer: data race (pid=1971504) Write of size 8 at 0x726000012c00 by thread T10: #0 mp::EventLoopRef::reset(bool) src/mp/proxy.cpp:61 (mptest+0x2213bf) #1 mp::Connection::~Connection() include/mp/proxy.h:58 (mptest+0x221a9b) #2 kj::_::TransformPromiseNode<kj::_::Void, kj::_::Void, mp::test::TestSetup::TestSetup(bool)::{lambda()#1}::operator()() const::{lambda()#2}, kj::_::PropagateException>::getImpl(kj::_::ExceptionOrValue&) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/unique_ptr.h:93 (mptest+0x133817) #3 kj::_::TransformPromiseNodeBase::get(kj::_::ExceptionOrValue&) ??:? (libkj-async.so.1.1.0+0x3da3d) (BuildId: 21ff3d5ab929b5e72be4dbbab1a9223f705af6e4) #4 mp::EventLoop::loop() src/mp/proxy.cpp:229 (mptest+0x222f60) #5 mp::test::TestSetup::TestSetup(bool)::{lambda()#1}::operator()() const test/mp/test/test.cpp:90 (mptest+0x130d23) #6 void std::__invoke_impl<void, mp::test::TestSetup::TestSetup(bool)::{lambda()#1}>(std::__invoke_other, mp::test::TestSetup::TestSetup(bool)::{lambda()#1}&&) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/invoke.h:61 (mptest+0x130789) #7 std::__invoke_result<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}>::type std::__invoke<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}>(mp::test::TestSetup::TestSetup(bool)::{lambda()#1}&&) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/invoke.h:96 (mptest+0x130789) #8 void std::thread::_Invoker<std::tuple<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}> >::_M_invoke<0ul>(std::_Index_tuple<0ul>) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_thread.h:301 (mptest+0x130789) #9 std::thread::_Invoker<std::tuple<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}> >::operator()() /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_thread.h:308 (mptest+0x130789) #10 std::thread::_State_impl<std::thread::_Invoker<std::tuple<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}> > >::_M_run() /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_thread.h:253 (mptest+0x130789) #11 execute_native_thread_routine ??:? (libstdc++.so.6+0xed063) Previous read of size 8 at 0x726000012c00 by thread T11 (mutexes: write M0): #0 mp::Connection::removeSyncCleanup(std::_List_iterator<std::function<void ()> >) include/mp/proxy.h:60 (mptest+0x221dbf) #1 std::_Function_handler<void (), mp::ProxyClientBase<mp::Thread, capnp::Void>::ProxyClientBase(mp::Thread::Client, mp::Connection*, bool)::{lambda()#2}>::_M_invoke(std::_Any_data const&) include/mp/proxy-io.h:419 (mptest+0x228814) #2 mp::CleanupRun(std::__cxx11::list<std::function<void ()>, std::allocator<std::function<void ()> > >&) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_function.h:591 (mptest+0x1ca65b) #3 mp::ProxyClientBase<mp::Thread, capnp::Void>::~ProxyClientBase() include/mp/proxy-io.h:446 (mptest+0x227790) #4 mp::ProxyClient<mp::Thread>::~ProxyClient() src/mp/proxy.cpp:339 (mptest+0x223fbd) #5 std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> >::~pair() /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/stl_iterator.h:3013 (mptest+0x1cad8a) #6 void std::destroy_at<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > >(std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> >*) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/stl_construct.h:88 (mptest+0x1cad8a) #7 void std::allocator_traits<std::allocator<std::_Rb_tree_node<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > > > >::destroy<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > >(std::allocator<std::_Rb_tree_node<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > > >&, std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> >*) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/alloc_traits.h:599 (mptest+0x1cad8a) #8 std::_Rb_tree<mp::Connection*, std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> >, std::_Select1st<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > >, std::less<mp::Connection*>, std::allocator<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > > >::_M_destroy_node(std::_Rb_tree_node<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > >*) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/stl_tree.h:621 (mptest+0x1cad8a) #9 std::_Rb_tree<mp::Connection*, std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> >, std::_Select1st<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > >, std::less<mp::Connection*>, std::allocator<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > > >::_M_drop_node(std::_Rb_tree_node<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > >*) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/stl_tree.h:629 (mptest+0x1cad8a) #10 std::_Rb_tree<mp::Connection*, std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> >, std::_Select1st<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > >, std::less<mp::Connection*>, std::allocator<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > > >::_M_erase(std::_Rb_tree_node<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > >*) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/stl_tree.h:1934 (mptest+0x1cad8a) #11 std::_Rb_tree<mp::Connection*, std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> >, std::_Select1st<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > >, std::less<mp::Connection*>, std::allocator<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > > >::_M_erase_aux(std::_Rb_tree_const_iterator<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > >, std::_Rb_tree_const_iterator<std::pair<mp::Connection* const, mp::ProxyClient<mp::Thread> > >) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/stl_tree.h:1251 (mptest+0x1cace1) #12 _ZZZN2mp9PassFieldINS_8AccessorINS_10foo_fields7ContextELi17EEENS_19ServerInvokeContextINS_11ProxyServerINS_4test8messages12FooInterfaceEEEN5capnp11CallContextINS9_17CallFnAsyncParamsENS9_18CallFnAsyncResultsEEEEENS_10ServerCallEJNS_8TypeListIJEEEEEENSt9enable_ifIXsr3std7is_sameIDTclsrT_3getcldtdtfL1p1_12call_context9getParamsEEENS_7Context6ReaderEEE5valueEN2kj7PromiseINT0_11CallContextEEEE4typeENS_8PriorityILi1EEESJ_RSR_RKT1_DpOT2_ENUlvE_clEvENKUlvE0_clEv /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/stl_tree.h:2519 (mptest+0x1f9b7e) #13 _ZN2kj1_8DeferredIZZN2mp9PassFieldINS2_8AccessorINS2_10foo_fields7ContextELi17EEENS2_19ServerInvokeContextINS2_11ProxyServerINS2_4test8messages12FooInterfaceEEEN5capnp11CallContextINSC_17CallFnAsyncParamsENSC_18CallFnAsyncResultsEEEEENS2_10ServerCallEJNS2_8TypeListIJEEEEEENSt9enable_ifIXsr3std7is_sameIDTclsrT_3getcldtdtfL1p1_12call_context9getParamsEEENS2_7Context6ReaderEEE5valueENS_7PromiseINT0_11CallContextEEEE4typeENS2_8PriorityILi1EEESM_RST_RKT1_DpOT2_ENUlvE_clEvEUlvE0_E3runEv /nix/store/46kiq9naswgbqfc14kc9nxcbgd0rv0m2-capnproto-1.1.0/include/kj/common.h:2007 (mptest+0x1f915d) #14 _ZN2kj1_8DeferredIZZN2mp9PassFieldINS2_8AccessorINS2_10foo_fields7ContextELi17EEENS2_19ServerInvokeContextINS2_11ProxyServerINS2_4test8messages12FooInterfaceEEEN5capnp11CallContextINSC_17CallFnAsyncParamsENSC_18CallFnAsyncResultsEEEEENS2_10ServerCallEJNS2_8TypeListIJEEEEEENSt9enable_ifIXsr3std7is_sameIDTclsrT_3getcldtdtfL1p1_12call_context9getParamsEEENS2_7Context6ReaderEEE5valueENS_7PromiseINT0_11CallContextEEEE4typeENS2_8PriorityILi1EEESM_RST_RKT1_DpOT2_ENUlvE_clEvEUlvE0_ED2Ev /nix/store/46kiq9naswgbqfc14kc9nxcbgd0rv0m2-capnproto-1.1.0/include/kj/common.h:1996 (mptest+0x1f915d) #15 _ZZN2mp9PassFieldINS_8AccessorINS_10foo_fields7ContextELi17EEENS_19ServerInvokeContextINS_11ProxyServerINS_4test8messages12FooInterfaceEEEN5capnp11CallContextINS9_17CallFnAsyncParamsENS9_18CallFnAsyncResultsEEEEENS_10ServerCallEJNS_8TypeListIJEEEEEENSt9enable_ifIXsr3std7is_sameIDTclsrT_3getcldtdtfL0p1_12call_context9getParamsEEENS_7Context6ReaderEEE5valueEN2kj7PromiseINT0_11CallContextEEEE4typeENS_8PriorityILi1EEESJ_RSR_RKT1_DpOT2_ENUlvE_clEv include/mp/type-context.h:122 (mptest+0x1f915d) #16 _ZN2kj8FunctionIFvvEE4ImplIZN2mp9PassFieldINS4_8AccessorINS4_10foo_fields7ContextELi17EEENS4_19ServerInvokeContextINS4_11ProxyServerINS4_4test8messages12FooInterfaceEEEN5capnp11CallContextINSE_17CallFnAsyncParamsENSE_18CallFnAsyncResultsEEEEENS4_10ServerCallEJNS4_8TypeListIJEEEEEENSt9enable_ifIXsr3std7is_sameIDTclsrT_3getcldtdtfL0p1_12call_context9getParamsEEENS4_7Context6ReaderEEE5valueENS_7PromiseINT0_11CallContextEEEE4typeENS4_8PriorityILi1EEESO_RSV_RKT1_DpOT2_EUlvE_EclEv /nix/store/46kiq9naswgbqfc14kc9nxcbgd0rv0m2-capnproto-1.1.0/include/kj/function.h:142 (mptest+0x1f8e89) #17 kj::Function<void ()>::operator()() /nix/store/46kiq9naswgbqfc14kc9nxcbgd0rv0m2-capnproto-1.1.0/include/kj/function.h:119 (mptest+0x1547f6) #18 void mp::Unlock<std::unique_lock<std::mutex>, kj::Function<void ()>&>(std::unique_lock<std::mutex>&, kj::Function<void ()>&) include/mp/util.h:198 (mptest+0x1547f6) #19 std::thread::_State_impl<std::thread::_Invoker<std::tuple<mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0> > >::_M_run() include/mp/proxy-io.h:294 (mptest+0x2266ca) #20 execute_native_thread_routine ??:? (libstdc++.so.6+0xed063) Location is heap block of size 1024 at 0x726000012c00 allocated by thread T10: #0 operator new(unsigned long) ??:? (mptest+0x127c3c) #1 mp::test::TestSetup::TestSetup(bool)::{lambda()#1}::operator()() const /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/unique_ptr.h:1077 (mptest+0x130879) #2 void std::__invoke_impl<void, mp::test::TestSetup::TestSetup(bool)::{lambda()#1}>(std::__invoke_other, mp::test::TestSetup::TestSetup(bool)::{lambda()#1}&&) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/invoke.h:61 (mptest+0x130789) #3 std::__invoke_result<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}>::type std::__invoke<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}>(mp::test::TestSetup::TestSetup(bool)::{lambda()#1}&&) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/invoke.h:96 (mptest+0x130789) #4 void std::thread::_Invoker<std::tuple<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}> >::_M_invoke<0ul>(std::_Index_tuple<0ul>) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_thread.h:301 (mptest+0x130789) #5 std::thread::_Invoker<std::tuple<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}> >::operator()() /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_thread.h:308 (mptest+0x130789) #6 std::thread::_State_impl<std::thread::_Invoker<std::tuple<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}> > >::_M_run() /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_thread.h:253 (mptest+0x130789) #7 execute_native_thread_routine ??:? (libstdc++.so.6+0xed063) Mutex M0 (0x721c00003790) created at: #0 pthread_mutex_lock ??:? (mptest+0x9aa5c) #1 __gthread_mutex_lock(pthread_mutex_t*) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/x86_64-unknown-linux-gnu/bits/gthr-default.h:762 (mptest+0x2265e2) #2 std::mutex::lock() /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_mutex.h:113 (mptest+0x2265e2) #3 std::unique_lock<std::mutex>::lock() /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/unique_lock.h:147 (mptest+0x2265e2) #4 std::unique_lock<std::mutex>::unique_lock(std::mutex&) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/unique_lock.h:73 (mptest+0x2265e2) #5 mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0::operator()() const src/mp/proxy.cpp:399 (mptest+0x2265e2) #6 void std::__invoke_impl<void, mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0>(std::__invoke_other, mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0&&) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/invoke.h:61 (mptest+0x2265e2) #7 std::__invoke_result<mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0>::type std::__invoke<mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0>(mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0&&) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/invoke.h:96 (mptest+0x2265e2) #8 void std::thread::_Invoker<std::tuple<mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0> >::_M_invoke<0ul>(std::_Index_tuple<0ul>) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_thread.h:301 (mptest+0x2265e2) #9 std::thread::_Invoker<std::tuple<mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0> >::operator()() /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_thread.h:308 (mptest+0x2265e2) #10 std::thread::_State_impl<std::thread::_Invoker<std::tuple<mp::ProxyServer<mp::ThreadMap>::makeThread(capnp::CallContext<mp::ThreadMap::MakeThreadParams, mp::ThreadMap::MakeThreadResults>)::$_0> > >::_M_run() /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_thread.h:253 (mptest+0x2265e2) #11 execute_native_thread_routine ??:? (libstdc++.so.6+0xed063) Thread T10 (tid=1971515, running) created by main thread at: #0 pthread_create ??:? (mptest+0x9a2a5) #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) ??:? (libstdc++.so.6+0xed138) #2 mp::test::TestCase249::run() test/mp/test/test.cpp:269 (mptest+0x12b5bd) #3 kj::Maybe<kj::Exception> kj::runCatchingExceptions<kj::TestRunner::run()::{lambda()#1}>(kj::TestRunner::run()::{lambda()#1}&&) ??:? (libkj-test.so.1.1.0+0x7290) (BuildId: 50ddf81234cd06daf5f2d3f11713ed193ade4eb7) Thread T11 (tid=1971516, running) created by thread T10 at: #0 pthread_create ??:? (mptest+0x9a2a5) #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) ??:? (libstdc++.so.6+0xed138) #2 mp::ThreadMap::Server::dispatchCall(unsigned long, unsigned short, capnp::CallContext<capnp::AnyPointer, capnp::AnyPointer>) build-tsan/include/mp/proxy.capnp.c++:602 (mptest+0x220bb2) #3 virtual thunk to mp::ThreadMap::Server::dispatchCall(unsigned long, unsigned short, capnp::CallContext<capnp::AnyPointer, capnp::AnyPointer>) build-tsan/include/mp/proxy.capnp.c++:? (mptest+0x220bb2) #4 capnp::LocalClient::callInternal(unsigned long, unsigned short, capnp::CallContextHook&) ??:? (libcapnp-rpc.so.1.1.0+0x7551d) (BuildId: ec28fb7ea510d2e28e99522abd0ce44adde7cf44) #5 mp::EventLoop::loop() src/mp/proxy.cpp:229 (mptest+0x222f60) #6 mp::test::TestSetup::TestSetup(bool)::{lambda()#1}::operator()() const test/mp/test/test.cpp:90 (mptest+0x130d23) #7 void std::__invoke_impl<void, mp::test::TestSetup::TestSetup(bool)::{lambda()#1}>(std::__invoke_other, mp::test::TestSetup::TestSetup(bool)::{lambda()#1}&&) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/invoke.h:61 (mptest+0x130789) #8 std::__invoke_result<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}>::type std::__invoke<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}>(mp::test::TestSetup::TestSetup(bool)::{lambda()#1}&&) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/invoke.h:96 (mptest+0x130789) #9 void std::thread::_Invoker<std::tuple<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}> >::_M_invoke<0ul>(std::_Index_tuple<0ul>) /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_thread.h:301 (mptest+0x130789) #10 std::thread::_Invoker<std::tuple<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}> >::operator()() /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_thread.h:308 (mptest+0x130789) #11 std::thread::_State_impl<std::thread::_Invoker<std::tuple<mp::test::TestSetup::TestSetup(bool)::{lambda()#1}> > >::_M_run() /nix/store/9ds850ifd4jwcccpp3v14818kk74ldf2-gcc-14.2.1.20250322/include/c++/14.2.1.20250322/bits/std_thread.h:253 (mptest+0x130789) #12 execute_native_thread_routine ??:? (libstdc++.so.6+0xed063) SUMMARY: ThreadSanitizer: data race src/mp/proxy.cpp:61 in mp::EventLoopRef::reset(bool) ================== [ PASS ] test.cpp:249: Calling IPC method, disconnecting and blocking during the call (175629 μs) 5 test(s) passed ThreadSanitizer: reported 1 warnings
1 parent 456c8e5 commit cbe5d32

File tree

3 files changed

+27
-17
lines changed

3 files changed

+27
-17
lines changed

include/mp/proxy-io.h

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,8 @@ class Connection
332332
//! Register synchronous cleanup function to run on event loop thread (with
333333
//! access to capnp thread local variables) when disconnect() is called.
334334
//! any new i/o.
335-
CleanupIt addSyncCleanup(std::function<void()> fn);
336-
void removeSyncCleanup(CleanupIt it);
335+
CleanupIt addSyncCleanup(std::function<void()> fn, const Lock& lock);
336+
void removeSyncCleanup(CleanupIt it, const Lock& lock);
337337

338338
//! Add disconnect handler.
339339
template <typename F>
@@ -393,13 +393,17 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
393393

394394
{
395395
// Handler for the connection getting destroyed before this client object.
396-
auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
396+
auto cleanup_it = [&]{
397+
Lock lock{m_context.loop->m_mutex};
398+
return m_context.connection->addSyncCleanup([this]() {
399+
assert (std::this_thread::get_id() == m_context.loop->m_thread_id);
397400
// Release client capability by move-assigning to temporary.
398401
{
399402
typename Interface::Client(std::move(m_client));
400403
}
404+
Lock lock{m_context.loop->m_mutex};
401405
m_context.connection = nullptr;
402-
});
406+
}, lock);}();
403407

404408
// Two shutdown sequences are supported:
405409
//
@@ -413,10 +417,15 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
413417
// second case is handled by the cleanup function, which sets m_context.connection to
414418
// null so nothing happens here.
415419
m_context.cleanup_fns.emplace_front([this, destroy_connection, cleanup_it]{
416-
if (m_context.connection) {
417420
// Remove cleanup callback so it doesn't run and try to access
418421
// this object after it's already destroyed.
419-
m_context.connection->removeSyncCleanup(cleanup_it);
422+
// Use sync() to run this on the event loop thread because otherwise
423+
// it's possible an onDisconnect handler could fire and delete the
424+
// connection object before or during the removeSyncCleanup call.
425+
m_context.loop->sync([&]() {
426+
Lock lock{m_context.loop->m_mutex};
427+
if (m_context.connection) m_context.connection->removeSyncCleanup(cleanup_it, lock);
428+
});
420429

421430
// If the capnp interface defines a destroy method, call it to destroy
422431
// the remote object, waiting for it to be deleted server side. If the
@@ -430,12 +439,11 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
430439
{
431440
typename Interface::Client(std::move(m_client));
432441
}
433-
if (destroy_connection) {
442+
if (m_context.connection && destroy_connection) {
434443
delete m_context.connection;
435444
m_context.connection = nullptr;
436445
}
437446
});
438-
}
439447
});
440448
Sub::construct(*this);
441449
}

include/mp/util.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ class MP_SCOPED_CAPABILITY Lock {
173173
~Lock() MP_RELEASE() = default;
174174
void unlock() MP_RELEASE() { m_lock.unlock(); }
175175
void lock() MP_ACQUIRE() { m_lock.lock(); }
176-
void assert_locked(Mutex& mutex) MP_ASSERT_CAPABILITY() MP_ASSERT_CAPABILITY(mutex)
176+
void assert_locked(Mutex& mutex) const MP_ASSERT_CAPABILITY() MP_ASSERT_CAPABILITY(mutex)
177177
{
178178
assert(m_lock.mutex() == &mutex.m_mutex);
179179
assert(m_lock);

src/mp/proxy.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(
5858
void EventLoopRef::reset(bool relock) MP_NO_TSA
5959
{
6060
if (auto* loop{m_loop}) {
61-
m_loop = nullptr;
6261
auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}};
62+
m_loop = nullptr;
6363
loop_lock->assert_locked(loop->m_mutex);
6464
assert(loop->m_num_clients > 0);
6565
loop->m_num_clients -= 1;
@@ -138,9 +138,9 @@ Connection::~Connection()
138138
}
139139
}
140140

141-
CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
141+
CleanupIt Connection::addSyncCleanup(std::function<void()> fn, const Lock& lock)
142142
{
143-
const Lock lock(m_loop->m_mutex);
143+
lock.assert_locked(m_loop->m_mutex);
144144
// Add cleanup callbacks to the front of list, so sync cleanup functions run
145145
// in LIFO order. This is a good approach because sync cleanup functions are
146146
// added as client objects are created, and it is natural to clean up
@@ -152,9 +152,9 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
152152
return m_sync_cleanup_fns.emplace(m_sync_cleanup_fns.begin(), std::move(fn));
153153
}
154154

155-
void Connection::removeSyncCleanup(CleanupIt it)
155+
void Connection::removeSyncCleanup(CleanupIt it, const Lock& lock)
156156
{
157-
const Lock lock(m_loop->m_mutex);
157+
lock.assert_locked(m_loop->m_mutex);
158158
m_sync_cleanup_fns.erase(it);
159159
}
160160

@@ -320,9 +320,9 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
320320
// Connection is being destroyed before thread client is, so reset
321321
// thread client m_cleanup_it member so thread client destructor does not
322322
// try unregister this callback after connection is destroyed.
323-
thread->second.m_cleanup_it.reset();
324323
// Remove connection pointer about to be destroyed from the map
325324
const std::unique_lock<std::mutex> lock(mutex);
325+
thread->second.m_cleanup_it.reset();
326326
threads.erase(thread);
327327
});
328328
return {thread, true};
@@ -333,16 +333,18 @@ ProxyClient<Thread>::~ProxyClient()
333333
// If thread is being destroyed before connection is destroyed, remove the
334334
// cleanup callback that was registered to handle the connection being
335335
// destroyed before the thread being destroyed.
336+
Lock lock{m_context.loop->m_mutex};
336337
if (m_cleanup_it) {
337-
m_context.connection->removeSyncCleanup(*m_cleanup_it);
338+
m_context.connection->removeSyncCleanup(*m_cleanup_it, lock);
338339
}
339340
}
340341

341342
void ProxyClient<Thread>::setCleanup(const std::function<void()>& fn)
342343
{
343344
assert(fn);
345+
Lock lock{m_context.loop->m_mutex};
344346
assert(!m_cleanup_it);
345-
m_cleanup_it = m_context.connection->addSyncCleanup(fn);
347+
m_cleanup_it = m_context.connection->addSyncCleanup(fn, lock);
346348
}
347349

348350
ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)

0 commit comments

Comments
 (0)