@@ -48,6 +48,19 @@ struct ServerInvokeContext : InvokeContext
4848 ProxyServer& proxy_server;
4949 CallContext& call_context;
5050 int req;
51+ //! For IPC methods that execute asynchronously, not on the event-loop
52+ //! thread: lock preventing the event-loop thread from freeing the params or
53+ //! results structs if the request is canceled while the worker thread is
54+ //! reading params (`call_context.getParams()`) or writing results
55+ //! (`call_context.getResults()`).
56+ Lock* cancel_lock{nullptr };
57+ //! For IPC methods that execute asynchronously, not on the event-loop
58+ //! thread, this is set to true if the IPC call was canceled by the client
59+ //! or canceled by a disconnection. If the call runs on the event-loop
60+ //! thread, it can't be canceled. This should be accessed with cancel_lock
61+ //! held if cancel_lock is not null, since in the asynchronous case it is
62+ //! accessed from multiple threads.
63+ bool request_canceled{false };
5164
5265 ServerInvokeContext (ProxyServer& proxy_server, CallContext& call_context, int req)
5366 : InvokeContext{*proxy_server.m_context .connection }, proxy_server{proxy_server}, call_context{call_context}, req{req}
//! ProxyServer specialization for the Thread capability. Each instance wraps
//! one worker std::thread that executes IPC requests queued through post().
template <>
struct ProxyServer<Thread> final : public Thread::Server
{
public:
    ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread);
    ~ProxyServer();
    kj::Promise<void> getName(GetNameContext context) override;

    //! Run a callback function fn returning T on this thread. The function will
    //! be queued and executed as soon as the thread is idle, and when fn
    //! returns, the promise returned by this method will be fulfilled with the
    //! value fn returned.
    template <typename T, typename Fn>
    kj::Promise<T> post(Fn&& fn);

    //! Event loop reference; post() uses it to run sync() callbacks and to
    //! schedule cleanup work on the event-loop thread's task set.
    EventLoopRef m_loop;
    //! Context for the worker thread; post() queues callbacks on its waiter.
    //! Referenced, not owned — lifetime managed elsewhere.
    ThreadContext& m_thread_context;
    //! Worker thread executing posted callbacks. The destructor needs to join
    //! this thread (see the note in post() about deferring self destruction).
    std::thread m_thread;
    //! Promise signaled when m_thread_context.waiter is ready and there is no
    //! post() callback function waiting to execute.
    kj::Promise<void> m_thread_ready{kj::READY_NOW};
};
91116
92117// ! Handler for kj::TaskSet failed task events.
@@ -322,7 +347,12 @@ class EventLoop
322347// ! thread is blocked waiting for server response, this is what allows the
323348// ! client to run the request in the same thread, the same way code would run in a
324349// ! single process, with the callback sharing the same thread stack as the original
325- // ! call.)
350+ //! call.) To support this, the clientInvoke function calls Waiter::wait() to
351+ //! block the client IPC thread while the initial request is in progress. Then
352+ //! if there is a callback, it is executed with Waiter::post().
353+ //!
354+ //! The Waiter class is also used server-side by `ProxyServer<Thread>::post()`
355+ //! to execute IPC calls on worker threads.
326356struct Waiter
327357{
328358 Waiter () = default ;
@@ -404,18 +434,21 @@ class Connection
404434 template <typename F>
405435 void onDisconnect (F&& f)
406436 {
407- // Add disconnect handler to local TaskSet to ensure it is cancelled and
437+ // Add disconnect handler to local TaskSet to ensure it is canceled and
408438 // will never run after connection object is destroyed. But when disconnect
409439 // handler fires, do not call the function f right away, instead add it
410440 // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
411- // error in cases where f deletes this Connection object.
441+ // error in the typical case where f deletes this Connection object.
412442 m_on_disconnect.add (m_network.onDisconnect ().then (
413443 [f = std::forward<F>(f), this ]() mutable { m_loop->m_task_set ->add (kj::evalLater (kj::mv (f))); }));
414444 }
415445
416446 EventLoopRef m_loop;
417447 kj::Own<kj::AsyncIoStream> m_stream;
418448 LoggingErrorHandler m_error_handler{*m_loop};
449+ //! TaskSet used to cancel the m_network.onDisconnect() handler for remote
450+ //! disconnections, if the connection is closed locally first by deleting
451+ //! this Connection object.
419452 kj::TaskSet m_on_disconnect{m_error_handler};
420453 ::capnp::TwoPartyVatNetwork m_network;
421454 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@@ -428,6 +461,11 @@ class Connection
428461 // ! ThreadMap.makeThread) used to service requests to clients.
429462 ::capnp::CapabilityServerSet<Thread> m_threads;
430463
464+ //! Canceler for canceling promises that we want to discard when the
465+ //! connection is destroyed. This is used to interrupt method calls that are
466+ //! still executing at the time of disconnection.
467+ kj::Canceler m_canceler;
468+
431469 // ! Cleanup functions to run if connection is broken unexpectedly. List
432470 // ! will be empty if all ProxyClient are destroyed cleanly before the
433471 // ! connection is destroyed.
@@ -675,6 +713,70 @@ struct ThreadContext
675713 bool loop_thread = false ;
676714};
677715
//! Queue fn to run on this ProxyServer's worker thread. Requests are chained
//! on m_thread_ready so they execute one at a time, in order. fn is invoked
//! with a CancelMonitor& argument; its return value (or thrown exception) is
//! relayed back, on the event-loop thread, to the promise this method returns.
template <typename T, typename Fn>
kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
{
    auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is ready to post again.
    auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
    CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
    // Keep a reference to the ProxyServer<Thread> instance by assigning it to
    // the self variable. ProxyServer instances are reference-counted and if the
    // client drops its reference, this variable keeps the instance alive until
    // the thread finishes executing. The self variable needs to be destroyed on
    // the event loop thread so it is freed in a sync() call below.
    auto self = thisCap();
    auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
        auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
        bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
            // Fulfill ready.promise now, as soon as the Waiter starts executing
            // this lambda, so the next ProxyServer<Thread>::post() call can
            // immediately call waiter->post(). It is important to do this
            // before calling fn() because fn() can make an IPC call back to the
            // client, which can make another IPC call to this server thread.
            // (This typically happens when IPC methods take std::function
            // parameters.) When this happens the second call to the server
            // thread should not be blocked waiting for the first call.
            m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
                ready_fulfiller->fulfill();
                ready_fulfiller = nullptr;
            });
            // Run fn on this worker thread, capturing either its return value
            // or any exception it throws.
            std::optional<T> result_value;
            kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
            m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
                // Destroy CancelMonitor here before fulfilling or rejecting the
                // promise so it doesn't get triggered when the promise is
                // destroyed.
                cancel_monitor_ptr = nullptr;
                // Send results to the fulfiller. Technically it would be ok to
                // skip this if promise was canceled, but it's simpler to just
                // do it unconditionally.
                KJ_IF_MAYBE(e, exception) {
                    assert(!result_value);
                    result_fulfiller->reject(kj::mv(*e));
                } else {
                    assert(result_value);
                    result_fulfiller->fulfill(kj::mv(*result_value));
                    result_value.reset();
                }
                result_fulfiller = nullptr;
                // Use evalLater to destroy the ProxyServer<Thread> self
                // reference, if it is the last reference, because the
                // ProxyServer<Thread> destructor needs to join the thread,
                // which can't happen until this sync() block has exited.
                m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
            });
        });
        // Assert that calling Waiter::post did not fail. It could only return
        // false if a new function was posted before the previous one finished
        // executing, but new functions are only posted when m_thread_ready is
        // signaled, so this should never happen.
        assert(posted);
        return kj::mv(result.promise);
    }).attach(kj::heap<CancelProbe>(cancel_monitor));
    m_thread_ready = kj::mv(ready.promise);
    return ret;
}
779+
678780// ! Given stream file descriptor, make a new ProxyClient object to send requests
679781// ! over the stream. Also create a new Connection object embedded in the
680782// ! client that is freed when the client is closed.
0 commit comments