Skip to content

Commit 11c023e

Browse files
committed
Prevent crash on unclean disconnect if abandoned IPC call returns interface pointer
This is a fix for bitcoin/bitcoin#34250 which reports that bitcoin node crashes if a rust stratumv2 mining client calls BlockTemplate.waitNext() and disconnects without waiting for a response from the call, and if mempool fees increased so the call returns a non-null interface BlockTemplate pointer. The node would crash in this case while trying to call MakeProxyServer on the returned BlockTemplate pointer, which would fail because MakeProxyServer would try to use a reference to the Connection object that had been deleted as a result of the disconnect. The fix works by: - Adding a Connection::m_canceler member variable and using it to cancel any IPC response promises that are pending when the connection is destroyed. - Updating type-context.h PassField() function to use promise.attach() as described at https://capnproto.org/cxxrpc.html#cancellation to detect cancellation and set a ServerContext::m_canceled variable. - Updating ServerCall to check the ServerContext::m_canceled status after any C++ server method returns, and throw an exception if it is set. - Updating type-context.h PassField() function to deal with the exception by catching and logging it, and to deal with canceled status by not trying to fulfill the canceled promise.
1 parent 8d3efbc commit 11c023e

File tree

7 files changed

+199
-22
lines changed

7 files changed

+199
-22
lines changed

include/mp/proxy-io.h

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,17 @@ struct ServerInvokeContext : InvokeContext
4848
ProxyServer& proxy_server;
4949
CallContext& call_context;
5050
int req;
51+
//! For IPC methods that execute asynchronously, not on the event-loop
52+
//! thread: lock preventing the event-loop thread from freeing the params or
53+
//! results structs if the request is canceled while the worker thread is
54+
//! reading params (`call_context.getParams()`) or writing results
55+
//! (`call_context.getResults()`).
56+
Lock* cancel_lock{nullptr};
57+
//! For IPC methods that execute asynchronously, not on the event-loop
58+
//! thread, this is set to true if the IPC call was canceled by the client
59+
//! or canceled by a disconnection. If the call runs on the event-loop
60+
//! thread, it can't be canceled.
61+
bool request_canceled{false};
5162

