Skip to content

Commit db03a66

Browse files
committed
Merge bitcoin-core#214: Fix crash on simultaneous IPC calls using the same thread
1238170 test: simultaneous IPC calls using same thread (Novo) eb069ab Fix crash on simultaneous IPC calls using the same thread (Novo) Pull request description: Relevant Issue: bitcoin-core#206 This error occurs when non-C++ clients make simultaneous IPC calls to the server using the same Server thread. The libmultiprocess C++ client ensures a 1-to-1 mapping of client-to-server threads, so simultaneous calls using the same thread cannot be made. --- ### **Testing** I have added a unit test in the last commit. You can cherry-pick this commit on master to reproduce the crash. ACKs for top commit: ryanofsky: Code review ACK 1238170, since the code changes here all make sense except the initial `running = 3` value, and there is a comment about that, so at least it's called out and may be something that can be improved later. Tree-SHA512: 799793be3861694df3a97f6140c8d38909175fa20e428dbd49762175ee051297f20d9099e46ad0845dc99b7504b1a934a924c27198b5229a1c3fadb453763007
2 parents afcc40b + 1238170 commit db03a66

File tree

3 files changed

+82
-3
lines changed

3 files changed

+82
-3
lines changed

include/mp/proxy-io.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,12 +281,13 @@ struct Waiter
281281
Waiter() = default;
282282

283283
template <typename Fn>
284-
void post(Fn&& fn)
284+
bool post(Fn&& fn)
285285
{
286286
const Lock lock(m_mutex);
287-
assert(!m_fn);
287+
if (m_fn) return false;
288288
m_fn = std::forward<Fn>(fn);
289289
m_cv.notify_all();
290+
return true;
290291
}
291292

292293
template <class Predicate>

include/mp/type-context.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,12 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
152152
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
153153
server.m_context.loop->log()
154154
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
155-
thread.m_thread_context.waiter->post(std::move(invoke));
155+
if (!thread.m_thread_context.waiter->post(std::move(invoke))) {
156+
server.m_context.loop->log()
157+
<< "IPC server error request #" << req
158+
<< " {" << thread.m_thread_context.thread_name << "}" << ", thread busy";
159+
throw std::runtime_error("thread busy");
160+
}
156161
} else {
157162
server.m_context.loop->log()
158163
<< "IPC server error request #" << req << ", missing thread to execute request";

test/mp/test/test.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,28 @@
55
#include <mp/test/foo.capnp.h>
66
#include <mp/test/foo.capnp.proxy.h>
77

8+
#include <atomic>
89
#include <capnp/capability.h>
910
#include <capnp/rpc.h>
11+
#include <condition_variable>
1012
#include <cstring>
13+
#include <exception>
1114
#include <functional>
1215
#include <future>
1316
#include <iostream>
1417
#include <kj/async.h>
1518
#include <kj/async-io.h>
1619
#include <kj/common.h>
1720
#include <kj/debug.h>
21+
#include <kj/exception.h>
1822
#include <kj/memory.h>
23+
#include <kj/string.h>
1924
#include <kj/test.h>
2025
#include <memory>
2126
#include <mp/proxy.h>
27+
#include "mp/proxy.capnp.h"
2228
#include <mp/proxy-io.h>
29+
#include "mp/util.h"
2330
#include <optional>
2431
#include <set>
2532
#include <stdexcept>
@@ -297,5 +304,71 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
297304
signal.set_value();
298305
}
299306

307+
KJ_TEST("Make simultaneous IPC callbacks with same request_thread and callback_thread")
308+
{
309+
TestSetup setup;
310+
ProxyClient<messages::FooInterface>* foo = setup.client.get();
311+
std::promise<void> signal;
312+
313+
foo->initThreadMap();
314+
// Use callFnAsync() to get the client to setup the request_thread
315+
// that will be used for the test.
316+
setup.server->m_impl->m_fn = [&] {};
317+
foo->callFnAsync();
318+
ThreadContext& tc{g_thread_context};
319+
std::optional<Thread::Client> callback_thread, request_thread;
320+
{
321+
Lock lock(tc.waiter->m_mutex);
322+
callback_thread = tc.callback_threads.at(foo->m_context.connection)->m_client;
323+
request_thread = tc.request_threads.at(foo->m_context.connection)->m_client;
324+
}
325+
326+
setup.server->m_impl->m_fn = [&] {
327+
try
328+
{
329+
signal.get_future().get();
330+
}
331+
catch(const std::exception& e)
332+
{
333+
KJ_EXPECT(e.what() == std::string("Future already retrieved"));
334+
}
335+
};
336+
337+
auto client{foo->m_client};
338+
bool caught_thread_busy = false;
339+
// NOTE: '3' was choosen because it was the lowest number
340+
// of simultaneous calls required to reliably catch a "thread busy" error
341+
std::atomic<size_t> running{3};
342+
foo->m_context.loop->sync([&]
343+
{
344+
for (size_t i = 0; i < running; i++)
345+
{
346+
auto request{client.callFnAsyncRequest()};
347+
auto context{request.initContext()};
348+
context.setCallbackThread(*callback_thread);
349+
context.setThread(*request_thread);
350+
foo->m_context.loop->m_task_set->add(request.send().then(
351+
[&](auto&& results) {
352+
running -= 1;
353+
tc.waiter->m_cv.notify_all();
354+
},
355+
[&](kj::Exception&& e) {
356+
KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
357+
"remote exception: std::exception: thread busy");
358+
caught_thread_busy = true;
359+
running -= 1;
360+
signal.set_value();
361+
tc.waiter->m_cv.notify_all();
362+
}
363+
));
364+
}
365+
});
366+
{
367+
Lock lock(tc.waiter->m_mutex);
368+
tc.waiter->wait(lock, [&running] { return running == 0; });
369+
}
370+
KJ_EXPECT(caught_thread_busy);
371+
}
372+
300373
} // namespace test
301374
} // namespace mp

0 commit comments

Comments
 (0)