@@ -297,5 +297,68 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
297297 signal.set_value ();
298298}
299299
300+ KJ_TEST (" Make simultaneous IPC callbacks with same request_thread and callback_thread" )
301+ {
302+ TestSetup setup;
303+ ProxyClient<messages::FooInterface>* foo = setup.client .get ();
304+
305+ class Callback : public FooCallback
306+ {
307+ public:
308+ Callback (int expect, int ret) : m_expect(expect), m_ret(ret) {}
309+ int call (int arg) override
310+ {
311+ KJ_EXPECT (arg == m_expect);
312+ return m_ret;
313+ }
314+ int m_expect, m_ret;
315+ };
316+
317+ foo->initThreadMap ();
318+ auto callback{std::make_shared<Callback>(1 , 2 )};
319+ // Note: Calling saveCallback initializes the callback_thread and request_thread
320+ // for this connection. These threads will be reused for the simultaneous calls below.
321+ foo->saveCallback (callback);
322+
323+ ThreadContext& tc{g_thread_context};
324+ auto callback_thread{tc.callback_threads .at (foo->m_context .connection )->m_client };
325+ auto request_thread{tc.request_threads .at (foo->m_context .connection )->m_client };
326+
327+ auto client{foo->m_client };
328+ bool caught_thread_busy = false ;
329+ // NOTE: '3' was choosen because it was the lowest number
330+ // of simultaneous calls required to reliably catch a "thread busy" error
331+ size_t running{3 };
332+ foo->m_context .loop ->sync ([&] {
333+ for (size_t i = 0 ; i < running; i++) {
334+ auto callbackReq{client.callbackSavedRequest ()};
335+ auto context{callbackReq.initContext ()};
336+ context.setCallbackThread (callback_thread);
337+ context.setThread (request_thread);
338+ callbackReq.setArg (1 );
339+ foo->m_context .loop ->m_task_set ->add (callbackReq.send ().then (
340+ [&](auto && results) {
341+ KJ_EXPECT (results.getResult () == 2 );
342+ running -= 1 ;
343+ tc.waiter ->m_cv .notify_all ();
344+ },
345+ [&](kj::Exception&& e) {
346+ KJ_EXPECT (std::string_view{e.getDescription ().cStr ()} ==
347+ " remote exception: std::exception: thread busy" );
348+ caught_thread_busy = true ;
349+ running -= 1 ;
350+ tc.waiter ->m_cv .notify_all ();
351+ }
352+ ));
353+ }
354+ });
355+ {
356+ Lock lock (tc.waiter ->m_mutex );
357+ tc.waiter ->wait (lock, [&running] { return running == 0 ; });
358+ }
359+ foo->saveCallback (nullptr ); // Clear saved callback
360+ KJ_EXPECT (caught_thread_busy);
361+ }
362+
300363} // namespace test
301364} // namespace mp
0 commit comments