5263
ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
5364
: InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
@@ -421,18 +432,21 @@ class Connection
421432
template <typename F>
422433
void onDisconnect(F&& f)
423434
{
424-
// Add disconnect handler to local TaskSet to ensure it is cancelled and
435+
// Add disconnect handler to local TaskSet to ensure it is canceled and
425436
// will never run after connection object is destroyed. But when disconnect
426437
// handler fires, do not call the function f right away, instead add it
427438
// to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
428-
// error in cases where f deletes this Connection object.
439+
// error in the typical case where f deletes this Connection object.
429440
m_on_disconnect.add(m_network.onDisconnect().then(
430441
[f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
431442
}
432443

433444
EventLoopRef m_loop;
434445
kj::Own<kj::AsyncIoStream> m_stream;
435446
LoggingErrorHandler m_error_handler{*m_loop};
447+
//! TaskSet used to cancel the m_network.onDisconnect() handler for remote
448+
//! disconnections, if the connection is closed locally first by deleting
449+
//! this Connection object.
436450
kj::TaskSet m_on_disconnect{m_error_handler};
437451
::capnp::TwoPartyVatNetwork m_network;
438452
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@@ -445,6 +459,11 @@ class Connection
445459
//! ThreadMap.makeThread) used to service requests to clients.
446460
::capnp::CapabilityServerSet<Thread> m_threads;
447461

462+
//! Canceler for canceling promises that we want to discard when the
463+
//! connection is destroyed. This is used to interrupt method calls that are
464+
//! still executing at time of disconnection.
465+
kj::Canceler m_canceler;
466+
448467
//! Cleanup functions to run if connection is broken unexpectedly. List
449468
//! will be empty if all ProxyClient are destroyed cleanly before the
450469
//! connection is destroyed.
@@ -696,25 +715,33 @@ template<typename T, typename Fn>
696715
kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
697716
{
698717
auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is idle again.
699-
auto ret = m_thread_ready.then([this, fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller)]() mutable {
718+
auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
719+
CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
720+
// Keep a reference to the ProxyServer<Thread> instance by assigning it to
721+
// the self variable. ProxyServer instances are reference-counted and if the
722+
// client drops its reference, this variable keeps the instance alive until
723+
// the thread finishes executing. The self variable needs to be destroyed on
724+
// the event loop thread so it is freed in a sync() call below.
725+
auto self = thisCap();
726+
auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
700727
auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
701-
bool posted = m_thread_context.waiter->post([this, fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller)]() mutable {
702-
// Fullfill ready.promise now, as soon as the Waiter starts
703-
// executing this lambda, so the next ProxyServer<Thread>::post()
704-
// call can immediately call waiter->post() without needing to wait
705-
// again. It is important to do this before calling fn() because
706-
// fn() can make an IPC call back to the client, which can make
707-
// another IPC call to this server thread. (This typically happens
708-
// when IPC methods take std::function parameters.) When this
709-
// happens the second call to the server thread should not be
710-
// blocked waiting for the first call.
728+
bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
729+
// Fulfill ready.promise now, as soon as the Waiter starts executing
730+
// this lambda, so the next ProxyServer<Thread>::post() call can
731+
// immediately call waiter->post(). It is important to do this
732+
// before calling fn() because fn() can make an IPC call back to the
733+
// client, which can make another IPC call to this server thread.
734+
// (This typically happens when IPC methods take std::function
735+
// parameters.) When this happens the second call to the server
736+
// thread should not be blocked waiting for the first call.
711737
m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
712738
ready_fulfiller->fulfill();
713739
ready_fulfiller = nullptr;
714740
});
715741
std::optional<T> result_value;
716-
kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn()); })};
717-
m_loop->sync([&result_value, &exception, result_fulfiller = kj::mv(result_fulfiller)]() mutable {
742+
kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
743+
m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
744+
cancel_monitor_ptr = nullptr;
718745
KJ_IF_MAYBE(e, exception) {
719746
assert(!result_value);
720747
result_fulfiller->reject(kj::mv(*e));
@@ -724,6 +751,11 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
724751
result_value.reset();
725752
}
726753
result_fulfiller = nullptr;
754+
// Use evalLater to destroy the ProxyServer<Thread> self
755+
// reference, if it is the last reference, because the
756+
// ProxyServer<Thread> destructor needs to join the thread,
757+
// which can't happen until this sync() block has exited.
758+
m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
727759
});
728760
});
729761
// Assert that calling Waiter::post did not fail. It could only return
@@ -732,7 +764,7 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
732764
// signaled, so this should never happen.
733765
assert(posted);
734766
return kj::mv(result.promise);
735-
});
767+
}).attach(kj::heap<CancelProbe>(cancel_monitor));
736768
m_thread_ready = kj::mv(ready.promise);
737769
return ret;
738770
}

include/mp/proxy-types.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,23 @@ struct ServerCall
445445
template <typename ServerContext, typename... Args>
446446
decltype(auto) invoke(ServerContext& server_context, TypeList<>, Args&&... args) const
447447
{
448+
// If cancel_lock is set, release it while executing the method, and
449+
// reacquire it afterwards. The lock is needed to prevent params and
450+
// response structs from being deleted by the event loop thread if the
451+
// request is canceled, so it is only needed before and after method
452+
// execution. It is important to release the lock during execution
453+
// because the method can take arbitrarily long to return and the event
454+
// loop will need the lock itself in on_cancel if the call is canceled.
455+
if (server_context.cancel_lock) server_context.cancel_lock->m_lock.unlock();
456+
KJ_DEFER(
457+
if (server_context.cancel_lock) server_context.cancel_lock->m_lock.lock();
458+
// If the IPC request was canceled, there is no point continuing to
459+
// execute. It's also important to stop executing because the
460+
// connection may have been destroyed as described in
461+
// https://github.com/bitcoin/bitcoin/issues/34250 and there would
462+
// be a crash if execution continued.
463+
if (server_context.request_canceled) throw InterruptException{"canceled"};
464+
);
448465
return ProxyServerMethodTraits<typename decltype(server_context.call_context.getParams())::Reads>::invoke(
449466
server_context,
450467
std::forward<Args>(args)...);

include/mp/proxy.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ struct ProxyServerBase : public virtual Interface_::Server
153153
ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection);
154154
virtual ~ProxyServerBase();
155155
void invokeDestroy();
156+
using Interface_::Server::thisCap;
156157

