Skip to content

Commit c436ae4

Browse files
committed
Prevent crash on unclean disconnect if abandoned IPC call returns interface pointer
This is a potential fix for bitcoin/bitcoin#34250, which reports that the bitcoin node crashes if a Rust Stratum v2 mining client calls BlockTemplate.waitNext() and disconnects without waiting for a response from the call, and if mempool fees increased so the call returns a non-null interface BlockTemplate pointer. The node would crash in this case while trying to call MakeProxyServer on the returned BlockTemplate pointer, which would fail because MakeProxyServer would try to use a reference to the Connection object that had been deleted as a result of the disconnect.
The fix works by:
- Adding a Connection::m_canceler member variable and using it to cancel any IPC response promises that are pending when the connection is destroyed.
- Updating type-context.h PassField() function to use promise.attach() as described at https://capnproto.org/cxxrpc.html#cancellation to detect cancellation and set a ServerContext::m_cancelled variable.
- Updating ServerCall to check the ServerContext::m_cancelled status after any C++ server method returns, and throw an exception if it is set.
- Updating type-context.h PassField() function to deal with the exception by catching and logging it, and to deal with cancelled status by not trying to fulfill the cancelled promise.
1 parent 8d3efbc commit c436ae4

File tree

7 files changed

+172
-12
lines changed

7 files changed

+172
-12
lines changed

include/mp/proxy-io.h

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,17 @@ struct ServerInvokeContext : InvokeContext
4848
ProxyServer& proxy_server;
4949
CallContext& call_context;
5050
int req;
51+
//! For IPC methods that execute asynchronously, not on the event-loop
52+
//! thread: lock preventing the event-loop thread from freeing the params or
53+
//! results structs while the worker thread is reading params
54+
//! (`call_context.getParams()`) or writing results
55+
//! (`call_context.getResults()`).
56+
Lock* cancel_lock{nullptr};
57+
//! For IPC methods that execute asynchronously, not on the event-loop
58+
//! thread, this is set to true if the IPC call was canceled by the client
59+
//! or canceled by a disconnection. If the call runs on the event-loop
60+
//! thread, it can't be canceled.
61+
std::atomic<bool> canceled{false};
5162

5263
ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
5364
: InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
@@ -421,18 +432,21 @@ class Connection
421432
template <typename F>
422433
void onDisconnect(F&& f)
423434
{
424-
// Add disconnect handler to local TaskSet to ensure it is cancelled and
435+
// Add disconnect handler to local TaskSet to ensure it is canceled and
425436
// will never run after connection object is destroyed. But when disconnect
426437
// handler fires, do not call the function f right away, instead add it
427438
// to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
428-
// error in cases where f deletes this Connection object.
439+
// error in the typical case where f deletes this Connection object.
429440
m_on_disconnect.add(m_network.onDisconnect().then(
430441
[f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
431442
}
432443

433444
EventLoopRef m_loop;
434445
kj::Own<kj::AsyncIoStream> m_stream;
435446
LoggingErrorHandler m_error_handler{*m_loop};
447+
//! TaskSet used to cancel the m_network.onDisconnect() handler for remote
448+
//! disconnections, if the connection is closed locally first by deleting
449+
//! this Connection object.
436450
kj::TaskSet m_on_disconnect{m_error_handler};
437451
::capnp::TwoPartyVatNetwork m_network;
438452
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@@ -445,6 +459,11 @@ class Connection
445459
//! ThreadMap.makeThread) used to service requests to clients.
446460
::capnp::CapabilityServerSet<Thread> m_threads;
447461

462+
//! Canceler for canceling promises that we want to discard when the
463+
//! connection is destroyed. This is used to interrupt method calls that are
464+
//! still executing at time of disconnection.
465+
kj::Canceler m_canceler;
466+
448467
//! Cleanup functions to run if connection is broken unexpectedly. List
449468
//! will be empty if all ProxyClient are destroyed cleanly before the
450469
//! connection is destroyed.
@@ -696,9 +715,12 @@ template<typename T, typename Fn>
696715
kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
697716
{
698717
auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is idle again.
699-
auto ret = m_thread_ready.then([this, fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller)]() mutable {
718+
auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
719+
CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
720+
auto self = thisCap();
721+
auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
700722
auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
701-
bool posted = m_thread_context.waiter->post([this, fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller)]() mutable {
723+
bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
702724
// Fulfill ready.promise now, as soon as the Waiter starts
703725
// executing this lambda, so the next ProxyServer<Thread>::post()
704726
// call can immediately call waiter->post() without needing to wait
@@ -713,8 +735,9 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
713735
ready_fulfiller = nullptr;
714736
});
715737
std::optional<T> result_value;
716-
kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn()); })};
717-
m_loop->sync([&result_value, &exception, result_fulfiller = kj::mv(result_fulfiller)]() mutable {
738+
kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
739+
m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
740+
cancel_monitor_ptr = nullptr;
718741
KJ_IF_MAYBE(e, exception) {
719742
assert(!result_value);
720743
result_fulfiller->reject(kj::mv(*e));
@@ -724,6 +747,7 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
724747
result_value.reset();
725748
}
726749
result_fulfiller = nullptr;
750+
m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
727751
});
728752
});
729753
// Assert that calling Waiter::post did not fail. It could only return
@@ -732,7 +756,7 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
732756
// signaled, so this should never happen.
733757
assert(posted);
734758
return kj::mv(result.promise);
735-
});
759+
}).attach(kj::heap<CancelProbe>(cancel_monitor));
736760
m_thread_ready = kj::mv(ready.promise);
737761
return ret;
738762
}

