Skip to content

Commit 1238170

Browse files
committed
test: simultaneous IPC calls using same thread
Although the libmultiprocess client prevents this kind of situation from occurring, it easily occurs in non-libmultiprocess clients
1 parent eb069ab commit 1238170

File tree

1 file changed

+73
-0
lines changed

1 file changed

+73
-0
lines changed

test/mp/test/test.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,28 @@
55
#include <mp/test/foo.capnp.h>
66
#include <mp/test/foo.capnp.proxy.h>
77

8+
#include <atomic>
89
#include <capnp/capability.h>
910
#include <capnp/rpc.h>
11+
#include <condition_variable>
1012
#include <cstring>
13+
#include <exception>
1114
#include <functional>
1215
#include <future>
1316
#include <iostream>
1417
#include <kj/async.h>
1518
#include <kj/async-io.h>
1619
#include <kj/common.h>
1720
#include <kj/debug.h>
21+
#include <kj/exception.h>
1822
#include <kj/memory.h>
23+
#include <kj/string.h>
1924
#include <kj/test.h>
2025
#include <memory>
2126
#include <mp/proxy.h>
27+
#include "mp/proxy.capnp.h"
2228
#include <mp/proxy-io.h>
29+
#include "mp/util.h"
2330
#include <optional>
2431
#include <set>
2532
#include <stdexcept>
@@ -297,5 +304,71 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
297304
signal.set_value();
298305
}
299306

307+
// Regression test: issue several IPC calls that all name the SAME
// request_thread and callback_thread. The libmultiprocess client itself
// serializes calls per thread, but other (non-libmultiprocess) clients can
// easily send overlapping calls for one thread, and the server is expected to
// reject the overlap with a "thread busy" error rather than misbehave.
KJ_TEST("Make simultaneous IPC callbacks with same request_thread and callback_thread")
{
    TestSetup setup;
    ProxyClient<messages::FooInterface>* foo = setup.client.get();
    // Fulfilled by the first failing ("thread busy") callback; used below to
    // unblock the server-side m_fn so the one in-flight call can finish.
    std::promise<void> signal;

    foo->initThreadMap();
    // Use callFnAsync() to get the client to setup the request_thread
    // that will be used for the test.
    setup.server->m_impl->m_fn = [&] {};
    foo->callFnAsync();
    ThreadContext& tc{g_thread_context};
    // Capture the Thread capabilities created by the warm-up call so every
    // request below can be stamped with the same pair. Read under the waiter
    // mutex since the maps are shared with the event-loop thread.
    std::optional<Thread::Client> callback_thread, request_thread;
    {
        Lock lock(tc.waiter->m_mutex);
        callback_thread = tc.callback_threads.at(foo->m_context.connection)->m_client;
        request_thread = tc.request_threads.at(foo->m_context.connection)->m_client;
    }

    // Server-side body for the test calls: block until `signal` is set so the
    // first call stays in-flight long enough for the overlapping calls to hit
    // the busy thread. If a second invocation ever runs, get_future() throws
    // std::future_error ("Future already retrieved" -- note the exact what()
    // text is implementation-specific), which the catch tolerates.
    setup.server->m_impl->m_fn = [&] {
        try
        {
            signal.get_future().get();
        }
        catch(const std::exception& e)
        {
            KJ_EXPECT(e.what() == std::string("Future already retrieved"));
        }
    };

    auto client{foo->m_client};
    bool caught_thread_busy = false;
    // NOTE: '3' was chosen because it was the lowest number
    // of simultaneous calls required to reliably catch a "thread busy" error
    std::atomic<size_t> running{3};
    // Build and send the raw capnp requests from the event-loop thread
    // (loop->sync), bypassing the client-side serialization that the
    // generated proxy would normally apply.
    foo->m_context.loop->sync([&]
    {
        for (size_t i = 0; i < running; i++)
        {
            auto request{client.callFnAsyncRequest()};
            auto context{request.initContext()};
            // Point every request at the same thread pair to force overlap.
            context.setCallbackThread(*callback_thread);
            context.setThread(*request_thread);
            foo->m_context.loop->m_task_set->add(request.send().then(
                [&](auto&& results) {
                    // Success path: count the call as finished and wake the
                    // main thread, which waits on running == 0 below.
                    running -= 1;
                    tc.waiter->m_cv.notify_all();
                },
                [&](kj::Exception&& e) {
                    // Error path: expect the server's overlap rejection, then
                    // release the blocked in-flight call via `signal`.
                    KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
                        "remote exception: std::exception: thread busy");
                    caught_thread_busy = true;
                    running -= 1;
                    signal.set_value();
                    tc.waiter->m_cv.notify_all();
                }
            ));
        }
    });
    // Wait (under the waiter mutex) until every call has completed, either
    // successfully or with the expected error.
    {
        Lock lock(tc.waiter->m_mutex);
        tc.waiter->wait(lock, [&running] { return running == 0; });
    }
    KJ_EXPECT(caught_thread_busy);
}
372+
300373
} // namespace test
301374
} // namespace mp

0 commit comments

Comments
 (0)