157158
/**
158159
* Implementation pointer that may or may not be owned and deleted when this

include/mp/type-context.h

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,14 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
6363
Context::Reader context_arg = Accessor::get(params);
6464
auto& server = server_context.proxy_server;
6565
int req = server_context.req;
66-
auto invoke = [call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
66+
// Keep a reference to the ProxyServer instance by assigning it to the self
67+
// variable. ProxyServer instances are reference-counted and if the client
68+
// drops its reference and the IPC call is canceled, this variable keeps the
69+
// instance alive until the method finishes executing. The self variable
70+
// needs to be destroyed on the event loop thread so it is freed in a sync()
71+
// call below.
72+
auto self = server.thisCap();
73+
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
6774
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
6875
const auto& params = call_context.getParams();
6976
Context::Reader context_arg = Accessor::get(params);
@@ -90,25 +97,62 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
9097
auto& request_threads = thread_context.request_threads;
9198
ConnThread request_thread;
9299
bool inserted;
100+
Mutex cancel_mutex;
101+
Lock cancel_lock{cancel_mutex};
102+
server_context.cancel_lock = &cancel_lock;
93103
server.m_context.loop->sync([&] {
104+
// Detect request being canceled before it executes.
105+
if (cancel_monitor.m_canceled) {
106+
server_context.request_canceled = true;
107+
return;
108+
}
109+
// Detect request being canceled while it executes.
110+
assert(!cancel_monitor.m_on_cancel);
111+
cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() {
112+
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
113+
// Lock cancel_mutex here to block the event loop
114+
// thread and prevent it from deleting the request's
115+
// params and response structs if the execution
116+
// thread is currently accessing them. Because the
117+
// lock is released here before the event loop
118+
// thread does delete the structs, the lock does not
119+
// provide protection against the event loop
120+
// deleting the structs _before_ the execution
121+
// thread acquires it. So in addition to acquiring
122+
// the lock, the execution thread always checks
123+
// request_canceled after acquiring it to check if
124+
// it is still safe to use the structs.
125+
Lock{cancel_mutex};
126+
server_context.request_canceled = true;
127+
};
128+
// Update request_threads map if not canceled.
94129
std::tie(request_thread, inserted) = SetThread(
95130
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
96131
[&] { return context_arg.getCallbackThread(); });
97132
});
98-
99133
// If an entry was inserted into the request_threads map,
100134
// remove it after calling fn.invoke. If an entry was not
101135
// inserted, one already existed, meaning this must be a
102136
// recursive call (IPC call calling back to the caller which
103137
// makes another IPC call), so avoid modifying the map.
104138
const bool erase_thread{inserted};
105-
KJ_DEFER(if (erase_thread) {
139+
KJ_DEFER(
140+
// Release the cancel lock before calling loop->sync and
141+
// waiting for the event loop thread, because if a
142+
// cancellation happened, it needs to run the on_cancel
143+
// callback above. It's safe to release cancel_lock at
144+
// this point because the fn.invoke() call below will be
145+
// finished and no longer accessing the parameters or
146+
// response structs.
147+
cancel_lock.m_lock.unlock();
106148
// Erase the request_threads entry on the event loop
107149
// thread with loop->sync(), so if the connection is
108150
// broken there is not a race between this thread and
109151
// the disconnect handler trying to destroy the thread
110152
// client object.
111153
server.m_context.loop->sync([&] {
154+
auto self_dispose{kj::mv(self)};
155+
if (erase_thread) {
112156
// Look up the thread again without using existing
113157
// iterator since entry may no longer be there after
114158
// a disconnect. Destroy node after releasing
@@ -120,9 +164,22 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
120164
Lock lock(thread_context.waiter->m_mutex);
121165
removed = request_threads.extract(server.m_context.connection);
122166
}
167+
}
123168
});
124-
});
125-
fn.invoke(server_context, args...);
169+
);
170+
if (server_context.request_canceled) {
171+
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
172+
} else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
173+
try {
174+
fn.invoke(server_context, args...);
175+
} catch (const InterruptException& e) {
176+
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
177+
}
178+
})) {
179+
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception.";
180+
throw exception;
181+
}
182+
// End of scope: if KJ_DEFER was reached, it runs here
126183
}
127184
return call_context;
128185
};
@@ -131,7 +188,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
131188
// be a local Thread::Server object, but it needs to be looked up
132189
// asynchronously with getLocalServer().
133190
auto thread_client = context_arg.getThread();
134-
return server.m_context.connection->m_threads.getLocalServer(thread_client)
191+
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
135192
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
136193
// Assuming the thread object is found, pass it a pointer to the
137194
// `invoke` lambda above which will invoke the function on that
@@ -147,6 +204,12 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
147204
throw std::runtime_error("invalid thread handle");
148205
}
149206
});
207+
// Use connection m_canceler object to cancel the result promise if the
208+
// connection is destroyed. (By default, Cap'n Proto does not cancel
209+
// requests on disconnect, since it's possible clients could send requests
210+
// and disconnect without waiting for the results and not want those
211+
// requests to be canceled.)
212+
return server.m_context.connection->m_canceler.wrap(kj::mv(result));
150213
}
151214
} // namespace mp
152215

include/mp/util.h

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <cassert>
1010
#include <cstddef>
1111
#include <cstring>
12+
#include <exception>
1213
#include <functional>
1314
#include <kj/string-tree.h>
1415
#include <mutex>
@@ -241,6 +242,64 @@ inline char* CharCast(unsigned char* c) { return (char*)c; }
241242
inline const char* CharCast(const char* c) { return c; }
242243
inline const char* CharCast(const unsigned char* c) { return (const char*)c; }
243244

245+
//! Exception that can be thrown from code executing an IPC call that is interrupted.
246+
struct InterruptException final : std::exception {
247+
explicit InterruptException(std::string message) : m_message(std::move(message)) {}
248+
const char* what() const noexcept override { return m_message.c_str(); }
249+
std::string m_message;
250+
};
251+
252+
class CancelProbe;
253+
254+
//! Helper class that detects when a promise is canceled. Used to detect
255+
//! canceled requests and prevent potential crashes on unclean disconnects.
256+
//!
257+
//! In the future, this could also be used to support a way for wrapped C++
258+
//! methods to detect cancellation (like approach #4 in
259+
//! https://github.com/bitcoin/bitcoin/issues/33575).
260+
class CancelMonitor
261+
{
262+
public:
263+
inline ~CancelMonitor();
264+
inline void setCanceled(CancelProbe& probe);
265+
266+
bool m_canceled{false};
267+
std::function<void()> m_on_cancel;
268+
CancelProbe* m_probe{nullptr};
269+
};
270+
271+
//! Helper object to attach to a promise and update a CancelMonitor.
272+
class CancelProbe
273+
{
274+
public:
275+
CancelProbe(CancelMonitor& monitor) : m_monitor(&monitor)
276+
{
277+
assert(!monitor.m_probe);
278+
monitor.m_probe = this;
279+
}
280+
~CancelProbe()
281+
{
282+
if (m_monitor) m_monitor->setCanceled(*this);
283+
}
284+
CancelMonitor* m_monitor;
285+
};
286+
287+
CancelMonitor::~CancelMonitor()
288+
{
289+
if (m_probe) {
290+
assert(m_probe->m_monitor == this);
291+
m_probe->m_monitor = nullptr;
292+
m_probe = nullptr;
293+
}
294+
}
295+
296+
void CancelMonitor::setCanceled(CancelProbe& probe)
297+
{
298+
assert(m_probe == &probe);
299+
m_canceled = true;
300+
if (m_on_cancel) m_on_cancel();
301+
m_probe = nullptr;
302+
}
244303
} // namespace mp
245304

246305
#endif // MP_UTIL_H

0 commit comments

Comments
 (0)