include/mp/proxy-types.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,15 @@ struct ServerCall
445445
template <typename ServerContext, typename... Args>
446446
decltype(auto) invoke(ServerContext& server_context, TypeList<>, Args&&... args) const
447447
{
448+
// If cancel_lock is set, release it while executing the method, and
449+
// reacquire it afterwards. The lock is needed to prevent params and
450+
// response structs from being deleted by the event loop thread if the
451+
// request is canceled, so it is only needed before and after method
452+
// execution. It is important to release the lock during execution because the
453+
// method can take an arbitrarily long time to return and the event loop
454+
// will need the lock itself if the call is canceled.
455+
if (server_context.cancel_lock) server_context.cancel_lock->m_lock.unlock();
456+
KJ_DEFER(if (server_context.cancel_lock) server_context.cancel_lock->m_lock.lock());
448457
return ProxyServerMethodTraits<typename decltype(server_context.call_context.getParams())::Reads>::invoke(
449458
server_context,
450459
std::forward<Args>(args)...);
@@ -469,6 +478,12 @@ struct ServerRet : Parent
469478
void invoke(ServerContext& server_context, TypeList<>, Args&&... args) const
470479
{
471480
auto&& result = Parent::invoke(server_context, TypeList<>(), std::forward<Args>(args)...);
481+
// If IPC request was canceled, there is no point continuing to execute.
482+
// It's also important to stop executing because the connection may have
483+
// been destroyed as described in
484+
// https://github.com/bitcoin/bitcoin/issues/34250 and there would be a
485+
// crash if execution continued.
486+
if (server_context.canceled) throw InterruptException{"canceled"};
472487
auto&& results = server_context.call_context.getResults();
473488
InvokeContext& invoke_context = server_context;
474489
BuildField(TypeList<decltype(result)>(), invoke_context, Make<StructField, Accessor>(results),

include/mp/proxy.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ struct ProxyServerBase : public virtual Interface_::Server
153153
ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection);
154154
virtual ~ProxyServerBase();
155155
void invokeDestroy();
156+
using Interface_::Server::thisCap;
156157

157158
/**
158159
* Implementation pointer that may or may not be owned and deleted when this

include/mp/type-context.h

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,14 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
6363
Context::Reader context_arg = Accessor::get(params);
6464
auto& server = server_context.proxy_server;
6565
int req = server_context.req;
66-
auto invoke = [call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
66+
// Keep a reference to the ProxyServer instance by assigning it to the self
67+
// variable. ProxyServer instances are reference counted and if the client
68+
// drops its reference and the IPC call is canceled, this reference keeps
69+
// the instance alive until the method finishes executing. The self variable
70+
// needs to be destroyed on the event loop thread so it is freed in a sync()
71+
// call below.
72+
auto self = server.thisCap();
73+
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
6774
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
6875
const auto& params = call_context.getParams();
6976
Context::Reader context_arg = Accessor::get(params);
@@ -90,25 +97,58 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
9097
auto& request_threads = thread_context.request_threads;
9198
ConnThread request_thread;
9299
bool inserted;
100+
Mutex cancel_mutex;
101+
Lock cancel_lock{cancel_mutex};
102+
server_context.cancel_lock = &cancel_lock;
93103
server.m_context.loop->sync([&] {
104+
// Detect request being canceled before it executes.
105+
if (cancel_monitor.m_canceled) {
106+
server_context.canceled = true;
107+
return;
108+
}
109+
// Detect request being canceled while it executes.
110+
assert(!cancel_monitor.m_on_cancel);
111+
cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() {
112+
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
113+
server_context.canceled = true;
114+
// Wait for cancel_mutex to be released by the
115+
// execution thread before returning to the event
116+
// loop and letting parameters be deleted.
117+
Lock{cancel_mutex};
118+
};
119+
// Update request_threads map if not canceled.
94120
std::tie(request_thread, inserted) = SetThread(
95121
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
96122
[&] { return context_arg.getCallbackThread(); });
97123
});
124+
if (server_context.canceled) {
125+
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
126+
return call_context;
127+
}
98128

99129
// If an entry was inserted into the request_threads map,
100130
// remove it after calling fn.invoke. If an entry was not
101131
// inserted, one already existed, meaning this must be a
102132
// recursive call (IPC call calling back to the caller which
103133
// makes another IPC call), so avoid modifying the map.
104134
const bool erase_thread{inserted};
105-
KJ_DEFER(if (erase_thread) {
135+
KJ_DEFER(
136+
// Release the cancel lock before calling loop->sync and
137+
// waiting for the event loop thread because if a
138+
// cancellation happened it will run the on_cancel
139+
// callback, above. It's safe to release cancel_lock
140+
// lock at this point because the fn.invoke() call below
141+
// will be finished and no longer accessing the
142+
// parameters or response structs.
143+
cancel_lock.m_lock.unlock();
106144
// Erase the request_threads entry on the event loop
107145
// thread with loop->sync(), so if the connection is
108146
// broken there is not a race between this thread and
109147
// the disconnect handler trying to destroy the thread
110148
// client object.
111149
server.m_context.loop->sync([&] {
150+
auto self_dispose{kj::mv(self)};
151+
if (erase_thread) {
112152
// Look up the thread again without using existing
113153
// iterator since entry may no longer be there after
114154
// a disconnect. Destroy node after releasing
@@ -120,9 +160,19 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
120160
Lock lock(thread_context.waiter->m_mutex);
121161
removed = request_threads.extract(server.m_context.connection);
122162
}
163+
}
123164
});
124-
});
125-
fn.invoke(server_context, args...);
165+
);
166+
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
167+
try {
168+
fn.invoke(server_context, args...);
169+
} catch (const InterruptException& e) {
170+
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
171+
}
172+
})) {
173+
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception.";
174+
throw exception;
175+
}
126176
}
127177
return call_context;
128178
};
@@ -131,7 +181,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
131181
// be a local Thread::Server object, but it needs to be looked up
132182
// asynchronously with getLocalServer().
133183
auto thread_client = context_arg.getThread();
134-
return server.m_context.connection->m_threads.getLocalServer(thread_client)
184+
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
135185
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
136186
// Assuming the thread object is found, pass it a pointer to the
137187
// `invoke` lambda above which will invoke the function on that
@@ -147,6 +197,12 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
147197
throw std::runtime_error("invalid thread handle");
148198
}
149199
});
200+
// Use connection m_canceler object to cancel the result promise if the
201+
// connection is destroyed. (By default, Cap'n Proto does not cancel
202+
// requests on disconnect, since it's possible clients could send requests
203+
// and disconnect without waiting for the results and not want those
204+
// requests to be canceled.)
205+
return server.m_context.connection->m_canceler.wrap(kj::mv(result));
150206
}
151207
} // namespace mp
152208

include/mp/util.h

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <cassert>
1010
#include <cstddef>
1111
#include <cstring>
12+
#include <exception>
1213
#include <functional>
1314
#include <kj/string-tree.h>
1415
#include <mutex>
@@ -241,6 +242,64 @@ inline char* CharCast(unsigned char* c) { return (char*)c; }
241242
inline const char* CharCast(const char* c) { return c; }
242243
inline const char* CharCast(const unsigned char* c) { return (const char*)c; }
243244

245+
//! Exception that can be thrown from code executing an IPC call that is interrupted.
246+
struct InterruptException final : std::exception {
247+
explicit InterruptException(std::string message) : m_message(std::move(message)) {}
248+
const char* what() const noexcept override { return m_message.c_str(); }
249+
std::string m_message;
250+
};
251+
252+
class CancelProbe;
253+
254+
//! Helper class that detects when a promise is canceled. Used to detect
255+
//! canceled requests and prevent potential crashes on unclean disconnects.
256+
//!
257+
//! In the future, this could also be used to support a way for wrapped C++
258+
//! methods to detect cancellation (like approach #4 in
259+
//! https://github.com/bitcoin/bitcoin/issues/33575).
260+
class CancelMonitor
261+
{
262+
public:
263+
inline ~CancelMonitor();
264+
inline void setCancelled(CancelProbe& probe);
265+
266+
bool m_canceled{false};
267+
std::function<void()> m_on_cancel;
268+
CancelProbe* m_probe{nullptr};
269+
};
270+
271+
//! Helper object to attach to a promise and update a CancelMonitor.
272+
class CancelProbe
273+
{
274+
public:
275+
CancelProbe(CancelMonitor& monitor) : m_monitor(&monitor)
276+
{
277+
assert(!monitor.m_probe);
278+
monitor.m_probe = this;
279+
}
280+
~CancelProbe()
281+
{
282+
if (m_monitor) m_monitor->setCancelled(*this);
283+
}
284+
CancelMonitor* m_monitor;
285+
};
286+
287+
CancelMonitor::~CancelMonitor()
288+
{
289+
if (m_probe) {
290+
assert(m_probe->m_monitor == this);
291+
m_probe->m_monitor = nullptr;
292+
m_probe = nullptr;
293+
}
294+
}
295+
296+
void CancelMonitor::setCancelled(CancelProbe& probe)
297+
{
298+
assert(m_probe == &probe);
299+
m_canceled = true;
300+
if (m_on_cancel) m_on_cancel();
301+
m_probe = nullptr;
302+
}
244303
} // namespace mp
245304

246305
#endif // MP_UTIL_H

src/mp/gen.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ static void Generate(kj::StringPtr src_prefix,
211211
cpp_server << "#include <kj/async.h>\n";
212212
cpp_server << "#include <kj/common.h>\n";
213213
cpp_server << "#include <kj/exception.h>\n";
214+
cpp_server << "#include <kj/tuple.h>\n";
214215
cpp_server << "#include <mp/proxy.h>\n";
215216
cpp_server << "#include <mp/util.h>\n";
216217
cpp_server << "#include <" << PROXY_TYPES << ">\n";

src/mp/proxy.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ Connection::~Connection()
8787
// event loop thread, and if there was a remote disconnect, this is called
8888
// by an onDisconnect callback directly from the event loop thread.
8989
assert(std::this_thread::get_id() == m_loop->m_thread_id);
90+
91+
// Try to cancel any calls that may be executing.
92+
m_canceler.cancel("Interrupted by disconnect");
93+
9094
// Shut down RPC system first, since this will garbage collect any
9195
// ProxyServer objects that were not freed before the connection was closed.
9296
// Typically all ProxyServer objects associated with this connection will be

0 commit comments

Comments
 (0)