Skip to content

Commit 8d3efbc

Browse files
committed
Allow simultaneous calls on same Context.thread
If multiple IPC requests happen at the same time specifying same Context.thread to run the requests on, queue the requests to execute in the order they are received instead of raising a "thread busy" exception. This change has no effect on C++ clients using libmultiprocess as a client library, since the libmultiprocess client only makes blocking calls and creates a server thread for every client thread, so it's not possible for there to be multiple calls on the same server thread. But this change may be useful for rust and python clients as discussed bitcoin/bitcoin#33923
1 parent cbbe121 commit 8d3efbc

File tree

4 files changed

+46
-33
lines changed

4 files changed

+46
-33
lines changed

include/mp/proxy-io.h

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ struct ProxyServer<Thread> final : public Thread::Server
9696
EventLoopRef m_loop;
9797
ThreadContext& m_thread_context;
9898
std::thread m_thread;
99+
//! Promise signaled when m_thread_context.waiter is idle and there is no
100+
//! post() callback function waiting to execute.
101+
kj::Promise<void> m_thread_ready{kj::READY_NOW};
99102
};
100103

101104
//! Handler for kj::TaskSet failed task events.
@@ -692,9 +695,23 @@ struct ThreadContext
692695
template<typename T, typename Fn>
693696
kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
694697
{
695-
{
698+
auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is idle again.
699+
auto ret = m_thread_ready.then([this, fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller)]() mutable {
696700
auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
697-
bool posted = m_thread_context.waiter->post([this, fn = std::forward<Fn>(fn), result_fulfiller = kj::mv(result.fulfiller)]() mutable {
701+
bool posted = m_thread_context.waiter->post([this, fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller)]() mutable {
702+
// Fullfill ready.promise now, as soon as the Waiter starts
703+
// executing this lambda, so the next ProxyServer<Thread>::post()
704+
// call can immediately call waiter->post() without needing to wait
705+
// again. It is important to do this before calling fn() because
706+
// fn() can make an IPC call back to the client, which can make
707+
// another IPC call to this server thread. (This typically happens
708+
// when IPC methods take std::function parameters.) When this
709+
// happens the second call to the server thread should not be
710+
// blocked waiting for the first call.
711+
m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
712+
ready_fulfiller->fulfill();
713+
ready_fulfiller = nullptr;
714+
});
698715
std::optional<T> result_value;
699716
kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn()); })};
700717
m_loop->sync([&result_value, &exception, result_fulfiller = kj::mv(result_fulfiller)]() mutable {
@@ -709,9 +726,15 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
709726
result_fulfiller = nullptr;
710727
});
711728
});
712-
if (!posted) throw std::runtime_error("thread busy");
729+
// Assert that calling Waiter::post did not fail. It could only return
730+
// false if a new function was posted before the previous one finished
731+
// executing, but new functions are only posted when m_thread_ready is
732+
// signaled, so this should never happen.
733+
assert(posted);
713734
return kj::mv(result.promise);
714-
}
735+
});
736+
m_thread_ready = kj::mv(ready.promise);
737+
return ret;
715738
}
716739

717740
//! Given stream file descriptor, make a new ProxyClient object to send requests

test/mp/test/foo.capnp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ interface FooInterface $Proxy.wrap("mp::test::FooImplementation") {
3333
passFn @16 (context :Proxy.Context, fn :FooFn) -> (result :Int32);
3434
callFn @17 () -> ();
3535
callFnAsync @18 (context :Proxy.Context) -> ();
36+
callIntFnAsync @21 (context :Proxy.Context, arg :Int32) -> (result :Int32);
3637
}
3738

3839
interface FooCallback $Proxy.wrap("mp::test::FooCallback") {

test/mp/test/foo.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ class FooImplementation
8383
std::shared_ptr<FooCallback> m_callback;
8484
void callFn() { assert(m_fn); m_fn(); }
8585
void callFnAsync() { assert(m_fn); m_fn(); }
86+
int callIntFnAsync(int arg) { assert(m_int_fn); return m_int_fn(arg); }
8687
std::function<void()> m_fn;
88+
std::function<int(int)> m_int_fn;
8789
};
8890

