Skip to content

Commit bfc732e

Browse files
authored
[orc-rt] Add ControllerAccess interface. (#169598)
ControllerAccess provides an abstract interface for bidirectional RPC between the executor (running JIT'd code) and the controller (containing the llvm::orc::ExecutionSession). ControllerAccess implementations are expected to implement IPC / RPC using a concrete communication method (shared memory, pipes, sockets, native system IPC, etc). Calls from executor to controller are made via callController, with "handler tags" (addresses in the executor) specifying the target handler in the controller. A handler must be associated in the controller with the given tag for the call to succeed. This ensures that only registered entry points in the controller can be used, and avoids leaking controller addresses into the executor. Calls in both directions are to "wrapper functions" that take a buffer of bytes as input and return a buffer of bytes as output. In the ORC runtime these must be `orc_rt_WrapperFunction`s (see Session::handleWrapperCall). The interpretation of the byte buffers is up to the wrapper functions: the ORC runtime imposes no restrictions on how the bytes are to be interpreted. ControllerAccess objects may be detached from the Session prior to Session shutdown, in which case no further calls may be made in either direction, and any pending results (from calls made that haven't returned yet) should return errors. If the ControllerAccess class is still attached at Session shutdown time it will be detached as part of the shutdown process. The ControllerAccess::disconnect method must support concurrent entry on multiple threads, and all callers must block until they can guarantee that no further calls will be received or accepted.
1 parent 222ba6f commit bfc732e

File tree

3 files changed

+428
-9
lines changed

3 files changed

+428
-9
lines changed

orc-rt/include/orc-rt/Session.h

Lines changed: 118 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,97 @@
1616
#include "orc-rt/Error.h"
1717
#include "orc-rt/ResourceManager.h"
1818
#include "orc-rt/TaskDispatcher.h"
19+
#include "orc-rt/WrapperFunction.h"
1920
#include "orc-rt/move_only_function.h"
2021

2122
#include "orc-rt-c/CoreTypes.h"
23+
#include "orc-rt-c/WrapperFunction.h"
2224

25+
#include <cassert>
2326
#include <condition_variable>
2427
#include <memory>
2528
#include <mutex>
2629
#include <vector>
2730

2831
namespace orc_rt {
2932

33+
class Session;
34+
35+
inline orc_rt_SessionRef wrap(Session *S) noexcept {
36+
return reinterpret_cast<orc_rt_SessionRef>(S);
37+
}
38+
39+
inline Session *unwrap(orc_rt_SessionRef S) noexcept {
40+
return reinterpret_cast<Session *>(S);
41+
}
42+
3043
/// Represents an ORC executor Session.
3144
class Session {
3245
public:
3346
using ErrorReporterFn = move_only_function<void(Error)>;
3447
using OnShutdownCompleteFn = move_only_function<void()>;
3548

49+
using HandlerTag = void *;
50+
using OnCallHandlerCompleteFn =
51+
move_only_function<void(WrapperFunctionBuffer)>;
52+
53+
/// Provides access to the controller.
54+
class ControllerAccess {
55+
friend class Session;
56+
57+
public:
58+
virtual ~ControllerAccess();
59+
60+
protected:
61+
using HandlerTag = Session::HandlerTag;
62+
using OnCallHandlerCompleteFn = Session::OnCallHandlerCompleteFn;
63+
64+
ControllerAccess(Session &S) : S(&S) {}
65+
66+
/// Called by the Session to disconnect the session with the Controller.
67+
///
68+
/// disconnect implementations must support concurrent entry on multiple
69+
/// threads, and all calls must block until the disconnect operation is
70+
/// complete.
71+
///
72+
/// Once disconnect completes, implementations should make no further
73+
/// calls to the Session, and should ignore any calls from the session
74+
/// (implementations are free to ignore any calls from the Session after
75+
/// disconnect is called).
76+
virtual void disconnect() = 0;
77+
78+
/// Report an error to the session.
79+
void reportError(Error Err) {
80+
assert(S && "Already disconnected");
81+
S->reportError(std::move(Err));
82+
}
83+
84+
/// Call the handler in the controller associated with the given tag.
85+
virtual void callController(OnCallHandlerCompleteFn OnComplete,
86+
HandlerTag T,
87+
WrapperFunctionBuffer ArgBytes) = 0;
88+
89+
/// Send the result of the given wrapper function call to the controller.
90+
virtual void sendWrapperResult(uint64_t CallId,
91+
WrapperFunctionBuffer ResultBytes) = 0;
92+
93+
/// Ask the Session to run the given wrapper function.
94+
///
95+
/// Subclasses must not call this method after disconnect returns.
96+
void handleWrapperCall(uint64_t CallId, orc_rt_WrapperFunction Fn,
97+
WrapperFunctionBuffer ArgBytes) {
98+
assert(S && "Already disconnected");
99+
S->handleWrapperCall(CallId, Fn, std::move(ArgBytes));
100+
}
101+
102+
private:
103+
void doDisconnect() {
104+
disconnect();
105+
S = nullptr;
106+
}
107+
Session *S;
108+
};
109+
36110
/// Create a session object. The ReportError function will be called to
37111
/// report errors generated while serving JIT'd code, e.g. if a memory
38112
/// management request cannot be fulfilled. (Error's within the JIT'd
@@ -69,9 +143,22 @@ class Session {
69143
void waitForShutdown();
70144

71145
/// Add a ResourceManager to the session.
72-
void addResourceManager(std::unique_ptr<ResourceManager> RM) {
73-
std::scoped_lock<std::mutex> Lock(M);
74-
ResourceMgrs.push_back(std::move(RM));
146+
void addResourceManager(std::unique_ptr<ResourceManager> RM);
147+
148+
/// Set the ControllerAccess object.
149+
void setController(std::shared_ptr<ControllerAccess> CA);
150+
151+
/// Disconnect the ControllerAccess object.
152+
void detachFromController();
153+
154+
void callController(OnCallHandlerCompleteFn OnComplete, HandlerTag T,
155+
WrapperFunctionBuffer ArgBytes) {
156+
if (auto TmpCA = CA)
157+
CA->callController(std::move(OnComplete), T, std::move(ArgBytes));
158+
else
159+
OnComplete(
160+
WrapperFunctionBuffer::createOutOfBandError("no controller attached")
161+
.release());
75162
}
76163

77164
private:
@@ -85,21 +172,43 @@ class Session {
85172
void shutdownNext(Error Err);
86173
void shutdownComplete();
87174

175+
void handleWrapperCall(uint64_t CallId, orc_rt_WrapperFunction Fn,
176+
WrapperFunctionBuffer ArgBytes) {
177+
dispatch(makeGenericTask([=, ArgBytes = std::move(ArgBytes)]() mutable {
178+
Fn(wrap(this), CallId, wrapperReturn, ArgBytes.release());
179+
}));
180+
}
181+
182+
void sendWrapperResult(uint64_t CallId, WrapperFunctionBuffer ResultBytes) {
183+
if (auto TmpCA = CA)
184+
TmpCA->sendWrapperResult(CallId, std::move(ResultBytes));
185+
}
186+
187+
static void wrapperReturn(orc_rt_SessionRef S, uint64_t CallId,
188+
orc_rt_WrapperFunctionBuffer ResultBytes);
189+
88190
std::unique_ptr<TaskDispatcher> Dispatcher;
191+
std::shared_ptr<ControllerAccess> CA;
89192
ErrorReporterFn ReportError;
90193

91194
std::mutex M;
92195
std::vector<std::unique_ptr<ResourceManager>> ResourceMgrs;
93196
std::unique_ptr<ShutdownInfo> SI;
94197
};
95198

96-
inline orc_rt_SessionRef wrap(Session *S) noexcept {
97-
return reinterpret_cast<orc_rt_SessionRef>(S);
98-
}
199+
class CallViaSession {
200+
public:
201+
CallViaSession(Session &S, Session::HandlerTag T) : S(S), T(T) {}
99202

100-
inline Session *unwrap(orc_rt_SessionRef S) noexcept {
101-
return reinterpret_cast<Session *>(S);
102-
}
203+
void operator()(Session::OnCallHandlerCompleteFn &&HandleResult,
204+
WrapperFunctionBuffer ArgBytes) {
205+
S.callController(std::move(HandleResult), T, std::move(ArgBytes));
206+
}
207+
208+
private:
209+
Session &S;
210+
Session::HandlerTag T;
211+
};
103212

104213
} // namespace orc_rt
105214

orc-rt/lib/executor/Session.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,14 @@
1414

1515
namespace orc_rt {
1616

17+
Session::ControllerAccess::~ControllerAccess() = default;
18+
1719
Session::~Session() { waitForShutdown(); }
1820

1921
void Session::shutdown(OnShutdownCompleteFn OnShutdownComplete) {
22+
// Safe to call concurrently / redundantly.
23+
detachFromController();
24+
2025
{
2126
std::scoped_lock<std::mutex> Lock(M);
2227
if (SI) {
@@ -38,6 +43,27 @@ void Session::waitForShutdown() {
3843
SI->CompleteCV.wait(Lock, [&]() { return SI->Complete; });
3944
}
4045

46+
void Session::addResourceManager(std::unique_ptr<ResourceManager> RM) {
47+
std::scoped_lock<std::mutex> Lock(M);
48+
assert(!SI && "addResourceManager called after shutdown");
49+
ResourceMgrs.push_back(std::move(RM));
50+
}
51+
52+
void Session::setController(std::shared_ptr<ControllerAccess> CA) {
53+
assert(CA && "Cannot attach null controller");
54+
std::scoped_lock<std::mutex> Lock(M);
55+
assert(!this->CA && "Cannot re-attach controller");
56+
assert(!SI && "Cannot attach controller after shutdown");
57+
this->CA = std::move(CA);
58+
}
59+
60+
void Session::detachFromController() {
61+
if (auto TmpCA = CA) {
62+
TmpCA->doDisconnect();
63+
CA = nullptr;
64+
}
65+
}
66+
4167
void Session::shutdownNext(Error Err) {
4268
if (Err)
4369
reportError(std::move(Err));
@@ -72,4 +98,9 @@ void Session::shutdownComplete() {
7298
SI->CompleteCV.notify_all();
7399
}
74100

101+
void Session::wrapperReturn(orc_rt_SessionRef S, uint64_t CallId,
102+
orc_rt_WrapperFunctionBuffer ResultBytes) {
103+
unwrap(S)->sendWrapperResult(CallId, ResultBytes);
104+
}
105+
75106
} // namespace orc_rt

0 commit comments

Comments
 (0)