@@ -81,6 +81,11 @@ ProxyContext::ProxyContext(Connection* connection) : connection(connection), loo
8181
8282Connection::~Connection ()
8383{
84+ // Connection destructor is always called on the event loop thread. If this
85+ // is a local disconnect, it will trigger I/O, so this needs to run on the
86+ // event loop thread, and if there was a remote disconnect this, is called
87+ // by an onDisconnect callback directly from the event loop thrread.
88+ assert (std::this_thread::get_id () == m_loop->m_thread_id );
8489 // Shut down RPC system first, since this will garbage collect any
8590 // ProxyServer objects that were not freed before the connection was closed.
8691 // Typically all ProxyServer objects associated with this connection will be
@@ -156,6 +161,9 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
156161
157162void Connection::removeSyncCleanup (CleanupIt it)
158163{
164+ // Require cleanup functions to be removed on the event loop thread to avoid
165+ // needing to deal with them being removed in the middle of a disconnect.
166+ assert (std::this_thread::get_id () == m_loop->m_thread_id );
159167 const Lock lock (m_loop->m_mutex );
160168 m_sync_cleanup_fns.erase (it);
161169}
@@ -307,27 +315,32 @@ bool EventLoop::done() const
307315
308316std::tuple<ConnThread, bool > SetThread (ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
309317{
310- const std::unique_lock<std::mutex> lock (mutex);
311- auto thread = threads.find (connection);
312- if (thread != threads.end ()) return {thread, false };
313- thread = threads.emplace (
314- std::piecewise_construct, std::forward_as_tuple (connection),
315- std::forward_as_tuple (make_thread (), connection, /* destroy_connection= */ false )).first ;
316- thread->second .setDisconnectCallback ([&threads, &mutex, thread] {
317- // Note: it is safe to use the `thread` iterator in this cleanup
318- // function, because the iterator would only be invalid if the map entry
319- // was removed, and if the map entry is removed the ProxyClient<Thread>
320- // destructor unregisters the cleanup.
321-
322- // Connection is being destroyed before thread client is, so reset
323- // thread client m_disconnect_cb member so thread client destructor does not
324- // try to unregister this callback after connection is destroyed.
325- // Remove connection pointer about to be destroyed from the map
318+ assert (std::this_thread::get_id () == connection->m_loop ->m_thread_id );
319+ ConnThread thread;
320+ bool inserted;
321+ {
326322 const std::unique_lock<std::mutex> lock (mutex);
327- thread->second .m_disconnect_cb .reset ();
328- threads.erase (thread);
329- });
330- return {thread, true };
323+ std::tie (thread, inserted) = threads.try_emplace (connection);
324+ }
325+ if (inserted) {
326+ thread->second .emplace (make_thread (), connection, /* destroy_connection= */ false );
327+ thread->second ->m_disconnect_cb = connection->addSyncCleanup ([&threads, &mutex, thread] {
328+ // Note: it is safe to use the `thread` iterator in this cleanup
329+ // function, because the iterator would only be invalid if the map entry
330+ // was removed, and if the map entry is removed the ProxyClient<Thread>
331+ // destructor unregisters the cleanup.
332+
333+ // Connection is being destroyed before thread client is, so reset
334+ // thread client m_disconnect_cb member so thread client destructor does not
335+ // try to unregister this callback after connection is destroyed.
336+ thread->second ->m_disconnect_cb .reset ();
337+
338+ // Remove connection pointer about to be destroyed from the map
339+ const std::unique_lock<std::mutex> lock (mutex);
340+ threads.erase (thread);
341+ });
342+ }
343+ return {thread, inserted};
331344}
332345
333346ProxyClient<Thread>::~ProxyClient ()
@@ -336,17 +349,18 @@ ProxyClient<Thread>::~ProxyClient()
336349 // cleanup callback that was registered to handle the connection being
337350 // destroyed before the thread being destroyed.
338351 if (m_disconnect_cb) {
339- m_context.connection ->removeSyncCleanup (*m_disconnect_cb);
352+ // Remove disconnect callback on the event loop thread with
353+ // loop->sync(), so if the connection is broken there is not a race
354+ // between this thread trying to remove the callback and the disconnect
355+ // handler attempting to call it.
356+ m_context.loop ->sync ([&]() {
357+ if (m_disconnect_cb) {
358+ m_context.connection ->removeSyncCleanup (*m_disconnect_cb);
359+ }
360+ });
340361 }
341362}
342363
343- void ProxyClient<Thread>::setDisconnectCallback(const std::function<void ()>& fn)
344- {
345- assert (fn);
346- assert (!m_disconnect_cb);
347- m_disconnect_cb = m_context.connection ->addSyncCleanup (fn);
348- }
349-
350364ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
351365 : m_thread_context(thread_context), m_thread(std::move(thread))
352366{
0 commit comments