Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 67 additions & 3 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ struct ServerInvokeContext : InvokeContext
ProxyServer& proxy_server;
CallContext& call_context;
int req;
//! If the IPC method executes asynchronously (not on the event loop thread)
//! and the IPC call was cancelled by the client, or cancelled by a
//! disconnection, this is set to true. If the call runs on the event loop
//! thread, it can't be cancelled.
std::atomic<bool> cancelled{false};

ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
: InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
Expand Down Expand Up @@ -82,11 +87,20 @@ template <>
struct ProxyServer<Thread> final : public Thread::Server
{
public:
ProxyServer(ThreadContext& thread_context, std::thread&& thread);
ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread);
~ProxyServer();
kj::Promise<void> getName(GetNameContext context) override;

//! Run a callback function returning T on this thread.
template<typename T, typename Fn>
kj::Promise<T> post(Fn&& fn);

EventLoopRef m_loop;
ThreadContext& m_thread_context;
std::thread m_thread;
//! Promise signaled when m_thread_context.waiter is idle and there is no
//! post() callback function waiting to execute.
kj::Promise<void> m_thread_ready{kj::READY_NOW};
};

//! Handler for kj::TaskSet failed task events.
Expand Down Expand Up @@ -322,7 +336,12 @@ class EventLoop
//! thread is blocked waiting for server response, this is what allows the
//! client to run the request in the same thread, the same way code would run in a
//! single process, with the callback sharing the same thread stack as the original
//! call.)
//! call.) To support this, the clientInvoke function calls Waiter::wait() to
//! block the client IPC thread while initial request is in progress. Then if
//! there is a callback, it is executed with Waiter::post().
//!
//! The Waiter class is also used server-side by `ProxyServer<Thread>::post()`
//! to execute IPC calls on worker threads.
struct Waiter
{
Waiter() = default;
Expand Down Expand Up @@ -408,14 +427,17 @@ class Connection
// will never run after connection object is destroyed. But when disconnect
// handler fires, do not call the function f right away, instead add it
// to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
// error in cases where f deletes this Connection object.
// error in the typical case where f deletes this Connection object.
m_on_disconnect.add(m_network.onDisconnect().then(
[f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
}

EventLoopRef m_loop;
kj::Own<kj::AsyncIoStream> m_stream;
LoggingErrorHandler m_error_handler{*m_loop};
//! TaskSet used to cancel the m_network.onDisconnect() handler for remote
//! disconnections, if the connection is closed locally first by deleting
//! this Connection object.
kj::TaskSet m_on_disconnect{m_error_handler};
::capnp::TwoPartyVatNetwork m_network;
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
Expand All @@ -428,6 +450,11 @@ class Connection
//! ThreadMap.makeThread) used to service requests to clients.
::capnp::CapabilityServerSet<Thread> m_threads;

//! Canceler for canceling promises that we want to discard when the
//! connection is destroyed. This is used to interrupt method calls that are
//! still executing at time of disconnection.
kj::Canceler m_canceler;

//! Cleanup functions to run if connection is broken unexpectedly. List
//! will be empty if all ProxyClient are destroyed cleanly before the
//! connection is destroyed.
Expand Down Expand Up @@ -675,6 +702,43 @@ struct ThreadContext
bool loop_thread = false;
};

template<typename T, typename Fn>
kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
{
auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is idle again.
auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
auto self = thisCap();
auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable { ready_fulfiller->fulfill(); });
std::optional<T> result;
kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result.emplace(fn(*cancel_monitor_ptr)); })};
m_loop->sync([this, &result, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
auto result_fulfiller_dispose{kj::mv(result_fulfiller)};
auto cancel_monitor_dispose{kj::mv(cancel_monitor_ptr)};
KJ_IF_MAYBE(e, exception) {
assert(!result);
result_fulfiller_dispose->reject(kj::mv(*e));
} else {
assert(result);
result_fulfiller_dispose->fulfill(kj::mv(*result));
}
m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
});
});
// Assert that calling Waiter::post did not fail. It could only return
// false if a new function was posted before the previous one finished
// executing, but new functions are only posted when m_thread_ready is
// is signaled, so this should never happen.
assert(posted);
return kj::mv(result.promise);
}).attach(kj::heap<CancelProbe>(cancel_monitor));
m_thread_ready = kj::mv(ready.promise);
return ret;
}

