Skip to content

Commit 21886f2

Browse files
committed
test: simultaneous IPC calls using same thread
Although the libmultiprocess client prevents this kind of situation from occuring, it easily occurs in non-libmultiprocess clients
1 parent eb069ab commit 21886f2

File tree

1 file changed

+75
-0
lines changed

1 file changed

+75
-0
lines changed

test/mp/test/test.cpp

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
#include <mp/test/foo.capnp.h>
66
#include <mp/test/foo.capnp.proxy.h>
77

8+
#include <atomic>
89
#include <capnp/capability.h>
910
#include <capnp/rpc.h>
11+
#include <condition_variable>
1012
#include <cstring>
1113
#include <functional>
1214
#include <future>
@@ -15,11 +17,15 @@
1517
#include <kj/async-io.h>
1618
#include <kj/common.h>
1719
#include <kj/debug.h>
20+
#include <kj/exception.h>
1821
#include <kj/memory.h>
22+
#include <kj/string.h>
1923
#include <kj/test.h>
2024
#include <memory>
2125
#include <mp/proxy.h>
26+
#include "mp/proxy.capnp.h"
2227
#include <mp/proxy-io.h>
28+
#include "mp/util.h"
2329
#include <optional>
2430
#include <set>
2531
#include <stdexcept>
@@ -297,5 +303,74 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
297303
signal.set_value();
298304
}
299305

306+
KJ_TEST("Make simultaneous IPC callbacks with same request_thread and callback_thread")
307+
{
308+
TestSetup setup;
309+
ProxyClient<messages::FooInterface>* foo = setup.client.get();
310+
311+
class Callback : public FooCallback
312+
{
313+
public:
314+
Callback(int expect, int ret) : m_expect(expect), m_ret(ret) {}
315+
int call(int arg) override
316+
{
317+
KJ_EXPECT(arg == m_expect);
318+
return m_ret;
319+
}
320+
int m_expect, m_ret;
321+
};
322+
323+
foo->initThreadMap();
324+
auto callback{std::make_shared<Callback>(1, 2)};
325+
// Note: Calling saveCallback initializes the callback_thread and request_thread
326+
// for this connection. These threads will be reused for the simultaneous calls below.
327+
foo->saveCallback(callback);
328+
329+
ThreadContext& tc{g_thread_context};
330+
std::optional<Thread::Client> callback_thread, request_thread;
331+
{
332+
Lock lock(tc.waiter->m_mutex);
333+
callback_thread = tc.callback_threads.at(foo->m_context.connection)->m_client;
334+
request_thread = tc.request_threads.at(foo->m_context.connection)->m_client;
335+
}
336+
337+
auto client{foo->m_client};
338+
bool caught_thread_busy = false;
339+
// NOTE: '3' was choosen because it was the lowest number
340+
// of simultaneous calls required to reliably catch a "thread busy" error
341+
std::atomic<size_t> running{3};
342+
foo->m_context.loop->sync([&]
343+
{
344+
for (size_t i = 0; i < running; i++)
345+
{
346+
auto callbackReq{client.callbackSavedRequest()};
347+
auto context{callbackReq.initContext()};
348+
context.setCallbackThread(*callback_thread);
349+
context.setThread(*request_thread);
350+
callbackReq.setArg(1);
351+
foo->m_context.loop->m_task_set->add(callbackReq.send().then(
352+
[&](auto&& results) {
353+
KJ_EXPECT(results.getResult() == 2);
354+
running -= 1;
355+
tc.waiter->m_cv.notify_all();
356+
},
357+
[&](kj::Exception&& e) {
358+
KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
359+
"remote exception: std::exception: thread busy");
360+
caught_thread_busy = true;
361+
running -= 1;
362+
tc.waiter->m_cv.notify_all();
363+
}
364+
));
365+
}
366+
});
367+
{
368+
Lock lock(tc.waiter->m_mutex);
369+
tc.waiter->wait(lock, [&running] { return running == 0; });
370+
}
371+
foo->saveCallback(nullptr); // Clear saved callback
372+
KJ_EXPECT(caught_thread_busy);
373+
}
374+
300375
} // namespace test
301376
} // namespace mp

0 commit comments

Comments
 (0)