@@ -62,6 +62,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
6262 kj::Promise<typename ServerContext::CallContext>>::type
6363{
6464 auto & server = server_context.proxy_server ;
65+ EventLoop& loop = *server.m_context .loop ;
6566 int req = server_context.req ;
6667 // Keep a reference to the ProxyServer instance by assigning it to the self
6768 // variable. ProxyServer instances are reference-counted and if the client
@@ -70,132 +71,129 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
7071 // needs to be destroyed on the event loop thread so it is freed in a sync()
7172 // call below.
7273 auto self = server.thisCap ();
73- auto invoke = [self = kj::mv (self), call_context = kj::mv (server_context.call_context ), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
74- MP_LOG (*server.m_context .loop , Log::Debug) << " IPC server executing request #" << req;
75- EventLoop& loop = *server.m_context .loop ;
76- if (loop.testing_hook_async_request_start ) loop.testing_hook_async_request_start ();
77- KJ_DEFER (if (loop.testing_hook_async_request_done ) loop.testing_hook_async_request_done ());
78- ServerContext server_context{server, call_context, req};
79- {
80- // Before invoking the function, store a reference to the
81- // callbackThread provided by the client in the
82- // thread_local.request_threads map. This way, if this
83- // server thread needs to execute any RPCs that call back to
84- // the client, they will happen on the same client thread
85- // that is waiting for this function, just like what would
86- // happen if this were a normal function call made on the
87- // local stack.
88- //
89- // If the request_threads map already has an entry for this
90- // connection, it will be left unchanged, and it indicates
91- // that the current thread is an RPC client thread which is
92- // in the middle of an RPC call, and the current RPC call is
93- // a nested call from the remote thread handling that RPC
94- // call. In this case, the callbackThread value should point
95- // to the same thread already in the map, so there is no
96- // need to update the map.
97- auto & thread_context = g_thread_context;
98- auto & request_threads = thread_context.request_threads ;
99- ConnThread request_thread;
100- bool inserted{false };
101- Mutex cancel_mutex;
102- Lock cancel_lock{cancel_mutex};
103- server_context.cancel_lock = &cancel_lock;
104- server.m_context .loop ->sync ([&] {
105- // Detect request being canceled before it executes.
106- if (cancel_monitor.m_canceled ) {
107- server_context.request_canceled = true ;
108- return ;
109- }
110- // Detect request being canceled while it executes.
111- assert (!cancel_monitor.m_on_cancel );
112- cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() {
113- MP_LOG (*server.m_context .loop , Log::Info) << " IPC server request #" << req << " canceled while executing." ;
114- // Lock cancel_mutex here to block the event loop
115- // thread and prevent it from deleting the request's
116- // params and response structs while the execution
117- // thread is accessing them. Because this lock is
118- // released before the event loop thread does delete
119- // the structs, the mutex does not provide any
120- // protection from the event loop deleting the
121- // structs _before_ the execution thread acquires
122- // it. So in addition to locking the mutex, the
123- // execution thread always checks request_canceled
124- // as well before accessing the structs.
125- Lock cancel_lock{cancel_mutex};
126- server_context.request_canceled = true ;
127- };
128- // Update requests_threads map if not canceled. We know
129- // the request is not canceled currently because
130- // cancel_monitor.m_canceled was checked above and this
131- // code is running on the event loop thread.
132- std::tie (request_thread, inserted) = SetThread (
133- GuardedRef{thread_context.waiter ->m_mutex , request_threads}, server.m_context .connection ,
134- [&] { return Accessor::get (call_context.getParams ()).getCallbackThread (); });
135- });
74+ auto invoke = [self = kj::mv (self), call_context = kj::mv (server_context.call_context ), &server, &loop, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
75+ MP_LOG (loop, Log::Debug) << " IPC server executing request #" << req;
76+ if (loop.testing_hook_async_request_start ) loop.testing_hook_async_request_start ();
77+ KJ_DEFER (if (loop.testing_hook_async_request_done ) loop.testing_hook_async_request_done ());
78+ ServerContext server_context{server, call_context, req};
79+ // Before invoking the function, store a reference to the
80+ // callbackThread provided by the client in the
81+ // thread_local.request_threads map. This way, if this
82+ // server thread needs to execute any RPCs that call back to
83+ // the client, they will happen on the same client thread
84+ // that is waiting for this function, just like what would
85+ // happen if this were a normal function call made on the
86+ // local stack.
87+ //
88+ // If the request_threads map already has an entry for this
89+ // connection, it will be left unchanged, and it indicates
90+ // that the current thread is an RPC client thread which is
91+ // in the middle of an RPC call, and the current RPC call is
92+ // a nested call from the remote thread handling that RPC
93+ // call. In this case, the callbackThread value should point
94+ // to the same thread already in the map, so there is no
95+ // need to update the map.
96+ auto & thread_context = g_thread_context;
97+ auto & request_threads = thread_context.request_threads ;
98+ ConnThread request_thread;
99+ bool inserted{false };
100+ Mutex cancel_mutex;
101+ Lock cancel_lock{cancel_mutex};
102+ server_context.cancel_lock = &cancel_lock;
103+ loop.sync ([&] {
104+ // Detect request being canceled before it executes.
105+ if (cancel_monitor.m_canceled ) {
106+ server_context.request_canceled = true ;
107+ return ;
108+ }
109+ // Detect request being canceled while it executes.
110+ assert (!cancel_monitor.m_on_cancel );
111+ cancel_monitor.m_on_cancel = [&loop, &server_context, &cancel_mutex, req]() {
112+ MP_LOG (loop, Log::Info) << " IPC server request #" << req << " canceled while executing." ;
113+ // Lock cancel_mutex here to block the event loop
114+ // thread and prevent it from deleting the request's
115+ // params and response structs while the execution
116+ // thread is accessing them. Because this lock is
117+ // released before the event loop thread does delete
118+ // the structs, the mutex does not provide any
119+ // protection from the event loop deleting the
120+ // structs _before_ the execution thread acquires
121+ // it. So in addition to locking the mutex, the
122+ // execution thread always checks request_canceled
123+ // as well before accessing the structs.
124+ Lock cancel_lock{cancel_mutex};
125+ server_context.request_canceled = true ;
126+ };
127+ // Update requests_threads map if not canceled. We know
128+ // the request is not canceled currently because
129+ // cancel_monitor.m_canceled was checked above and this
130+ // code is running on the event loop thread.
131+ std::tie (request_thread, inserted) = SetThread (
132+ GuardedRef{thread_context.waiter ->m_mutex , request_threads}, server.m_context .connection ,
133+ [&] { return Accessor::get (call_context.getParams ()).getCallbackThread (); });
134+ });
136135
137- // If an entry was inserted into the request_threads map,
138- // remove it after calling fn.invoke. If an entry was not
139- // inserted, one already existed, meaning this must be a
140- // recursive call (IPC call calling back to the caller which
141- // makes another IPC call), so avoid modifying the map.
142- const bool erase_thread{inserted};
143- KJ_DEFER (
144- // Release the cancel lock before calling loop->sync and
145- // waiting for the event loop thread, because if a
146- // cancellation happened, it needs to run the on_cancel
147- // callback above. It's safe to release cancel_lock at
148- // this point because the fn.invoke() call below will be
149- // finished and no longer accessing the params or
150- // results structs.
151- cancel_lock.m_lock .unlock ();
152- // Erase the request_threads entry on the event loop
153- // thread with loop->sync(), so if the connection is
154- // broken there is not a race between this thread and
155- // the disconnect handler trying to destroy the thread
156- // client object.
157- server.m_context .loop ->sync ([&] {
158- // Clear cancellation callback. At this point the
159- // method invocation finished and the result is
160- // either being returned, or discarded if a
161- // cancellation happened. So we do not need to be
162- // notified of cancellations after this point. Also
163- // we do not want to be notified because
164- // cancel_mutex and server_context could be out of
165- // scope when it happens.
166- cancel_monitor.m_on_cancel = nullptr ;
167- auto self_dispose{kj::mv (self)};
168- if (erase_thread) {
169- // Look up the thread again without using existing
170- // iterator since entry may no longer be there after
171- // a disconnect. Destroy node after releasing
172- // Waiter::m_mutex, so the ProxyClient<Thread>
173- // destructor is able to use EventLoop::mutex
174- // without violating lock order.
175- ConnThreads::node_type removed;
176- {
177- Lock lock (thread_context.waiter ->m_mutex );
178- removed = request_threads.extract (server.m_context .connection );
179- }
180- }
181- });
182- );
183- if (server_context.request_canceled ) {
184- MP_LOG (*server.m_context .loop , Log::Info) << " IPC server request #" << req << " canceled before it could be executed" ;
185- } else KJ_IF_MAYBE (exception, kj::runCatchingExceptions ([&]{
186- try {
187- fn.invoke (server_context, args...);
188- } catch (const InterruptException& e) {
189- MP_LOG (*server.m_context .loop , Log::Info) << " IPC server request #" << req << " interrupted (" << e.what () << " )" ;
190- }
191- })) {
192- MP_LOG (*server.m_context .loop , Log::Error) << " IPC server request #" << req << " uncaught exception (" << kj::str (*exception).cStr () << " )" ;
193- throw kj::mv (*exception);
136+ // If an entry was inserted into the request_threads map,
137+ // remove it after calling fn.invoke. If an entry was not
138+ // inserted, one already existed, meaning this must be a
139+ // recursive call (IPC call calling back to the caller which
140+ // makes another IPC call), so avoid modifying the map.
141+ const bool erase_thread{inserted};
142+ KJ_DEFER (
143+ // Release the cancel lock before calling loop->sync and
144+ // waiting for the event loop thread, because if a
145+ // cancellation happened, it needs to run the on_cancel
146+ // callback above. It's safe to release cancel_lock at
147+ // this point because the fn.invoke() call below will be
148+ // finished and no longer accessing the params or
149+ // results structs.
150+ cancel_lock.m_lock .unlock ();
151+ // Erase the request_threads entry on the event loop
152+ // thread with loop->sync(), so if the connection is
153+ // broken there is not a race between this thread and
154+ // the disconnect handler trying to destroy the thread
155+ // client object.
156+ loop.sync ([&] {
157+ // Clear cancellation callback. At this point the
158+ // method invocation finished and the result is
159+ // either being returned, or discarded if a
160+ // cancellation happened. So we do not need to be
161+ // notified of cancellations after this point. Also
162+ // we do not want to be notified because
163+ // cancel_mutex and server_context could be out of
164+ // scope when it happens.
165+ cancel_monitor.m_on_cancel = nullptr ;
166+ auto self_dispose{kj::mv (self)};
167+ if (erase_thread) {
168+ // Look up the thread again without using existing
169+ // iterator since entry may no longer be there after
170+ // a disconnect. Destroy node after releasing
171+ // Waiter::m_mutex, so the ProxyClient<Thread>
172+ // destructor is able to use EventLoop::mutex
173+ // without violating lock order.
174+ ConnThreads::node_type removed;
175+ {
176+ Lock lock (thread_context.waiter ->m_mutex );
177+ removed = request_threads.extract (server.m_context .connection );
194178 }
195- // End of scope: if KJ_DEFER was reached, it runs here
196179 }
197- return call_context;
198- };
180+ });
181+ );
182+ if (server_context.request_canceled ) {
183+ MP_LOG (loop, Log::Info) << " IPC server request #" << req << " canceled before it could be executed" ;
184+ } else KJ_IF_MAYBE (exception, kj::runCatchingExceptions ([&]{
185+ try {
186+ fn.invoke (server_context, args...);
187+ } catch (const InterruptException& e) {
188+ MP_LOG (loop, Log::Info) << " IPC server request #" << req << " interrupted (" << e.what () << " )" ;
189+ }
190+ })) {
191+ MP_LOG (loop, Log::Error) << " IPC server request #" << req << " uncaught exception (" << kj::str (*exception).cStr () << " )" ;
192+ throw kj::mv (*exception);
193+ }
194+ return call_context;
195+ // End of scope: if KJ_DEFER was reached, it runs here
196+ };
199197
200198 // Lookup Thread object specified by the client. The specified thread should
201199 // be a local Thread::Server object, but it needs to be looked up
@@ -204,17 +202,17 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
204202 Context::Reader context_arg = Accessor::get (params);
205203 auto thread_client = context_arg.getThread ();
206204 auto result = server.m_context .connection ->m_threads .getLocalServer (thread_client)
207- .then ([&server , invoke = kj::mv (invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
205+ .then ([&loop , invoke = kj::mv (invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
208206 // Assuming the thread object is found, pass it a pointer to the
209207 // `invoke` lambda above which will invoke the function on that
210208 // thread.
211209 KJ_IF_MAYBE (thread_server, perhaps) {
212210 auto & thread = static_cast <ProxyServer<Thread>&>(*thread_server);
213- MP_LOG (*server. m_context . loop , Log::Debug)
211+ MP_LOG (loop, Log::Debug)
214212 << " IPC server post request #" << req << " {" << thread.m_thread_context .thread_name << " }" ;
215213 return thread.template post <typename ServerContext::CallContext>(std::move (invoke));
216214 } else {
217- MP_LOG (*server. m_context . loop , Log::Error)
215+ MP_LOG (loop, Log::Error)
218216 << " IPC server error request #" << req << " , missing thread to execute request" ;
219217 throw std::runtime_error (" invalid thread handle" );
220218 }
0 commit comments