//! Given stream file descriptor, make a new ProxyClient object to send requests
//! over the stream. Also create a new Connection object embedded in the
//! client that is freed when the client is closed.
Expand Down
10 changes: 10 additions & 0 deletions include/mp/proxy-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,16 @@ struct ServerRet : Parent
void invoke(ServerContext& server_context, TypeList<>, Args&&... args) const
{
auto&& result = Parent::invoke(server_context, TypeList<>(), std::forward<Args>(args)...);
// If IPC request was cancelled, there is no point continuing to execute.
// It's also important to stop executing because the connection may have
// been destroyed as described in
// https://github.com/bitcoin/bitcoin/issues/34250 and there would be a
// crash if execution continued.
// TODO: Note this detection is racy because cancellation could happen
// after this check. However, fixing this would require changing the
// definition of the InvokeContext struct and updating a lot of code, so
// this can be done in a followup.
if (server_context.cancelled) throw InterruptException{"cancelled"};
auto&& results = server_context.call_context.getResults();
InvokeContext& invoke_context = server_context;
BuildField(TypeList<decltype(result)>(), invoke_context, Make<StructField, Accessor>(results),
Expand Down
1 change: 1 addition & 0 deletions include/mp/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ struct ProxyServerBase : public virtual Interface_::Server
ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection);
virtual ~ProxyServerBase();
void invokeDestroy();
using Interface_::Server::thisCap;

/**
* Implementation pointer that may or may not be owned and deleted when this
Expand Down
69 changes: 39 additions & 30 deletions include/mp/type-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void CustomBuildField(TypeList<>,
// future calls over this connection can reuse it.
auto [callback_thread, _]{SetThread(
GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
[&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
[&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(connection, thread_context, std::thread{})); })};

// Call remote ThreadMap.makeThread function so server will create a
// dedicated worker thread to run function calls from this thread. Store the
Expand Down Expand Up @@ -61,11 +61,11 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
{
const auto& params = server_context.call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
auto& server = server_context.proxy_server;
int req = server_context.req;
auto invoke = [fulfiller = kj::mv(future.fulfiller),
call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
auto self = server.thisCap();
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
const auto& params = call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
ServerContext server_context{server, call_context, req};
Expand All @@ -92,6 +92,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
ConnThread request_thread;
bool inserted;
server.m_context.loop->sync([&] {
// Detect request being cancelled before or while it executes.
if (cancel_monitor.m_cancelled) MP_LOG(*server.m_context.loop, Log::Raise) << "IPC server request #" << req << " cancelled before it could be executed";
assert(!cancel_monitor.m_on_cancel);
cancel_monitor.m_on_cancel = [&server, &server_context, req]() {
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " cancelled while executing.";
server_context.cancelled = true;
};

// Update requests_threads map if not cancelled.
std::tie(request_thread, inserted) = SetThread(
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
[&] { return context_arg.getCallbackThread(); });
Expand All @@ -103,13 +112,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// recursive call (IPC call calling back to the caller which
// makes another IPC call), so avoid modifying the map.
const bool erase_thread{inserted};
KJ_DEFER(if (erase_thread) {
KJ_DEFER(
// Erase the request_threads entry on the event loop
// thread with loop->sync(), so if the connection is
// broken there is not a race between this thread and
// the disconnect handler trying to destroy the thread
// client object.
server.m_context.loop->sync([&] {
auto self_dispose{kj::mv(self)};
if (erase_thread) {
// Look up the thread again without using existing
// iterator since entry may no longer be there after
// a disconnect. Destroy node after releasing
Expand All @@ -121,51 +132,49 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
Lock lock(thread_context.waiter->m_mutex);
removed = request_threads.extract(server.m_context.connection);
}
}
});
});
fn.invoke(server_context, args...);
}
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
server.m_context.loop->sync([&] {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->fulfill(kj::mv(call_context));
});
}))
{
server.m_context.loop->sync([&]() {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->reject(kj::mv(*exception));
});
);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
try {
fn.invoke(server_context, args...);
} catch (const InterruptException& e) {
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
}
})) {
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception.";
throw exception;
}
}
return call_context;
};

// Lookup Thread object specified by the client. The specified thread should
// be a local Thread::Server object, but it needs to be looked up
// asynchronously with getLocalServer().
auto thread_client = context_arg.getThread();
return server.m_context.connection->m_threads.getLocalServer(thread_client)
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
// Assuming the thread object is found, pass it a pointer to the
// `invoke` lambda above which will invoke the function on that
// thread.
KJ_IF_MAYBE (thread_server, perhaps) {
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
MP_LOG(*server.m_context.loop, Log::Debug)
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
if (!thread.m_thread_context.waiter->post(std::move(invoke))) {
MP_LOG(*server.m_context.loop, Log::Error)
<< "IPC server error request #" << req
<< " {" << thread.m_thread_context.thread_name << "}" << ", thread busy";
throw std::runtime_error("thread busy");
}
return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
} else {
MP_LOG(*server.m_context.loop, Log::Error)
<< "IPC server error request #" << req << ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}
})
// Wait for the invocation to finish before returning to the caller.
.then([invoke_wait = kj::mv(future.promise)]() mutable { return kj::mv(invoke_wait); });
});
// Use connection m_canceler object to cancel the result promise if the
// connection is destroyed. (By default. Cap'n Proto does not cancel
// requests on disconnect, since it's possible clients could send requests
// and disconnect without waiting for the results and not want those
// requests to be cancelled.)
return server.m_context.connection->m_canceler.wrap(kj::mv(result));
}
} // namespace mp

Expand Down
59 changes: 59 additions & 0 deletions include/mp/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <cassert>
#include <cstddef>
#include <cstring>
#include <exception>
#include <functional>
#include <kj/string-tree.h>
#include <mutex>
Expand Down Expand Up @@ -238,6 +239,64 @@ inline char* CharCast(unsigned char* c) { return (char*)c; }
inline const char* CharCast(const char* c) { return c; }
inline const char* CharCast(const unsigned char* c) { return (const char*)c; }

//! Exception that can be thrown from code executing an IPC call that is interrupted.
struct InterruptException final : std::exception {
explicit InterruptException(std::string message) : m_message(std::move(message)) {}
const char* what() const noexcept override { return m_message.c_str(); }
std::string m_message;
};

class CancelProbe;

//! Helper class that detects when a promise is cancelled. Used to detect
//! cancelled requests and prevent potential crashes on unclean disconnects.
//!
//! In the future, this could also be used to support a way for wrapped C++
//! methods to detect cancellation (like approach #4 in
//! https://github.com/bitcoin/bitcoin/issues/33575).
class CancelMonitor
{
public:
inline ~CancelMonitor();
inline void setCancelled(CancelProbe& probe);

bool m_cancelled{false};
std::function<void()> m_on_cancel;
CancelProbe* m_probe{nullptr};
};

//! Helper object to attach to a promise and update a CancelMonitor.
class CancelProbe
{
public:
CancelProbe(CancelMonitor& monitor) : m_monitor(&monitor)
{
assert(!monitor.m_probe);
monitor.m_probe = this;
}
~CancelProbe()
{
if (m_monitor) m_monitor->setCancelled(*this);
}
CancelMonitor* m_monitor;
};

CancelMonitor::~CancelMonitor()
{
if (m_probe) {
assert(m_probe->m_monitor == this);
m_probe->m_monitor = nullptr;
m_probe = nullptr;
}
}

void CancelMonitor::setCancelled(CancelProbe& probe)
{
assert(m_probe == &probe);
m_cancelled = true;
if (m_on_cancel) m_on_cancel();
m_probe = nullptr;
}
} // namespace mp

#endif // MP_UTIL_H
1 change: 1 addition & 0 deletions src/mp/gen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ static void Generate(kj::StringPtr src_prefix,
cpp_server << "#include <kj/async.h>\n";
cpp_server << "#include <kj/common.h>\n";
cpp_server << "#include <kj/exception.h>\n";
cpp_server << "#include <kj/tuple.h>\n";
cpp_server << "#include <mp/proxy.h>\n";
cpp_server << "#include <mp/util.h>\n";
cpp_server << "#include <" << PROXY_TYPES << ">\n";
Expand Down
Loading
Loading