@@ -126,6 +126,23 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
126
126
Context::Reader context_arg = Accessor::get (params);
127
127
ServerContext server_context{server, call_context, req};
128
128
{
129
+ // Before invoking the function, store a reference to the
130
+ // callbackThread provided by the client in the
131
+ // thread_local.request_threads map. This way, if this
132
+ // server thread needs to execute any RPCs that call back to
133
+ // the client, they will happen on the same client thread
134
+ // that is waiting for this function, just like what would
135
+ // happen if this were a normal function call made on the
136
+ // local stack.
137
+ //
138
+ // If the request_threads map already has an entry for this
139
+ // connection, it will be left unchanged, and it indicates
140
+ // that the current thread is an RPC client thread which is
141
+ // in the middle of an RPC call, and the current RPC call is
142
+ // a nested call from the remote thread handling that RPC
143
+ // call. In this case, the callbackThread value should point
144
+ // to the same thread already in the map, so there is no
145
+ // need to update the map.
129
146
auto & request_threads = g_thread_context.request_threads ;
130
147
auto request_thread = request_threads.find (server.m_context .connection );
131
148
if (request_thread == request_threads.end ()) {
@@ -136,8 +153,11 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
136
153
/* destroy_connection= */ false ))
137
154
.first ;
138
155
} else {
139
- // If recursive call, avoid remove request_threads map
140
- // entry in KJ_DEFER below.
156
+ // The requests_threads map already has an entry for
157
+ // this connection, so this must be a recursive call.
158
+ // Avoid modifying the map in this case by resetting the
159
+ // request_thread iterator, so the KJ_DEFER statement
160
+ // below doesn't do anything.
141
161
request_thread = request_threads.end ();
142
162
}
143
163
KJ_DEFER (if (request_thread != request_threads.end ()) request_threads.erase (request_thread));
0 commit comments