Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions orc-rt/include/orc-rt-c/WrapperFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,19 @@ typedef struct {
* Asynchronous return function for an orc-rt wrapper function.
*/
typedef void (*orc_rt_WrapperFunctionReturn)(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionBuffer ResultBytes);

/**
* orc-rt wrapper function prototype.
*
* ArgBytes contains the serialized arguments for the wrapper function.
* Session holds a reference to the session object.
* CallCtx holds a pointer to the context object for this particular call.
* CallId holds a pointer to the context object for this particular call.
* Return holds a pointer to the return function.
*/
typedef void (*orc_rt_WrapperFunction)(orc_rt_SessionRef Session, void *CallCtx,
typedef void (*orc_rt_WrapperFunction)(orc_rt_SessionRef Session,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes);

Expand Down
4 changes: 2 additions & 2 deletions orc-rt/include/orc-rt/SPSWrapperFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ template <typename SPSSig> struct SPSWrapperFunction {
}

template <typename Handler>
static void handle(orc_rt_SessionRef Session, void *CallCtx,
static void handle(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
WrapperFunctionBuffer ArgBytes, Handler &&H) {
WrapperFunction::handle(Session, CallCtx, Return, std::move(ArgBytes),
WrapperFunction::handle(Session, CallId, Return, std::move(ArgBytes),
WrapperFunctionSPSSerializer<SPSSig>(),
std::forward<Handler>(H));
}
Expand Down
8 changes: 4 additions & 4 deletions orc-rt/include/orc-rt/SimpleNativeMemoryMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,21 @@ class SimpleNativeMemoryMap : public ResourceManager {
} // namespace orc_rt

ORC_RT_SPS_INTERFACE void orc_rt_SimpleNativeMemoryMap_reserve_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return, orc_rt_WrapperFunctionBuffer ArgBytes);

ORC_RT_SPS_INTERFACE void
orc_rt_SimpleNativeMemoryMap_releaseMultiple_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return, orc_rt_WrapperFunctionBuffer ArgBytes);

ORC_RT_SPS_INTERFACE void orc_rt_SimpleNativeMemoryMap_initialize_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return, orc_rt_WrapperFunctionBuffer ArgBytes);

ORC_RT_SPS_INTERFACE void
orc_rt_SimpleNativeMemoryMap_deinitializeMultiple_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return, orc_rt_WrapperFunctionBuffer ArgBytes);

#endif // ORC_RT_SIMPLENATIVEMEMORYMAP_H
28 changes: 14 additions & 14 deletions orc-rt/include/orc-rt/WrapperFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ using WFHandlerTraits = CallableTraitsHelper<WFHandlerTraitsImpl, C>;

