Skip to content

Commit 545aeff

Browse files
committed
Implemented RPC client SDK
1 parent 72a3962 commit 545aeff

File tree

13 files changed

+259
-93
lines changed

13 files changed

+259
-93
lines changed

include/ocvsmd/sdk/defines.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ namespace ocvsmd
1919
namespace sdk
2020
{
2121

22+
/// Defines the type of the SDK error.
23+
///
2224
struct Error
2325
{
2426
/// Defines platform-independent error codes.

include/ocvsmd/sdk/execution.hpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,48 @@ typename SenderOf<Result>::Ptr then(typename SenderOf<Input>::Ptr input_sender,
256256
return std::make_unique<ThenSender>(std::move(input_sender), std::forward<Func>(transform_func));
257257
}
258258

259+
/// Internal implementation details.
260+
/// Not supposed to be used directly by the users of the SDK.
261+
///
262+
namespace detail
263+
{
264+
265+
/// Helper function to serialize a message and perform an action on the serialized payload.
266+
///
267+
template <typename Result, typename Message, typename Action>
268+
CETL_NODISCARD static typename SenderOf<Result>::Ptr tryPerformOnSerialized(const Message& msg, Action&& action)
269+
{
270+
#if defined(__cpp_exceptions)
271+
try
272+
{
273+
#endif
274+
// Try to serialize the message to raw payload buffer.
275+
//
276+
constexpr std::size_t BufferSize = Message::_traits_::SerializationBufferSizeBytes;
277+
// NOLINTNEXTLINE(*-avoid-c-arrays)
278+
OwnedMutablePayload payload{BufferSize, std::make_unique<cetl::byte[]>(BufferSize)};
279+
//
280+
// No lint b/c of integration with Nunavut.
281+
// NOLINTNEXTLINE(*-pro-type-reinterpret-cast)
282+
auto* const payload_data = reinterpret_cast<std::uint8_t*>(payload.data.get());
283+
const auto result_size = serialize(msg, {payload_data, payload.size});
284+
if (result_size)
285+
{
286+
payload.size = result_size.value();
287+
return std::forward<Action>(action)(std::move(payload));
288+
}
289+
return just<Result>(Error{Error::Code::InvalidArgument});
290+
291+
#if defined(__cpp_exceptions)
292+
} catch (const std::bad_alloc&)
293+
{
294+
return just<Result>(Error{Error::Code::OutOfMemory});
295+
}
296+
#endif
297+
}
298+
299+
} // namespace detail
300+
259301
} // namespace sdk
260302
} // namespace ocvsmd
261303

include/ocvsmd/sdk/node_pub_sub.hpp

