
Commit 92a2c5f

Prevent crash on unclean disconnect if abandoned IPC call returns interface pointer
This is a potential fix for bitcoin/bitcoin#34250, which reports that the bitcoin node crashes if a Rust stratumv2 mining client calls BlockTemplate.waitNext() and disconnects without waiting for a response from the call, and if mempool fees then increase so the call returns a non-null BlockTemplate interface pointer. The node would crash in this case while trying to call MakeProxyServer on the returned BlockTemplate pointer, which would fail because MakeProxyServer would try to use a reference to the Connection object that had been deleted as a result of the disconnect.

The fix works by:

- Adding a Connection::m_canceler member variable and using it to cancel any IPC response promises that are pending when the connection is destroyed.
- Updating the type-context.h PassField() function to use promise.attach() as described at https://capnproto.org/cxxrpc.html#cancellation to detect cancellation and set a ServerContext::m_cancelled variable.
- Updating ServerCall to check the ServerContext::m_cancelled status after any C++ server method returns, and to throw an exception if it is set.
- Updating the type-context.h PassField() function to deal with the exception by catching and logging it, and to deal with the cancelled status by not trying to fulfill the cancelled promise.
1 parent 7d28183 commit 92a2c5f
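
For background, here is a minimal standalone sketch (not part of this commit) of the kj::Canceler behavior the fix relies on: promises wrapped by a Canceler are rejected with the given reason when cancel() is called, which is what Connection::~Connection() now does for still-pending IPC responses. The setup and names below are illustrative only.

#include <kj/async.h>
#include <kj/common.h>
#include <kj/exception.h>
#include <cstdio>

int main()
{
    // Manual KJ event loop setup, as in a unit test.
    kj::EventLoop loop;
    kj::WaitScope wait_scope(loop);

    kj::Canceler canceler;
    auto paf = kj::newPromiseAndFulfiller<int>();
    // Wrap the pending promise so it can be rejected later.
    kj::Promise<int> wrapped = canceler.wrap(kj::mv(paf.promise));

    // Simulate an unclean disconnect while the call is still pending.
    canceler.cancel("Interrupted by disconnect");

    // Waiting on the wrapped promise now throws instead of hanging.
    KJ_IF_MAYBE(e, kj::runCatchingExceptions([&] { (void)wrapped.wait(wait_scope); })) {
        std::printf("pending call rejected: %s\n", e->getDescription().cStr());
    }
    return 0;
}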

7 files changed: 135 additions & 13 deletions

include/mp/proxy-io.h

Lines changed: 27 additions & 8 deletions
@@ -48,6 +48,11 @@ struct ServerInvokeContext : InvokeContext
 ProxyServer& proxy_server;
 CallContext& call_context;
 int req;
+//! If the IPC method executes asynchronously (not on the event loop thread)
+//! and the IPC call was cancelled by the client, or cancelled by a
+//! disconnection, this is set to true. If the call runs on the event loop
+//! thread, it can't be cancelled.
+std::atomic<bool> cancelled{false};
 
 ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
 : InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
@@ -422,14 +427,17 @@ class Connection
 // will never run after connection object is destroyed. But when disconnect
 // handler fires, do not call the function f right away, instead add it
 // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
-// error in cases where f deletes this Connection object.
+// error in the typical case where f deletes this Connection object.
 m_on_disconnect.add(m_network.onDisconnect().then(
 [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
 }
 
 EventLoopRef m_loop;
 kj::Own<kj::AsyncIoStream> m_stream;
 LoggingErrorHandler m_error_handler{*m_loop};
+//! TaskSet used to cancel the m_network.onDisconnect() handler for remote
+//! disconnections, if the connection is closed locally first by deleting
+//! this Connection object.
 kj::TaskSet m_on_disconnect{m_error_handler};
 ::capnp::TwoPartyVatNetwork m_network;
 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@@ -442,6 +450,11 @@ class Connection
 //! ThreadMap.makeThread) used to service requests to clients.
 ::capnp::CapabilityServerSet<Thread> m_threads;
 
+//! Canceler for canceling promises that we want to discard when the
+//! connection is destroyed. This is used to interrupt method calls that are
+//! still executing at time of disconnection.
+kj::Canceler m_canceler;
+
 //! Cleanup functions to run if connection is broken unexpectedly. List
 //! will be empty if all ProxyClient are destroyed cleanly before the
 //! connection is destroyed.
@@ -693,20 +706,26 @@ template<typename T, typename Fn>
 kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
 {
 auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is idle again.
-auto ret = m_thread_ready.then([this, fn = std::move(fn), ready_fulfiller = kj::mv(ready.fulfiller)]() mutable {
+auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
+CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
+auto self = thisCap();
+auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
 auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
-bool posted = m_thread_context.waiter->post([this, fn = std::move(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller)]() mutable {
+bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
 m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable { ready_fulfiller->fulfill(); });
 std::optional<T> result;
-kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result.emplace(fn()); })};
-m_loop->sync([&result, &exception, result_fulfiller = kj::mv(result_fulfiller)]() mutable {
+kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result.emplace(fn(*cancel_monitor_ptr)); })};
+m_loop->sync([this, &result, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
+auto result_fulfiller_dispose{kj::mv(result_fulfiller)};
+auto cancel_monitor_dispose{kj::mv(cancel_monitor_ptr)};
 KJ_IF_MAYBE(e, exception) {
 assert(!result);
-result_fulfiller->reject(kj::mv(*e));
+result_fulfiller_dispose->reject(kj::mv(*e));
 } else {
 assert(result);
-result_fulfiller->fulfill(kj::mv(*result));
+result_fulfiller_dispose->fulfill(kj::mv(*result));
 }
+m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
 });
 });
 // Assert that calling Waiter::post did not fail. It could only return
@@ -715,7 +734,7 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
 // is signaled, so this should never happen.
 assert(posted);
 return kj::mv(result.promise);
-});
+}).attach(kj::heap<CancelProbe>(cancel_monitor));
 m_thread_ready = kj::mv(ready.promise);
 return ret;
 }
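
The `.attach(kj::heap<CancelProbe>(cancel_monitor))` call above relies on a general KJ idiom: objects attached to a promise are destroyed when the promise completes or is dropped, so a destructor can observe abandonment. A minimal standalone sketch of that idiom follows (the `Probe` type here is illustrative, not the CancelProbe class added by this commit).

#include <kj/async.h>
#include <cstdio>

int main()
{
    kj::EventLoop loop;
    kj::WaitScope wait_scope(loop);

    struct Probe
    {
        ~Probe() { std::printf("promise finished or was dropped\n"); }
    };

    {
        auto paf = kj::newPromiseAndFulfiller<void>();
        // Attach a heap-allocated probe whose lifetime is tied to the promise.
        auto watched = paf.promise.attach(kj::heap<Probe>());
        // Leaving this scope drops `watched` without fulfilling it, which
        // destroys the probe -- the same signal the commit uses to detect an
        // abandoned request.
    }
    return 0;
}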

include/mp/proxy-types.h

Lines changed: 10 additions & 0 deletions
@@ -469,6 +469,16 @@ struct ServerRet : Parent
 void invoke(ServerContext& server_context, TypeList<>, Args&&... args) const
 {
 auto&& result = Parent::invoke(server_context, TypeList<>(), std::forward<Args>(args)...);
+// If IPC request was cancelled, there is no point continuing to execute.
+// It's also important to stop executing because the connection may have
+// been destroyed as described in
+// https://github.com/bitcoin/bitcoin/issues/34250 and there would be a
+// crash if execution continued.
+// TODO: Note this detection is racy because cancellation could happen
+// after this check. However, fixing this would require changing the
+// definition of the InvokeContext struct and updating a lot of code, so
+// this can be done in a followup.
+if (server_context.cancelled) throw InterruptException{"cancelled"};
 auto&& results = server_context.call_context.getResults();
 InvokeContext& invoke_context = server_context;
 BuildField(TypeList<decltype(result)>(), invoke_context, Make<StructField, Accessor>(results),

include/mp/proxy.h

Lines changed: 1 addition & 0 deletions
@@ -153,6 +153,7 @@ struct ProxyServerBase : public virtual Interface_::Server
 ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection);
 virtual ~ProxyServerBase();
 void invokeDestroy();
+using Interface_::Server::thisCap;
 
 /**
 * Implementation pointer that may or may not be owned and deleted when this

include/mp/type-context.h

Lines changed: 33 additions & 5 deletions
@@ -63,7 +63,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
 Context::Reader context_arg = Accessor::get(params);
 auto& server = server_context.proxy_server;
 int req = server_context.req;
-auto invoke = [call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
+auto self = server.thisCap();
+auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
 MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
 const auto& params = call_context.getParams();
 Context::Reader context_arg = Accessor::get(params);
@@ -91,6 +92,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
 ConnThread request_thread;
 bool inserted;
 server.m_context.loop->sync([&] {
+// Detect request being cancelled before or while it executes.
+if (cancel_monitor.m_cancelled) MP_LOG(*server.m_context.loop, Log::Raise) << "IPC server request #" << req << " cancelled before it could be executed";
+assert(!cancel_monitor.m_on_cancel);
+cancel_monitor.m_on_cancel = [&server, &server_context, req]() {
+MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " cancelled while executing.";
+server_context.cancelled = true;
+};
+
+// Update requests_threads map if not cancelled.
 std::tie(request_thread, inserted) = SetThread(
 GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
 [&] { return context_arg.getCallbackThread(); });
@@ -102,13 +112,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
 // recursive call (IPC call calling back to the caller which
 // makes another IPC call), so avoid modifying the map.
 const bool erase_thread{inserted};
-KJ_DEFER(if (erase_thread) {
+KJ_DEFER(
 // Erase the request_threads entry on the event loop
 // thread with loop->sync(), so if the connection is
 // broken there is not a race between this thread and
 // the disconnect handler trying to destroy the thread
 // client object.
 server.m_context.loop->sync([&] {
+auto self_dispose{kj::mv(self)};
+if (erase_thread) {
 // Look up the thread again without using existing
 // iterator since entry may no longer be there after
 // a disconnect. Destroy node after releasing
@@ -120,9 +132,19 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
 Lock lock(thread_context.waiter->m_mutex);
 removed = request_threads.extract(server.m_context.connection);
 }
+}
 });
-});
-fn.invoke(server_context, args...);
+);
+KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
+try {
+fn.invoke(server_context, args...);
+} catch (const InterruptException& e) {
+MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
+}
+})) {
+MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception.";
+throw exception;
+}
 }
 return call_context;
 };
@@ -131,7 +153,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
 // be a local Thread::Server object, but it needs to be looked up
 // asynchronously with getLocalServer().
 auto thread_client = context_arg.getThread();
-return server.m_context.connection->m_threads.getLocalServer(thread_client)
+auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
 .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
 // Assuming the thread object is found, pass it a pointer to the
 // `invoke` lambda above which will invoke the function on that
@@ -147,6 +169,12 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
 throw std::runtime_error("invalid thread handle");
 }
 });
+// Use connection m_canceler object to cancel the result promise if the
+// connection is destroyed. (By default. Cap'n Proto does not cancel
+// requests on disconnect, since it's possible clients could send requests
+// and disconnect without waiting for the results and not want those
+// requests to be cancelled.)
+return server.m_context.connection->m_canceler.wrap(kj::mv(result));
 }
 } // namespace mp
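
The try/KJ_IF_MAYBE structure above separates two cases: an InterruptException thrown by the cancellation check in proxy-types.h, which is expected and only logged, and any other exception, which is still propagated. A hypothetical standalone helper showing the same split (run_interruptible and its rethrow via kj::throwFatalException are illustrative, not part of this commit):

#include <kj/common.h>
#include <kj/exception.h>
#include <mp/util.h> // InterruptException added by this commit
#include <cstdio>
#include <utility>

template <typename Fn>
void run_interruptible(Fn&& fn)
{
    KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&] {
        try {
            std::forward<Fn>(fn)();
        } catch (const mp::InterruptException& e) {
            // Expected when the request was cancelled: log and swallow.
            std::printf("request interrupted (%s)\n", e.what());
        }
    })) {
        // Anything else is unexpected: rethrow as a fatal KJ exception.
        kj::throwFatalException(kj::mv(*exception));
    }
}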

include/mp/util.h

Lines changed: 59 additions & 0 deletions
@@ -9,6 +9,7 @@
 #include <cassert>
 #include <cstddef>
 #include <cstring>
+#include <exception>
 #include <functional>
 #include <kj/string-tree.h>
 #include <mutex>
@@ -238,6 +239,64 @@ inline char* CharCast(unsigned char* c) { return (char*)c; }
 inline const char* CharCast(const char* c) { return c; }
 inline const char* CharCast(const unsigned char* c) { return (const char*)c; }
 
+//! Exception that can be thrown from code executing an IPC call that is interrupted.
+struct InterruptException final : std::exception {
+explicit InterruptException(std::string message) : m_message(std::move(message)) {}
+const char* what() const noexcept override { return m_message.c_str(); }
+std::string m_message;
+};
+
+class CancelProbe;
+
+//! Helper class that detects when a promise is cancelled. Used to detect
+//! cancelled requests and prevent potential crashes on unclean disconnects.
+//!
+//! In the future, this could also be used to support a way for wrapped C++
+//! methods to detect cancellation (like approach #4 in
+//! https://github.com/bitcoin/bitcoin/issues/33575).
+class CancelMonitor
+{
+public:
+inline ~CancelMonitor();
+inline void setCancelled(CancelProbe& probe);
+
+bool m_cancelled{false};
+std::function<void()> m_on_cancel;
+CancelProbe* m_probe{nullptr};
+};
+
+//! Helper object to attach to a promise and update a CancelMonitor.
+class CancelProbe
+{
+public:
+CancelProbe(CancelMonitor& monitor) : m_monitor(&monitor)
+{
+assert(!monitor.m_probe);
+monitor.m_probe = this;
+}
+~CancelProbe()
+{
+if (m_monitor) m_monitor->setCancelled(*this);
+}
+CancelMonitor* m_monitor;
+};
+
+CancelMonitor::~CancelMonitor()
+{
+if (m_probe) {
+assert(m_probe->m_monitor == this);
+m_probe->m_monitor = nullptr;
+m_probe = nullptr;
+}
+}
+
+void CancelMonitor::setCancelled(CancelProbe& probe)
+{
+assert(m_probe == &probe);
+m_cancelled = true;
+if (m_on_cancel) m_on_cancel();
+m_probe = nullptr;
+}
 } // namespace mp
 
 #endif // MP_UTIL_H
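
A hypothetical usage sketch of the CancelMonitor/CancelProbe pair defined above (it assumes this commit's include/mp/util.h is available): dropping a promise that has a CancelProbe attached marks the paired CancelMonitor as cancelled and runs its m_on_cancel callback.

#include <kj/async.h>
#include <mp/util.h>
#include <cstdio>

int main()
{
    kj::EventLoop loop;
    kj::WaitScope wait_scope(loop);

    mp::CancelMonitor monitor;
    monitor.m_on_cancel = [] { std::printf("request cancelled while executing\n"); };

    {
        auto paf = kj::newPromiseAndFulfiller<void>();
        auto watched = paf.promise.attach(kj::heap<mp::CancelProbe>(monitor));
        // The client goes away without waiting: dropping `watched` destroys
        // the probe, which calls monitor.setCancelled().
    }

    std::printf("m_cancelled = %s\n", monitor.m_cancelled ? "true" : "false");
    return 0;
}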

src/mp/gen.cpp

Lines changed: 1 addition & 0 deletions
@@ -211,6 +211,7 @@ static void Generate(kj::StringPtr src_prefix,
 cpp_server << "#include <kj/async.h>\n";
 cpp_server << "#include <kj/common.h>\n";
 cpp_server << "#include <kj/exception.h>\n";
+cpp_server << "#include <kj/tuple.h>\n";
 cpp_server << "#include <mp/proxy.h>\n";
 cpp_server << "#include <mp/util.h>\n";
 cpp_server << "#include <" << PROXY_TYPES << ">\n";

src/mp/proxy.cpp

Lines changed: 4 additions & 0 deletions
@@ -87,6 +87,10 @@ Connection::~Connection()
 // event loop thread, and if there was a remote disconnect, this is called
 // by an onDisconnect callback directly from the event loop thread.
 assert(std::this_thread::get_id() == m_loop->m_thread_id);
+
+// Try to cancel any calls that may be executing.
+m_canceler.cancel("Interrupted by disconnect");
+
 // Shut down RPC system first, since this will garbage collect any
 // ProxyServer objects that were not freed before the connection was closed.
 // Typically all ProxyServer objects associated with this connection will be