template <typename Serializer> class StructuredYieldBase {
public:
StructuredYieldBase(orc_rt_SessionRef Session, void *CallCtx,
StructuredYieldBase(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return, Serializer &&S)
: Session(Session), CallCtx(CallCtx), Return(Return),
: Session(Session), CallId(CallId), Return(Return),
S(std::forward<Serializer>(S)) {}

protected:
orc_rt_SessionRef Session;
void *CallCtx;
uint64_t CallId;
orc_rt_WrapperFunctionReturn Return;
std::decay_t<Serializer> S;
};
Expand All @@ -158,9 +158,9 @@ class StructuredYield<std::tuple<RetT>, Serializer>
using StructuredYieldBase<Serializer>::StructuredYieldBase;
void operator()(RetT &&R) {
if (auto ResultBytes = this->S.result().serialize(std::forward<RetT>(R)))
this->Return(this->Session, this->CallCtx, ResultBytes->release());
this->Return(this->Session, this->CallId, ResultBytes->release());
else
this->Return(this->Session, this->CallCtx,
this->Return(this->Session, this->CallId,
WrapperFunctionBuffer::createOutOfBandError(
"Could not serialize wrapper function result data")
.release());
Expand All @@ -173,7 +173,7 @@ class StructuredYield<std::tuple<>, Serializer>
public:
using StructuredYieldBase<Serializer>::StructuredYieldBase;
void operator()() {
this->Return(this->Session, this->CallCtx,
this->Return(this->Session, this->CallId,
WrapperFunctionBuffer().release());
}
};
Expand Down Expand Up @@ -251,12 +251,12 @@ struct WrapperFunction {
///
///
/// static void adder_add_async_sps_wrapper(
/// orc_rt_SessionRef Session, void *CallCtx,
/// orc_rt_SessionRef Session, uint64_t CallId,
/// orc_rt_WrapperFunctionReturn Return,
/// orc_rt_WrapperFunctionBuffer ArgBytes) {
/// using SPSSig = SPSString(SPSExecutorAddr, int32_t, bool);
/// SPSWrapperFunction<SPSSig>::handle(
/// Session, CallCtx, Return, ArgBytes,
/// Session, CallId, Return, ArgBytes,
/// WrapperFunction::handleWithAsyncMethod(&MyClass::myMethod));
/// }
/// @endcode
Expand Down Expand Up @@ -313,12 +313,12 @@ struct WrapperFunction {
///
///
/// static void adder_add_sync_sps_wrapper(
/// orc_rt_SessionRef Session, void *CallCtx,
/// orc_rt_SessionRef Session, uint64_t CallId,
/// orc_rt_WrapperFunctionReturn Return,
/// orc_rt_WrapperFunctionBuffer ArgBytes) {
/// using SPSSig = SPSString(SPSExecutorAddr, int32_t, bool);
/// SPSWrapperFunction<SPSSig>::handle(
/// Session, CallCtx, Return, ArgBytes,
/// Session, CallId, Return, ArgBytes,
/// WrapperFunction::handleWithSyncMethod(&Adder::addSync));
/// }
/// @endcode
Expand Down Expand Up @@ -368,7 +368,7 @@ struct WrapperFunction {
/// This utility deserializes and serializes arguments and return values
/// (using the given Serializer), and calls the given handler.
template <typename Serializer, typename Handler>
static void handle(orc_rt_SessionRef Session, void *CallCtx,
static void handle(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
WrapperFunctionBuffer ArgBytes, Serializer &&S,
Handler &&H) {
Expand All @@ -380,16 +380,16 @@ struct WrapperFunction {
typedef typename CallableArgInfo<Yield>::args_tuple_type RetTupleType;

if (ArgBytes.getOutOfBandError())
return Return(Session, CallCtx, ArgBytes.release());
return Return(Session, CallId, ArgBytes.release());

if (auto Args = S.arguments().template deserialize<ArgTuple>(ArgBytes))
std::apply(HandlerTraits::forwardArgsAsRequested(bind_front(
std::forward<Handler>(H),
detail::StructuredYield<RetTupleType, Serializer>(
Session, CallCtx, Return, std::move(S)))),
Session, CallId, Return, std::move(S)))),
*Args);
else
Return(Session, CallCtx,
Return(Session, CallId,
WrapperFunctionBuffer::createOutOfBandError(
"Could not deserialize wrapper function arg data")
.release());
Expand Down
16 changes: 8 additions & 8 deletions orc-rt/lib/executor/SimpleNativeMemoryMap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,45 +367,45 @@ Error SimpleNativeMemoryMap::recordDeallocActions(
}

ORC_RT_SPS_INTERFACE void orc_rt_SimpleNativeMemoryMap_reserve_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
using Sig = SPSExpected<SPSExecutorAddr>(SPSExecutorAddr, SPSSize);
SPSWrapperFunction<Sig>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
WrapperFunction::handleWithAsyncMethod(&SimpleNativeMemoryMap::reserve));
}

ORC_RT_SPS_INTERFACE void
orc_rt_SimpleNativeMemoryMap_releaseMultiple_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
using Sig = SPSError(SPSExecutorAddr, SPSSequence<SPSExecutorAddr>);
SPSWrapperFunction<Sig>::handle(Session, CallCtx, Return, ArgBytes,
SPSWrapperFunction<Sig>::handle(Session, CallId, Return, ArgBytes,
WrapperFunction::handleWithAsyncMethod(
&SimpleNativeMemoryMap::releaseMultiple));
}

ORC_RT_SPS_INTERFACE void orc_rt_SimpleNativeMemoryMap_initialize_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
using Sig = SPSExpected<SPSExecutorAddr>(
SPSExecutorAddr, SPSSimpleNativeMemoryMapInitializeRequest);
SPSWrapperFunction<Sig>::handle(Session, CallCtx, Return, ArgBytes,
SPSWrapperFunction<Sig>::handle(Session, CallId, Return, ArgBytes,
WrapperFunction::handleWithAsyncMethod(
&SimpleNativeMemoryMap::initialize));
}

ORC_RT_SPS_INTERFACE void
orc_rt_SimpleNativeMemoryMap_deinitializeMultiple_sps_wrapper(
orc_rt_SessionRef Session, void *CallCtx,
orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
using Sig = SPSError(SPSExecutorAddr, SPSSequence<SPSExecutorAddr>);
SPSWrapperFunction<Sig>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
WrapperFunction::handleWithAsyncMethod(
&SimpleNativeMemoryMap::deinitializeMultiple));
}
Expand Down
8 changes: 5 additions & 3 deletions orc-rt/unittests/DirectCaller.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ class DirectCaller {
virtual ~DirectResultSender() {}
virtual void send(orc_rt_SessionRef Session,
orc_rt::WrapperFunctionBuffer ResultBytes) = 0;
static void send(orc_rt_SessionRef Session, void *CallCtx,
static void send(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionBuffer ResultBytes) {
std::unique_ptr<DirectResultSender>(
reinterpret_cast<DirectResultSender *>(CallCtx))
reinterpret_cast<DirectResultSender *>(
static_cast<uintptr_t>(CallId)))
->send(Session, ResultBytes);
}
};
Expand Down Expand Up @@ -59,7 +60,8 @@ class DirectCaller {
orc_rt::WrapperFunctionBuffer ArgBytes) {
auto DR =
makeDirectResultSender(std::forward<HandleResultFn>(HandleResult));
Fn(Session, reinterpret_cast<void *>(DR.release()),
Fn(Session,
static_cast<uint64_t>(reinterpret_cast<uintptr_t>(DR.release())),
DirectResultSender::send, ArgBytes.release());
}

Expand Down
43 changes: 23 additions & 20 deletions orc-rt/unittests/SPSWrapperFunctionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@

using namespace orc_rt;

static void void_noop_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
static void void_noop_sps_wrapper(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<void()>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
[](move_only_function<void()> Return) { Return(); });
}

Expand All @@ -40,11 +40,12 @@ TEST(SPSWrapperFunctionUtilsTest, VoidNoop) {
EXPECT_TRUE(Ran);
}

static void add_via_lambda_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
static void add_via_lambda_sps_wrapper(orc_rt_SessionRef Session,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<int32_t(int32_t, int32_t)>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
[](move_only_function<void(int32_t)> Return, int32_t X, int32_t Y) {
Return(X + Y);
});
Expand All @@ -64,11 +65,11 @@ static void add_via_function(move_only_function<void(int32_t)> Return,
}

static void
add_via_function_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
add_via_function_sps_wrapper(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<int32_t(int32_t, int32_t)>::handle(
Session, CallCtx, Return, ArgBytes, add_via_function);
Session, CallId, Return, ArgBytes, add_via_function);
}

TEST(SPSWrapperFunctionUtilsTest, BinaryOpViaFunction) {
Expand All @@ -80,11 +81,11 @@ TEST(SPSWrapperFunctionUtilsTest, BinaryOpViaFunction) {
}

static void
add_via_function_pointer_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
add_via_function_pointer_sps_wrapper(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<int32_t(int32_t, int32_t)>::handle(
Session, CallCtx, Return, ArgBytes, &add_via_function);
Session, CallId, Return, ArgBytes, &add_via_function);
}

TEST(SPSWrapperFunctionUtilsTest, BinaryOpViaFunctionPointer) {
Expand All @@ -96,11 +97,12 @@ TEST(SPSWrapperFunctionUtilsTest, BinaryOpViaFunctionPointer) {
}

static void
round_trip_string_via_span_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
round_trip_string_via_span_sps_wrapper(orc_rt_SessionRef Session,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<SPSString(SPSString)>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
[](move_only_function<void(std::string)> Return, span<const char> S) {
Return({S.data(), S.size()});
});
Expand All @@ -119,11 +121,11 @@ TEST(SPSWrapperFunctionUtilsTest, RoundTripStringViaSpan) {
}

static void improbable_feat_sps_wrapper(orc_rt_SessionRef Session,
void *CallCtx,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<SPSError(bool)>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
[](move_only_function<void(Error)> Return, bool LuckyHat) {
if (LuckyHat)
Return(Error::success());
Expand Down Expand Up @@ -155,11 +157,11 @@ TEST(SPSWrapperFunctionUtilsTest, TransparentConversionErrorFailureCase) {
EXPECT_EQ(ErrMsg, "crushed by boulder");
}

static void halve_number_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
static void halve_number_sps_wrapper(orc_rt_SessionRef Session, uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<SPSExpected<int32_t>(int32_t)>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
[](move_only_function<void(Expected<int32_t>)> Return, int N) {
if (N % 2 == 0)
Return(N >> 1);
Expand Down Expand Up @@ -208,12 +210,12 @@ class SPSSerializationTraits<SPSOpCounter<N>, OpCounter<N>> {

static void
handle_with_reference_types_sps_wrapper(orc_rt_SessionRef Session,
void *CallCtx,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<void(
SPSOpCounter<0>, SPSOpCounter<1>, SPSOpCounter<2>,
SPSOpCounter<3>)>::handle(Session, CallCtx, Return, ArgBytes,
SPSOpCounter<3>)>::handle(Session, CallId, Return, ArgBytes,
[](move_only_function<void()> Return,
OpCounter<0>, OpCounter<1> &,
const OpCounter<2> &,
Expand Down Expand Up @@ -281,11 +283,11 @@ class Adder {
} // anonymous namespace

static void adder_add_async_sps_wrapper(orc_rt_SessionRef Session,
void *CallCtx,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<int32_t(SPSExecutorAddr, int32_t, int32_t)>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
WrapperFunction::handleWithAsyncMethod(&Adder::addAsync));
}

Expand All @@ -300,11 +302,12 @@ TEST(SPSWrapperFunctionUtilsTest, HandleWtihAsyncMethod) {
EXPECT_EQ(Result, 42);
}

static void adder_add_sync_sps_wrapper(orc_rt_SessionRef Session, void *CallCtx,
static void adder_add_sync_sps_wrapper(orc_rt_SessionRef Session,
uint64_t CallId,
orc_rt_WrapperFunctionReturn Return,
orc_rt_WrapperFunctionBuffer ArgBytes) {
SPSWrapperFunction<int32_t(SPSExecutorAddr, int32_t, int32_t)>::handle(
Session, CallCtx, Return, ArgBytes,
Session, CallId, Return, ArgBytes,
WrapperFunction::handleWithSyncMethod(&Adder::addSync));
}

Expand Down
Loading