Lines changed: 29 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class Publisher
7777
template <typename Message>
7878
SenderOf<OptError>::Ptr publish(const Message& message, const std::chrono::microseconds timeout)
7979
{
80-
return tryPerformOnSerialized(message, [this, timeout](auto raw_payload) {
80+
return detail::tryPerformOnSerialized<OptError>(message, [this, timeout](auto raw_payload) {
8181
//
8282
return rawPublish(std::move(raw_payload), timeout);
8383
});
@@ -86,39 +86,6 @@ class Publisher
8686
protected:
8787
Publisher() = default;
8888

89-
private:
90-
template <typename Message, typename Action>
91-
CETL_NODISCARD static SenderOf<OptError>::Ptr tryPerformOnSerialized(const Message& msg, Action&& action)
92-
{
93-
#if defined(__cpp_exceptions)
94-
try
95-
{
96-
#endif
97-
// Try to serialize the message to raw payload buffer.
98-
//
99-
constexpr std::size_t BufferSize = Message::_traits_::SerializationBufferSizeBytes;
100-
// NOLINTNEXTLINE(*-avoid-c-arrays)
101-
OwnedMutablePayload payload{BufferSize, std::make_unique<cetl::byte[]>(BufferSize)};
102-
//
103-
// No lint b/c of integration with Nunavut.
104-
// NOLINTNEXTLINE(*-pro-type-reinterpret-cast)
105-
auto* const payload_data = reinterpret_cast<std::uint8_t*>(payload.data.get());
106-
const auto result_size = serialize(msg, {payload_data, payload.size});
107-
if (result_size)
108-
{
109-
payload.size = result_size.value();
110-
return std::forward<Action>(action)(std::move(payload));
111-
}
112-
return just<OptError>(Error{Error::Code::InvalidArgument});
113-
114-
#if defined(__cpp_exceptions)
115-
} catch (const std::bad_alloc&)
116-
{
117-
return just<OptError>(Error{Error::Code::OutOfMemory});
118-
}
119-
#endif
120-
}
121-
12289
}; // Publisher
12390

12491
/// Defines the interface of Messages Subscriber.
@@ -203,38 +170,41 @@ class Subscriber
203170
/// Also, to not miss any new message, user should immediately initiate
204171
/// a new `receive` operation after getting the success result of the previous one.
205172
///
173+
/// @param memory The memory resource to use for the message deserialization.
206174
/// @return An execution sender which emits the async result of the operation.
207175
///
208176
template <typename Message>
209177
typename SenderOf<Receive::Result<Message>>::Ptr receive(cetl::pmr::memory_resource& memory)
210178
{
211179
using ResultMsg = Receive::Result<Message>;
212180

213-
return then<ResultMsg, RawReceive::Result>(rawReceive(), [&memory](auto raw_result) -> ResultMsg {
214-
//
215-
if (const auto* const failure = cetl::get_if<RawReceive::Failure>(&raw_result))
216-
{
217-
return *failure;
218-
}
219-
auto raw_msg = cetl::get<RawReceive::Success>(std::move(raw_result));
220-
221-
// No lint b/c of integration with Nunavut.
222-
// NOLINTNEXTLINE(*-pro-type-reinterpret-cast)
223-
const auto* const raw_payload = reinterpret_cast<const std::uint8_t*>(raw_msg.payload.data.get());
224-
Message message{&memory};
225-
const auto deser_result = deserialize(message, {raw_payload, raw_msg.payload.size});
226-
if (!deser_result)
227-
{
228-
// Invalid message payload.
229-
return Error{Error::Code::InvalidArgument};
230-
}
231-
232-
return Receive::Success<Message>{
233-
std::move(message),
234-
raw_msg.priority,
235-
raw_msg.publisher_node_id,
236-
};
237-
});
181+
return then<ResultMsg, RawReceive::Result>( //
182+
rawReceive(),
183+
[&memory](auto raw_result) -> ResultMsg {
184+
//
185+
if (const auto* const failure = cetl::get_if<RawReceive::Failure>(&raw_result))
186+
{
187+
return *failure;
188+
}
189+
auto raw_msg = cetl::get<RawReceive::Success>(std::move(raw_result));
190+
191+
// No lint b/c of integration with Nunavut.
192+
// NOLINTNEXTLINE(*-pro-type-reinterpret-cast)
193+
const auto* const raw_payload = reinterpret_cast<const std::uint8_t*>(raw_msg.payload.data.get());
194+
Message message{&memory};
195+
const auto deser_result = deserialize(message, {raw_payload, raw_msg.payload.size});
196+
if (!deser_result)
197+
{
198+
// Invalid message payload.
199+
return Error{Error::Code::InvalidArgument};
200+
}
201+
202+
return Receive::Success<Message>{
203+
std::move(message),
204+
raw_msg.priority,
205+
raw_msg.publisher_node_id,
206+
};
207+
});
238208
}
239209

240210
protected:

include/ocvsmd/sdk/node_rpc_client.hpp

Lines changed: 90 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,46 +39,125 @@ class NodeRpcClient
3939
NodeRpcClient& operator=(NodeRpcClient&&) = delete;
4040
NodeRpcClient& operator=(const NodeRpcClient&) = delete;
4141

42-
/// Defines the result type of the RPC client raw response.
42+
/// Defines the result type of the RPC client raw call.
4343
///
44-
/// On success, the result is a raw data buffer, its size, and extra metadata.
44+
/// On success, the result is a raw response data buffer, its size, and extra metadata.
4545
/// On failure, the result is an SDK error.
4646
///
47-
struct RawResponse final
47+
struct RawCall final
4848
{
4949
struct Success
5050
{
51-
OwnedMutablePayload payload;
51+
OwnedMutablePayload raw_response;
5252
CyphalPriority priority;
5353
CyphalNodeId server_node_id;
5454
};
5555
using Failure = Error;
5656
using Result = cetl::variant<Success, Failure>;
5757
};
58-
/// Sends raw RPC request.
58+
/// Sends raw RPC call.
5959
///
60-
/// The client-side (the SDK) will forward the raw data to the corresponding Cyphal network service client
60+
/// The client-side (the SDK) will forward the raw request data to the corresponding Cyphal network service client
6161
/// on the server-side (the daemon). The raw data is forwarded as is, without any interpretation or validation.
6262
///
63-
/// Note, only one request can be active at a time (per rpc client).
63+
/// Note, only one call can be active at a time (per rpc client).
6464
/// In the case of multiple "concurrent" operations, only the last one will report the response result.
6565
/// Any previous still existing operations will be "stalled" and never complete.
6666
///
67-
/// @param raw_payload The raw request data to be sent.
67+
/// @param raw_request The raw request data to be sent.
6868
/// @param request_timeout The maximum time to keep the raw request as valid in the Cyphal network.
6969
/// @param response_timeout The maximum time to wait for reply from the Cyphal node.
7070
/// @return An execution sender which emits the async result of the operation.
7171
///
72-
virtual SenderOf<RawResponse::Result>::Ptr rawRequest(OwnedMutablePayload&& raw_payload,
73-
const std::chrono::microseconds request_timeout,
74-
const std::chrono::microseconds response_timeout) = 0;
72+
virtual SenderOf<RawCall::Result>::Ptr rawCall(OwnedMutablePayload&& raw_request,
73+
const std::chrono::microseconds request_timeout,
74+
const std::chrono::microseconds response_timeout) = 0;
7575

7676
/// Sets priority for request to be issued by this client.
7777
///
7878
/// The next and following `request` operations will use this priority.
7979
///
8080
virtual OptError setPriority(const CyphalPriority priority) = 0;
8181

82+
/// Defines the result type of the RPC client call.
83+
///
84+
/// On success, the result is a deserialized response message, and extra metadata.
85+
/// On failure, the result is an SDK error.
86+
///
87+
struct Call final
88+
{
89+
template <typename Response>
90+
struct Success
91+
{
92+
Response response;
93+
CyphalPriority priority;
94+
CyphalNodeId server_node_id;
95+
};
96+
97+
using Failure = Error;
98+
99+
template <typename Response>
100+
using Result = cetl::variant<Success<Response>, Failure>;
101+
};
102+
/// Sends RPC call.
103+
///
104+
/// The client-side (the SDK) will forward the serialized request message to the corresponding Cyphal network
105+
/// service client on the server-side (the daemon).
106+
///
107+
/// Note, only one call can be active at a time (per rpc client).
108+
/// In the case of multiple "concurrent" operations, only the last one will report the response result.
109+
/// Any previous still existing operations will be "stalled" and never complete.
110+
///
111+
/// @param memory The memory resource to use for the response deserialization.
112+
/// @param request The request message to be sent.
113+
/// @param request_timeout The maximum time to keep the request as valid in the Cyphal network.
114+
/// @param response_timeout The maximum time to wait for reply from the Cyphal node.
115+
/// @return An execution sender which emits the async result of the operation.
116+
///
117+
template <typename Response, typename Request>
118+
typename SenderOf<Call::Result<Response>>::Ptr call(cetl::pmr::memory_resource& memory,
119+
const Request& request,
120+
const std::chrono::microseconds request_timeout,
121+
const std::chrono::microseconds response_timeout)
122+
{
123+
using CallResult = Call::Result<Response>;
124+
125+
auto raw_call_sender = detail::tryPerformOnSerialized<RawCall::Result>( //
126+
request,
127+
[this, request_timeout, response_timeout](auto raw_request) {
128+
//
129+
return rawCall(std::move(raw_request), request_timeout, response_timeout);
130+
});
131+
132+
return then<CallResult, RawCall::Result>( //
133+
std::move(raw_call_sender),
134+
[&memory](auto raw_result) -> CallResult {
135+
//
136+
if (const auto* const failure = cetl::get_if<RawCall::Failure>(&raw_result))
137+
{
138+
return *failure;
139+
}
140+
auto called = cetl::get<RawCall::Success>(std::move(raw_result));
141+
142+
// No lint b/c of integration with Nunavut.
143+
// NOLINTNEXTLINE(*-pro-type-reinterpret-cast)
144+
const auto* const raw_payload = reinterpret_cast<const std::uint8_t*>(called.raw_response.data.get());
145+
Response response{&memory};
146+
const auto deser_result = deserialize(response, {raw_payload, called.raw_response.size});
147+
if (!deser_result)
148+
{
149+
// Invalid message payload.
150+
return Error{Error::Code::InvalidArgument};
151+
}
152+
153+
return Call::Success<Response>{
154+
std::move(response),
155+
called.priority,
156+
called.server_node_id,
157+
};
158+
});
159+
}
160+
82161
protected:
83162
NodeRpcClient() = default;
84163

0 commit comments

Comments
 (0)