66#ifndef OCVSMD_SDK_NODE_PUB_SUB_HPP_INCLUDED
77#define OCVSMD_SDK_NODE_PUB_SUB_HPP_INCLUDED
88
9+ #include " defines.hpp"
910#include " execution.hpp"
1011
1112#include < cetl/pf17/cetlpf.hpp>
1213
14+ #include < chrono>
1315#include < memory>
1416
1517namespace ocvsmd
1618{
1719namespace sdk
1820{
1921
20- // / Defines the interface for the Raw Messages Subscriber .
22+ // / Defines the interface of Messages Publisher .
2123// /
22- class RawSubscriber
24+ class Publisher
25+ {
26+ public:
27+ // / Defines a smart pointer type for the interface.
28+ // /
29+ // / It's made "shared" b/c execution sender (see `publish` method) implicitly
30+ // / holds reference to its publisher.
31+ // /
32+ using Ptr = std::shared_ptr<Publisher>;
33+
34+ virtual ~Publisher () = default ;
35+
36+ // No copy/move semantics.
37+ Publisher (Publisher&&) = delete ;
38+ Publisher (const Publisher&) = delete ;
39+ Publisher& operator =(Publisher&&) = delete ;
40+ Publisher& operator =(const Publisher&) = delete ;
41+
42+ // / Publishes the next raw message using this publisher.
43+ // /
44+ // / The client-side (the SDK) will forward the raw data to the corresponding Cyphal network publisher
45+ // / on the server-side (the daemon). The raw data is forwarded as is, without any interpretation or validation.
46+ // /
47+ // / Note, only one operation can be active at a time (per publisher).
48+ // / In the case of multiple "concurrent" operations, only the last one will report the publishing result.
49+ // / Any previous still existing operations will be "stalled" and never complete.
50+ // /
51+ // / @param raw_payload The raw message data to publish.
52+ // / @param timeout The maximum time to keep the published raw message as valid in the Cyphal network.
53+ // / @return An execution sender which emits the async result of the operation.
54+ // /
55+ virtual SenderOf<OptError>::Ptr rawPublish (OwnMutablePayload&& raw_payload,
56+ const std::chrono::microseconds timeout) = 0;
57+
58+ // / Sets priority for messages to be issued by this publisher.
59+ // /
60+ // / The next and following `publish` operations will use this priority.
61+ // /
62+ virtual OptError setPriority (const CyphalPriority priority) = 0;
63+
64+ // / Publishes the next message using this publisher.
65+ // /
66+ // / The client-side (the SDK) will forward the serialized message to the corresponding Cyphal network publisher
67+ // / on the server-side (the daemon).
68+ // /
69+ // / Note, only one operation can be active at a time (per publisher).
70+ // / In the case of multiple "concurrent" operations, only the last one will report the publishing result.
71+ // / Any previous still existing operations will be "stalled" and never complete.
72+ // /
73+ // / @param message The message to publish.
74+ // / @param timeout The maximum time to keep the published message as valid in the Cyphal network.
75+ // / @return An execution sender which emits the async result of the operation.
76+ // /
77+ template <typename Message>
78+ SenderOf<OptError>::Ptr publish (const Message& message, const std::chrono::microseconds timeout)
79+ {
80+ return tryPerformOnSerialized (message, [this , timeout](auto raw_payload) {
81+ //
82+ return rawPublish (std::move (raw_payload), timeout);
83+ });
84+ }
85+
86+ protected:
87+ Publisher () = default ;
88+
89+ private:
90+ template <typename Message, typename Action>
91+ CETL_NODISCARD static SenderOf<OptError>::Ptr tryPerformOnSerialized (const Message& msg, Action&& action)
92+ {
93+ #if defined(__cpp_exceptions)
94+ try
95+ {
96+ #endif
97+ // Try to serialize the message to raw payload buffer.
98+ //
99+ constexpr std::size_t BufferSize = Message::_traits_::SerializationBufferSizeBytes;
100+ // NOLINTNEXTLINE(*-avoid-c-arrays)
101+ OwnMutablePayload payload{BufferSize, std::make_unique<cetl::byte[]>(BufferSize)};
102+ //
103+ // No lint b/c of integration with Nunavut.
104+ // NOLINTNEXTLINE(*-pro-type-reinterpret-cast)
105+ auto * const payload_data = reinterpret_cast <std::uint8_t *>(payload.data .get ());
106+ const auto result_size = serialize (msg, {payload_data, payload.size });
107+ if (result_size)
108+ {
109+ payload.size = result_size.value ();
110+ return std::forward<Action>(action)(std::move (payload));
111+ }
112+ return just<OptError>(Error{Error::Code::InvalidArgument});
113+
114+ #if defined(__cpp_exceptions)
115+ } catch (const std::bad_alloc&)
116+ {
117+ return just<OptError>(Error{Error::Code::OutOfMemory});
118+ }
119+ #endif
120+ }
121+
122+ }; // Publisher
123+
124+ // / Defines the interface of Messages Subscriber.
125+ // /
126+ class Subscriber
23127{
24128public:
25129 // / Defines a smart pointer type for the interface.
26130 // /
27131 // / It's made "shared" b/c execution sender (see `receive` method) implicitly
28132 // / holds reference to its subscriber.
29133 // /
30- using Ptr = std::shared_ptr<RawSubscriber >;
134+ using Ptr = std::shared_ptr<Subscriber >;
31135
32- virtual ~RawSubscriber () = default ;
136+ virtual ~Subscriber () = default ;
33137
34138 // No copy/move semantics.
35- RawSubscriber (RawSubscriber &&) = delete ;
36- RawSubscriber (const RawSubscriber &) = delete ;
37- RawSubscriber & operator =(RawSubscriber &&) = delete ;
38- RawSubscriber & operator =(const RawSubscriber &) = delete ;
139+ Subscriber (Subscriber &&) = delete ;
140+ Subscriber (const Subscriber &) = delete ;
141+ Subscriber & operator =(Subscriber &&) = delete ;
142+ Subscriber & operator =(const Subscriber &) = delete ;
39143
40- // / Defines the result type of the raw subscriber message reception.
144+ // / Defines the result type of the subscriber raw message reception.
41145 // /
42146 // / On success, the result is a raw data buffer, its size, and extra metadata.
43147 // / On failure, the result is an SDK error.
44148 // /
45- struct Receive final
149+ struct RawReceive final
46150 {
47151 struct Success
48152 {
49- std::size_t size;
50- std::unique_ptr<cetl::byte[]> data; // NOLINT(*-avoid-c-arrays)
51- CyphalPriority priority;
52- cetl::optional<CyphalNodeId> publisher_node_id;
153+ OwnMutablePayload payload;
154+ CyphalPriority priority;
155+ cetl::optional<CyphalNodeId> publisher_node_id;
53156 };
54157 using Failure = Error;
55158 using Result = cetl::variant<Success, Failure>;
@@ -67,12 +170,77 @@ class RawSubscriber
67170 // /
68171 // / @return An execution sender which emits the async result of the operation.
69172 // /
70- virtual SenderOf<Receive::Result>::Ptr receive () = 0;
173+ virtual SenderOf<RawReceive::Result>::Ptr rawReceive () = 0;
174+
175+ // / Defines the result type of the subscriber message reception.
176+ // /
177+ // / On success, the result is a deserialized message, and its extra metadata.
178+ // / On failure, the result is an SDK error.
179+ // /
180+ struct Receive final
181+ {
182+ template <typename Message>
183+ struct Success
184+ {
185+ Message message;
186+ CyphalPriority priority;
187+ cetl::optional<CyphalNodeId> publisher_node_id;
188+ };
189+
190+ using Failure = Error;
191+
192+ template <typename Message>
193+ using Result = cetl::variant<Success<Message>, Failure>;
194+ };
195+ // / Receives the next message from this subscriber.
196+ // /
197+ // / The server-side (the daemon) will forward the observed raw data on the corresponding Cyphal network subscriber.
198+ // / The received raw data is then deserialized into the strong-typed message.
199+ // /
200+ // / Note, only one `receive` operation can be active at a time (per subscriber).
201+ // / In the case of multiple "concurrent" operations, only the last one will receive the result.
202+ // / Any previous still existing operations will be "stalled" and never complete.
203+ // / Also, to not miss any new message, user should immediately initiate
204+ // / a new `receive` operation after getting the success result of the previous one.
205+ // /
206+ // / @return An execution sender which emits the async result of the operation.
207+ // /
208+ template <typename Message>
209+ typename SenderOf<Receive::Result<Message>>::Ptr receive (cetl::pmr::memory_resource& memory)
210+ {
211+ using ResultMsg = Receive::Result<Message>;
212+
213+ return then<ResultMsg, RawReceive::Result>(rawReceive (), [&memory](auto raw_result) -> ResultMsg {
214+ //
215+ if (const auto * const failure = cetl::get_if<RawReceive::Failure>(&raw_result))
216+ {
217+ return *failure;
218+ }
219+ auto raw_msg = cetl::get<RawReceive::Success>(std::move (raw_result));
220+
221+ // No lint b/c of integration with Nunavut.
222+ // NOLINTNEXTLINE(*-pro-type-reinterpret-cast)
223+ const auto * const raw_payload = reinterpret_cast <const std::uint8_t *>(raw_msg.payload .data .get ());
224+ Message message{&memory};
225+ const auto deser_result = deserialize (message, {raw_payload, raw_msg.payload .size });
226+ if (!deser_result)
227+ {
228+ // Invalid message payload.
229+ return Error{Error::Code::InvalidArgument};
230+ }
231+
232+ return Receive::Success<Message>{
233+ std::move (message),
234+ raw_msg.priority ,
235+ raw_msg.publisher_node_id ,
236+ };
237+ });
238+ }
71239
72240protected:
73- RawSubscriber () = default ;
241+ Subscriber () = default ;
74242
75- }; // RawSubscriber
243+ }; // Subscriber
76244
77245} // namespace sdk
78246} // namespace ocvsmd
0 commit comments