@@ -62,36 +62,33 @@ void CustomBuildField(TypeList<>,
6262{
6363 auto & connection = invoke_context.connection ;
6464 auto & thread_context = invoke_context.thread_context ;
65- auto & request_threads = thread_context.request_threads ;
66- auto & callback_threads = thread_context.callback_threads ;
67-
68- auto callback_thread = callback_threads.find (&connection);
69- if (callback_thread == callback_threads.end ()) {
70- callback_thread =
71- callback_threads
72- .emplace (std::piecewise_construct, std::forward_as_tuple (&connection),
73- std::forward_as_tuple (
74- connection.m_threads .add (kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})),
75- &connection, /* destroy_connection= */ false ))
76- .first ;
77- }
7865
79- auto request_thread = request_threads.find (&connection);
80- if (request_thread == request_threads.end ()) {
81- // This code will only run if IPC client call is being made for the
82- // first time on a new thread. After the first call, subsequent calls
66+ // Create local Thread::Server object corresponding to the current thread
67+ // and pass a Thread::Client reference to it in the Context.callbackThread
68+ // field so the function being called can make callbacks to this thread.
69+ // Also store the Thread::Client reference in the callback_threads map so
70+ // future calls over this connection can reuse it.
71+ auto [callback_thread, _]{SetThread (
72+ thread_context.callback_threads , thread_context.waiter ->m_mutex , &connection,
73+ [&] { return connection.m_threads .add (kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
74+
75+ // Call remote ThreadMap.makeThread function so server will create a
76+ // dedicated worker thread to run function calls from this thread. Store the
77+ // Thread::Client reference it returns in the request_threads map.
78+ auto make_request_thread{[&]{
79+ // This code will only run if an IPC client call is being made for the
80+ // first time on this thread. After the first call, subsequent calls
8381 // will use the existing request thread. This code will also never run at
8482 // all if the current thread is a request thread created for a different
8583 // IPC client, because in that case PassField code (below) will have set
8684 // request_thread to point to the calling thread.
8785 auto request = connection.m_thread_map .makeThreadRequest ();
8886 request.setName (thread_context.thread_name );
89- request_thread =
90- request_threads
91- .emplace (std::piecewise_construct, std::forward_as_tuple (&connection),
92- std::forward_as_tuple (request.send ().getResult (), &connection, /* destroy_connection= */ false ))
93- .first ; // Nonblocking due to capnp request pipelining.
94- }
87+ return request.send ().getResult (); // Nonblocking due to capnp request pipelining.
88+ }};
89+ auto [request_thread, _1]{SetThread (
90+ thread_context.request_threads , thread_context.waiter ->m_mutex ,
91+ &connection, make_request_thread)};
9592
9693 auto context = output.init ();
9794 context.setThread (request_thread->second .m_client );
@@ -143,24 +140,23 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
143140 // call. In this case, the callbackThread value should point
144141 // to the same thread already in the map, so there is no
145142 // need to update the map.
146- auto & request_threads = g_thread_context.request_threads ;
147- auto request_thread = request_threads.find (server.m_context .connection );
148- if (request_thread == request_threads.end ()) {
149- request_thread =
150- g_thread_context.request_threads
151- .emplace (std::piecewise_construct, std::forward_as_tuple (server.m_context .connection ),
152- std::forward_as_tuple (context_arg.getCallbackThread (), server.m_context .connection ,
153- /* destroy_connection= */ false ))
154- .first ;
155- } else {
156- // The requests_threads map already has an entry for
157- // this connection, so this must be a recursive call.
158- // Avoid modifying the map in this case by resetting the
159- // request_thread iterator, so the KJ_DEFER statement
160- // below doesn't do anything.
161- request_thread = request_threads.end ();
162- }
163- KJ_DEFER (if (request_thread != request_threads.end ()) request_threads.erase (request_thread));
143+ auto & thread_context = g_thread_context;
144+ auto & request_threads = thread_context.request_threads ;
145+ auto [request_thread, inserted]{SetThread (
146+ request_threads, thread_context.waiter ->m_mutex ,
147+ server.m_context .connection ,
148+ [&] { return context_arg.getCallbackThread (); })};
149+
150+ // If an entry was inserted into the requests_threads map,
151+ // remove it after calling fn.invoke. If an entry was not
152+ // inserted, one already existed, meaning this must be a
153+ // recursive call (IPC call calling back to the caller which
154+ // makes another IPC call), so avoid modifying the map.
155+ auto erase_thread{inserted ? request_thread : request_threads.end ()};
156+ KJ_DEFER (if (erase_thread != request_threads.end ()) {
157+ std::unique_lock<std::mutex> lock (thread_context.waiter ->m_mutex );
158+ request_threads.erase (erase_thread);
159+ });
164160 fn.invoke (server_context, args...);
165161 }
166162 KJ_IF_MAYBE (exception, kj::runCatchingExceptions ([&]() {
0 commit comments