Skip to content

Commit 396473f

Browse files
committed
refactor: Use loop variable in type-context.h
Replace `server.m_context.loop` references with `loop` in the Context PassField implementation after a `loop` variable was introduced in a recent commit. Also adjust PassField scopes and indentation without changing behavior. This commit is easiest to review ignoring whitespace.
1 parent 26e29ab commit 396473f

File tree

1 file changed

+124
-126
lines changed

1 file changed

+124
-126
lines changed

include/mp/type-context.h

Lines changed: 124 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
6262
kj::Promise<typename ServerContext::CallContext>>::type
6363
{
6464
auto& server = server_context.proxy_server;
65+
EventLoop& loop = *server.m_context.loop;
6566
int req = server_context.req;
6667
// Keep a reference to the ProxyServer instance by assigning it to the self
6768
// variable. ProxyServer instances are reference-counted and if the client
@@ -70,132 +71,129 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
7071
// needs to be destroyed on the event loop thread so it is freed in a sync()
7172
// call below.
7273
auto self = server.thisCap();
73-
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
74-
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
75-
EventLoop& loop = *server.m_context.loop;
76-
if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start();
77-
KJ_DEFER(if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done());
78-
ServerContext server_context{server, call_context, req};
79-
{
80-
// Before invoking the function, store a reference to the
81-
// callbackThread provided by the client in the
82-
// thread_local.request_threads map. This way, if this
83-
// server thread needs to execute any RPCs that call back to
84-
// the client, they will happen on the same client thread
85-
// that is waiting for this function, just like what would
86-
// happen if this were a normal function call made on the
87-
// local stack.
88-
//
89-
// If the request_threads map already has an entry for this
90-
// connection, it will be left unchanged, and it indicates
91-
// that the current thread is an RPC client thread which is
92-
// in the middle of an RPC call, and the current RPC call is
93-
// a nested call from the remote thread handling that RPC
94-
// call. In this case, the callbackThread value should point
95-
// to the same thread already in the map, so there is no
96-
// need to update the map.
97-
auto& thread_context = g_thread_context;
98-
auto& request_threads = thread_context.request_threads;
99-
ConnThread request_thread;
100-
bool inserted{false};
101-
Mutex cancel_mutex;
102-
Lock cancel_lock{cancel_mutex};
103-
server_context.cancel_lock = &cancel_lock;
104-
server.m_context.loop->sync([&] {
105-
// Detect request being canceled before it executes.
106-
if (cancel_monitor.m_canceled) {
107-
server_context.request_canceled = true;
108-
return;
109-
}
110-
// Detect request being canceled while it executes.
111-
assert(!cancel_monitor.m_on_cancel);
112-
cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() {
113-
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
114-
// Lock cancel_mutex here to block the event loop
115-
// thread and prevent it from deleting the request's
116-
// params and response structs while the execution
117-
// thread is accessing them. Because this lock is
118-
// released before the event loop thread does delete
119-
// the structs, the mutex does not provide any
120-
// protection from the event loop deleting the
121-
// structs _before_ the execution thread acquires
122-
// it. So in addition to locking the mutex, the
123-
// execution thread always checks request_canceled
124-
// as well before accessing the structs.
125-
Lock cancel_lock{cancel_mutex};
126-
server_context.request_canceled = true;
127-
};
128-
// Update requests_threads map if not canceled. We know
129-
// the request is not canceled currently because
130-
// cancel_monitor.m_canceled was checked above and this
131-
// code is running on the event loop thread.
132-
std::tie(request_thread, inserted) = SetThread(
133-
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
134-
[&] { return Accessor::get(call_context.getParams()).getCallbackThread(); });
135-
});
74+
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, &loop, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
75+
MP_LOG(loop, Log::Debug) << "IPC server executing request #" << req;
76+
if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start();
77+
KJ_DEFER(if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done());
78+
ServerContext server_context{server, call_context, req};
79+
// Before invoking the function, store a reference to the
80+
// callbackThread provided by the client in the
81+
// thread_local.request_threads map. This way, if this
82+
// server thread needs to execute any RPCs that call back to
83+
// the client, they will happen on the same client thread
84+
// that is waiting for this function, just like what would
85+
// happen if this were a normal function call made on the
86+
// local stack.
87+
//
88+
// If the request_threads map already has an entry for this
89+
// connection, it will be left unchanged, and it indicates
90+
// that the current thread is an RPC client thread which is
91+
// in the middle of an RPC call, and the current RPC call is
92+
// a nested call from the remote thread handling that RPC
93+
// call. In this case, the callbackThread value should point
94+
// to the same thread already in the map, so there is no
95+
// need to update the map.
96+
auto& thread_context = g_thread_context;
97+
auto& request_threads = thread_context.request_threads;
98+
ConnThread request_thread;
99+
bool inserted{false};
100+
Mutex cancel_mutex;
101+
Lock cancel_lock{cancel_mutex};
102+
server_context.cancel_lock = &cancel_lock;
103+
loop.sync([&] {
104+
// Detect request being canceled before it executes.
105+
if (cancel_monitor.m_canceled) {
106+
server_context.request_canceled = true;
107+
return;
108+
}
109+
// Detect request being canceled while it executes.
110+
assert(!cancel_monitor.m_on_cancel);
111+
cancel_monitor.m_on_cancel = [&loop, &server_context, &cancel_mutex, req]() {
112+
MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
113+
// Lock cancel_mutex here to block the event loop
114+
// thread and prevent it from deleting the request's
115+
// params and response structs while the execution
116+
// thread is accessing them. Because this lock is
117+
// released before the event loop thread does delete
118+
// the structs, the mutex does not provide any
119+
// protection from the event loop deleting the
120+
// structs _before_ the execution thread acquires
121+
// it. So in addition to locking the mutex, the
122+
// execution thread always checks request_canceled
123+
// as well before accessing the structs.
124+
Lock cancel_lock{cancel_mutex};
125+
server_context.request_canceled = true;
126+
};
127+
// Update requests_threads map if not canceled. We know
128+
// the request is not canceled currently because
129+
// cancel_monitor.m_canceled was checked above and this
130+
// code is running on the event loop thread.
131+
std::tie(request_thread, inserted) = SetThread(
132+
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
133+
[&] { return Accessor::get(call_context.getParams()).getCallbackThread(); });
134+
});
136135

137-
// If an entry was inserted into the request_threads map,
138-
// remove it after calling fn.invoke. If an entry was not
139-
// inserted, one already existed, meaning this must be a
140-
// recursive call (IPC call calling back to the caller which
141-
// makes another IPC call), so avoid modifying the map.
142-
const bool erase_thread{inserted};
143-
KJ_DEFER(
144-
// Release the cancel lock before calling loop->sync and
145-
// waiting for the event loop thread, because if a
146-
// cancellation happened, it needs to run the on_cancel
147-
// callback above. It's safe to release cancel_lock at
148-
// this point because the fn.invoke() call below will be
149-
// finished and no longer accessing the params or
150-
// results structs.
151-
cancel_lock.m_lock.unlock();
152-
// Erase the request_threads entry on the event loop
153-
// thread with loop->sync(), so if the connection is
154-
// broken there is not a race between this thread and
155-
// the disconnect handler trying to destroy the thread
156-
// client object.
157-
server.m_context.loop->sync([&] {
158-
// Clear cancellation callback. At this point the
159-
// method invocation finished and the result is
160-
// either being returned, or discarded if a
161-
// cancellation happened. So we do not need to be
162-
// notified of cancellations after this point. Also
163-
// we do not want to be notified because
164-
// cancel_mutex and server_context could be out of
165-
// scope when it happens.
166-
cancel_monitor.m_on_cancel = nullptr;
167-
auto self_dispose{kj::mv(self)};
168-
if (erase_thread) {
169-
// Look up the thread again without using existing
170-
// iterator since entry may no longer be there after
171-
// a disconnect. Destroy node after releasing
172-
// Waiter::m_mutex, so the ProxyClient<Thread>
173-
// destructor is able to use EventLoop::mutex
174-
// without violating lock order.
175-
ConnThreads::node_type removed;
176-
{
177-
Lock lock(thread_context.waiter->m_mutex);
178-
removed = request_threads.extract(server.m_context.connection);
179-
}
180-
}
181-
});
182-
);
183-
if (server_context.request_canceled) {
184-
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
185-
} else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
186-
try {
187-
fn.invoke(server_context, args...);
188-
} catch (const InterruptException& e) {
189-
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
190-
}
191-
})) {
192-
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")";
193-
throw kj::mv(*exception);
136+
// If an entry was inserted into the request_threads map,
137+
// remove it after calling fn.invoke. If an entry was not
138+
// inserted, one already existed, meaning this must be a
139+
// recursive call (IPC call calling back to the caller which
140+
// makes another IPC call), so avoid modifying the map.
141+
const bool erase_thread{inserted};
142+
KJ_DEFER(
143+
// Release the cancel lock before calling loop->sync and
144+
// waiting for the event loop thread, because if a
145+
// cancellation happened, it needs to run the on_cancel
146+
// callback above. It's safe to release cancel_lock at
147+
// this point because the fn.invoke() call below will be
148+
// finished and no longer accessing the params or
149+
// results structs.
150+
cancel_lock.m_lock.unlock();
151+
// Erase the request_threads entry on the event loop
152+
// thread with loop->sync(), so if the connection is
153+
// broken there is not a race between this thread and
154+
// the disconnect handler trying to destroy the thread
155+
// client object.
156+
loop.sync([&] {
157+
// Clear cancellation callback. At this point the
158+
// method invocation finished and the result is
159+
// either being returned, or discarded if a
160+
// cancellation happened. So we do not need to be
161+
// notified of cancellations after this point. Also
162+
// we do not want to be notified because
163+
// cancel_mutex and server_context could be out of
164+
// scope when it happens.
165+
cancel_monitor.m_on_cancel = nullptr;
166+
auto self_dispose{kj::mv(self)};
167+
if (erase_thread) {
168+
// Look up the thread again without using existing
169+
// iterator since entry may no longer be there after
170+
// a disconnect. Destroy node after releasing
171+
// Waiter::m_mutex, so the ProxyClient<Thread>
172+
// destructor is able to use EventLoop::mutex
173+
// without violating lock order.
174+
ConnThreads::node_type removed;
175+
{
176+
Lock lock(thread_context.waiter->m_mutex);
177+
removed = request_threads.extract(server.m_context.connection);
194178
}
195-
// End of scope: if KJ_DEFER was reached, it runs here
196179
}
197-
return call_context;
198-
};
180+
});
181+
);
182+
if (server_context.request_canceled) {
183+
MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
184+
} else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
185+
try {
186+
fn.invoke(server_context, args...);
187+
} catch (const InterruptException& e) {
188+
MP_LOG(loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
189+
}
190+
})) {
191+
MP_LOG(loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")";
192+
throw kj::mv(*exception);
193+
}
194+
return call_context;
195+
// End of scope: if KJ_DEFER was reached, it runs here
196+
};
199197

200198
// Lookup Thread object specified by the client. The specified thread should
201199
// be a local Thread::Server object, but it needs to be looked up
@@ -204,17 +202,17 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
204202
Context::Reader context_arg = Accessor::get(params);
205203
auto thread_client = context_arg.getThread();
206204
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
207-
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
205+
.then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
208206
// Assuming the thread object is found, pass it a pointer to the
209207
// `invoke` lambda above which will invoke the function on that
210208
// thread.
211209
KJ_IF_MAYBE (thread_server, perhaps) {
212210
auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
213-
MP_LOG(*server.m_context.loop, Log::Debug)
211+
MP_LOG(loop, Log::Debug)
214212
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
215213
return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
216214
} else {
217-
MP_LOG(*server.m_context.loop, Log::Error)
215+
MP_LOG(loop, Log::Error)
218216
<< "IPC server error request #" << req << ", missing thread to execute request";
219217
throw std::runtime_error("invalid thread handle");
220218
}

0 commit comments

Comments
 (0)