8991
} // namespace test

test/mp/test/test.cpp

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88
#include <atomic>
99
#include <capnp/capability.h>
1010
#include <capnp/rpc.h>
11+
#include <cassert>
1112
#include <condition_variable>
13+
#include <cstdint>
1214
#include <cstring>
1315
#include <functional>
1416
#include <future>
1517
#include <kj/async.h>
1618
#include <kj/async-io.h>
1719
#include <kj/common.h>
1820
#include <kj/debug.h>
19-
#include <kj/exception.h>
2021
#include <kj/memory.h>
21-
#include <kj/string.h>
2222
#include <kj/test.h>
2323
#include <memory>
2424
#include <mp/proxy.h>
@@ -31,7 +31,6 @@
3131
#include <stdexcept>
3232
#include <string>
3333
#include <string_view>
34-
#include <system_error>
3534
#include <thread>
3635
#include <type_traits>
3736
#include <utility>
@@ -317,7 +316,7 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
317316
signal.set_value();
318317
}
319318

320-
KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
319+
KJ_TEST("Make simultaneous IPC calls on single remote thread")
321320
{
322321
TestSetup setup;
323322
ProxyClient<messages::FooInterface>* foo = setup.client.get();
@@ -336,51 +335,39 @@ KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
336335
request_thread = &tc.request_threads.at(foo->m_context.connection)->m_client;
337336
});
338337

339-
setup.server->m_impl->m_fn = [&] {
340-
try
341-
{
342-
signal.get_future().get();
343-
}
344-
catch (const std::future_error& e)
345-
{
346-
KJ_EXPECT(e.code() == std::make_error_code(std::future_errc::future_already_retrieved));
347-
}
338+
// Call callIntFnAsync 3 times with n=100, 200, 300
339+
std::atomic<int> expected = 100;
340+
341+
setup.server->m_impl->m_int_fn = [&](int n) {
342+
assert(n == expected);
343+
expected += 100;
344+
return n;
348345
};
349346

350347
auto client{foo->m_client};
351-
bool caught_thread_busy = false;
352-
// NOTE: '3' was chosen because it was the lowest number
353-
// of simultaneous calls required to reliably catch a "thread busy" error
354348
std::atomic<size_t> running{3};
355349
foo->m_context.loop->sync([&]
356350
{
357351
for (size_t i = 0; i < running; i++)
358352
{
359-
auto request{client.callFnAsyncRequest()};
353+
auto request{client.callIntFnAsyncRequest()};
360354
auto context{request.initContext()};
361355
context.setCallbackThread(*callback_thread);
362356
context.setThread(*request_thread);
357+
request.setArg(100 * (i+1));
363358
foo->m_context.loop->m_task_set->add(request.send().then(
364-
[&](auto&& results) {
365-
running -= 1;
366-
tc.waiter->m_cv.notify_all();
367-
},
368-
[&](kj::Exception&& e) {
369-
KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
370-
"remote exception: std::exception: thread busy");
371-
caught_thread_busy = true;
359+
[&running, &tc, i](auto&& results) {
360+
assert(results.getResult() == static_cast<int32_t>(100 * (i+1)));
372361
running -= 1;
373-
signal.set_value();
374362
tc.waiter->m_cv.notify_all();
375-
}
376-
));
363+
}));
377364
}
378365
});
379366
{
380367
Lock lock(tc.waiter->m_mutex);
381368
tc.waiter->wait(lock, [&running] { return running == 0; });
382369
}
383-
KJ_EXPECT(caught_thread_busy);
370+
KJ_EXPECT(expected == 400);
384371
}
385372

386373
} // namespace test

0 commit comments

Comments
 (0)