Skip to content

Commit e33cdb7

Browse files
authored
Fix for issue #412 Only one single RPC client is possible per port (#415)
1. Response Rx Session now doesn't own lizard's rpc port subscription - instead now session references the shared subscription (with reference counting). 2. Transport delegates now manage subscriptions. 3. CAN Response Rx Sessions now stored in cavl tree (for UDP it was already like this) - matching now by both port and node ids (previously it was by the port only). 4. CAN filters now also are made by the delegate (b/c it knows about subscriptions (see bullet # 2). 5. Extended unit tests to cover multiple response Rx sessions (both CAN and UDP). Also covered unsolicited responses. Github's Hide whitespaces is recommended.
1 parent f66b8a6 commit e33cdb7

27 files changed

+2083
-1172
lines changed

include/libcyphal/transport/can/can_transport_impl.hpp

Lines changed: 98 additions & 128 deletions
Large diffs are not rendered by default.

include/libcyphal/transport/can/delegate.hpp

Lines changed: 369 additions & 100 deletions
Large diffs are not rendered by default.

include/libcyphal/transport/can/msg_rx_session.hpp

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,17 @@ class MessageRxSession final : private IRxSessionDelegate, public IMessageRxSess
5050
};
5151

5252
public:
53-
CETL_NODISCARD static Expected<UniquePtr<IMessageRxSession>, AnyFailure> make(TransportDelegate& delegate,
54-
const MessageRxParams& params)
53+
CETL_NODISCARD static Expected<UniquePtr<IMessageRxSession>, AnyFailure> make( //
54+
cetl::pmr::memory_resource& memory,
55+
TransportDelegate& delegate,
56+
const MessageRxParams& params)
5557
{
5658
if (params.subject_id > CANARD_SUBJECT_ID_MAX)
5759
{
5860
return ArgumentError{};
5961
}
6062

61-
auto session = libcyphal::detail::makeUniquePtr<Spec>(delegate.memory(), Spec{}, delegate, params);
63+
auto session = libcyphal::detail::makeUniquePtr<Spec>(memory, Spec{}, delegate, params);
6264
if (session == nullptr)
6365
{
6466
return MemoryError{};
@@ -72,20 +74,12 @@ class MessageRxSession final : private IRxSessionDelegate, public IMessageRxSess
7274
, params_{params}
7375
, subscription_{}
7476
{
75-
const std::int8_t result = ::canardRxSubscribe(&delegate.canardInstance(),
76-
CanardTransferKindMessage,
77-
params_.subject_id,
78-
params_.extent_bytes,
79-
CANARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC,
80-
&subscription_);
81-
(void) result;
82-
CETL_DEBUG_ASSERT(result >= 0, "There is no way currently to get an error here.");
83-
CETL_DEBUG_ASSERT(result > 0, "New subscription supposed to be made.");
77+
delegate.listenForRxSubscription(subscription_, params);
8478

8579
// No Sonar `cpp:S5356` b/c we integrate here with C libcanard API.
8680
subscription_.user_reference = static_cast<IRxSessionDelegate*>(this); // NOSONAR cpp:S5356
8781

88-
delegate_.onSessionEvent(TransportDelegate::SessionEvent::MsgRxLifetime{true /* is_added */});
82+
delegate_.onSessionEvent(TransportDelegate::SessionEvent::MsgCreated{});
8983
}
9084

9185
MessageRxSession(const MessageRxSession&) = delete;
@@ -95,13 +89,8 @@ class MessageRxSession final : private IRxSessionDelegate, public IMessageRxSess
9589

9690
~MessageRxSession()
9791
{
98-
const std::int8_t result =
99-
::canardRxUnsubscribe(&delegate_.canardInstance(), CanardTransferKindMessage, params_.subject_id);
100-
(void) result;
101-
CETL_DEBUG_ASSERT(result >= 0, "There is no way currently to get an error here.");
102-
CETL_DEBUG_ASSERT(result > 0, "Subscription supposed to be made at constructor.");
103-
104-
delegate_.onSessionEvent(TransportDelegate::SessionEvent::MsgRxLifetime{false /* is_added */});
92+
delegate_.cancelRxSubscriptionFor(subscription_, CanardTransferKindMessage);
93+
delegate_.onSessionEvent(TransportDelegate::SessionEvent::MsgDestroyed{});
10594
}
10695

10796
private:
@@ -141,26 +130,15 @@ class MessageRxSession final : private IRxSessionDelegate, public IMessageRxSess
141130

142131
// MARK: IRxSessionDelegate
143132

144-
void acceptRxTransfer(const CanardRxTransfer& transfer) override
133+
void acceptRxTransfer(CanardMemory&& lizard_memory,
134+
const TransferRxMetadata& rx_metadata,
135+
const NodeId source_node_id) override
145136
{
146-
const auto priority = static_cast<Priority>(transfer.metadata.priority);
147-
const auto transfer_id = static_cast<TransferId>(transfer.metadata.transfer_id);
148-
const auto timestamp = TimePoint{std::chrono::microseconds{transfer.timestamp_usec}};
149-
150137
const cetl::optional<NodeId> publisher_node_id =
151-
transfer.metadata.remote_node_id > CANARD_NODE_ID_MAX
152-
? cetl::nullopt
153-
: cetl::make_optional<NodeId>(transfer.metadata.remote_node_id);
154-
155-
// No Sonar `cpp:S5356` and `cpp:S5357` b/c we need to pass raw data from C libcanard api.
156-
auto* const buffer = static_cast<cetl::byte*>(transfer.payload.data); // NOSONAR cpp:S5356 cpp:S5357
157-
TransportDelegate::CanardMemory canard_memory{delegate_,
158-
transfer.payload.allocated_size,
159-
buffer,
160-
transfer.payload.size};
161-
162-
const MessageRxMetadata meta{{{transfer_id, priority}, timestamp}, publisher_node_id};
163-
MessageRxTransfer msg_rx_transfer{meta, ScatteredBuffer{std::move(canard_memory)}};
138+
source_node_id > CANARD_NODE_ID_MAX ? cetl::nullopt : cetl::make_optional(source_node_id);
139+
140+
const MessageRxMetadata meta{rx_metadata, publisher_node_id};
141+
MessageRxTransfer msg_rx_transfer{meta, ScatteredBuffer{std::move(lizard_memory)}};
164142
if (on_receive_cb_fn_)
165143
{
166144
on_receive_cb_fn_(OnReceiveCallback::Arg{msg_rx_transfer});

include/libcyphal/transport/can/msg_tx_session.hpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,17 @@ class MessageTxSession final : public IMessageTxSession
4242
};
4343

4444
public:
45-
CETL_NODISCARD static Expected<UniquePtr<IMessageTxSession>, AnyFailure> make(TransportDelegate& delegate,
46-
const MessageTxParams& params)
45+
CETL_NODISCARD static Expected<UniquePtr<IMessageTxSession>, AnyFailure> make( //
46+
cetl::pmr::memory_resource& memory,
47+
TransportDelegate& delegate,
48+
const MessageTxParams& params)
4749
{
4850
if (params.subject_id > CANARD_SUBJECT_ID_MAX)
4951
{
5052
return ArgumentError{};
5153
}
5254

53-
auto session = libcyphal::detail::makeUniquePtr<Spec>(delegate.memory(), Spec{}, delegate, params);
55+
auto session = libcyphal::detail::makeUniquePtr<Spec>(memory, Spec{}, delegate, params);
5456
if (session == nullptr)
5557
{
5658
return MemoryError{};
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/// @copyright
2+
/// Copyright (C) OpenCyphal Development Team <opencyphal.org>
3+
/// Copyright Amazon.com Inc. or its affiliates.
4+
/// SPDX-License-Identifier: MIT
5+
6+
#ifndef LIBCYPHAL_TRANSPORT_CAN_RX_SESSION_TREE_NODE_HPP_INCLUDED
7+
#define LIBCYPHAL_TRANSPORT_CAN_RX_SESSION_TREE_NODE_HPP_INCLUDED
8+
9+
#include "libcyphal/transport/session_tree.hpp"
10+
11+
namespace libcyphal
12+
{
13+
namespace transport
14+
{
15+
namespace can
16+
{
17+
18+
/// Internal implementation details of the UDP transport.
19+
/// Not supposed to be used directly by the users of the library.
20+
///
21+
namespace detail
22+
{
23+
24+
class IRxSessionDelegate;
25+
26+
/// Umbrella type for various RX session tree nodes in use at the CAN transport.
27+
///
28+
/// Currently, it contains only one `Response` subtype,
29+
/// but still kept nested to match the UDP transport approach (where there are several subtypes).
30+
///
31+
struct RxSessionTreeNode
32+
{
33+
/// @brief Represents a service response RX session node.
34+
///
35+
using Response = transport::detail::ResponseRxSessionNode<IRxSessionDelegate>;
36+
37+
}; // RxSessionTreeNode
38+
39+
} // namespace detail
40+
} // namespace can
41+
} // namespace transport
42+
} // namespace libcyphal
43+
44+
#endif // LIBCYPHAL_TRANSPORT_CAN_RX_SESSION_TREE_NODE_HPP_INCLUDED

0 commit comments

Comments
 (0)