Skip to content

Commit 2443c94

Browse files
committed
Introduced ScatteredBuffer::IFragmentsObserver interface
1 parent c26d86e commit 2443c94

File tree

7 files changed

+112
-40
lines changed

7 files changed

+112
-40
lines changed

include/libcyphal/transport/can/delegate.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,14 @@ class CanardMemory final : public ScatteredBuffer::IStorage
101101
return bytes_to_copy;
102102
}
103103

104+
void observeFragments(ScatteredBuffer::IFragmentsObserver& observer) const override
105+
{
106+
if ((buffer_ != nullptr) && (payload_size_ > 0))
107+
{
108+
observer.onNext({buffer_, payload_size_});
109+
}
110+
}
111+
104112
private:
105113
// MARK: Data members:
106114

include/libcyphal/transport/msg_sessions.hpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#define LIBCYPHAL_TRANSPORT_MSG_SESSIONS_HPP_INCLUDED
88

99
#include "errors.hpp"
10+
#include "scattered_buffer.hpp"
1011
#include "session.hpp"
1112
#include "types.hpp"
1213

@@ -34,6 +35,18 @@ struct MessageTxParams final
3435
PortId subject_id{};
3536
};
3637

38+
struct MessageRxMetadata final
39+
{
40+
TransferRxMetadata rx_meta{};
41+
cetl::optional<NodeId> publisher_node_id;
42+
};
43+
44+
struct MessageRxTransfer final
45+
{
46+
MessageRxMetadata metadata{};
47+
ScatteredBuffer payload;
48+
};
49+
3750
/// @brief Defines an abstract interface of a transport layer receive session for message subscription.
3851
///
3952
/// Use transport's `makeMessageRxSession` factory function to create an instance of this interface.

include/libcyphal/transport/scattered_buffer.hpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#define LIBCYPHAL_TRANSPORT_SCATTERED_BUFFER_HPP_INCLUDED
88

99
#include "libcyphal/config.hpp"
10+
#include "types.hpp"
1011

1112
#include <cetl/pf17/cetlpf.hpp>
1213
#include <cetl/rtti.hpp>
@@ -33,6 +34,26 @@ class ScatteredBuffer final
3334
///
3435
static constexpr std::size_t StorageVariantFootprint = config::Transport::ScatteredBuffer_StorageVariantFootprint();
3536

37+
/// @brief Defines interface for observing internal fragments of the scattered buffer.
38+
///
39+
class IFragmentsObserver
40+
{
41+
public:
42+
IFragmentsObserver(const IFragmentsObserver&) = delete;
43+
IFragmentsObserver& operator=(const IFragmentsObserver&) = delete;
44+
IFragmentsObserver& operator=(IFragmentsObserver&&) noexcept = delete;
45+
IFragmentsObserver(IFragmentsObserver&&) noexcept = delete;
46+
47+
/// @brief Notifies the observer about the next fragment of the scattered buffer.
48+
///
49+
virtual void onNext(const PayloadFragment fragment) = 0;
50+
51+
protected:
52+
IFragmentsObserver() = default;
53+
~IFragmentsObserver() = default;
54+
55+
}; // IFragmentsObserver
56+
3657
/// @brief Defines storage interface for the scattered buffer.
3758
///
3859
/// @see ScatteredBuffer::ScatteredBuffer(AnyStorage&& any_storage)
@@ -72,6 +93,12 @@ class ScatteredBuffer final
7293
cetl::byte* const destination,
7394
const std::size_t length_bytes) const = 0;
7495

96+
/// @brief Reports the internal fragments of the storage to the specified observer.
97+
///
98+
/// @param observer The observer will be called (by `onNext` method) for each fragment of the storage.
99+
///
100+
virtual void observeFragments(IFragmentsObserver& observer) const = 0;
101+
75102
// MARK: RTTI
76103

77104
static constexpr cetl::type_id _get_type_id_() noexcept
@@ -201,6 +228,18 @@ class ScatteredBuffer final
201228
return storage_->copy(offset_bytes, static_cast<cetl::byte*>(destination), length_bytes);
202229
}
203230

231+
/// @brief Reports the internal fragments of the buffer to the specified observer.
232+
///
233+
/// @param observer The observer will be called (by `onNext` method) for each fragment of the buffer.
234+
///
235+
void observeFragments(IFragmentsObserver& observer) const
236+
{
237+
if (const auto* const storage = storage_)
238+
{
239+
storage->observeFragments(observer);
240+
}
241+
}
242+
204243
private:
205244
cetl::unbounded_variant<StorageVariantFootprint, false, true> storage_variant_;
206245
const IStorage* storage_;

include/libcyphal/transport/svc_sessions.hpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#define LIBCYPHAL_TRANSPORT_SVC_SESSION_HPP_INCLUDED
88

99
#include "errors.hpp"
10+
#include "scattered_buffer.hpp"
1011
#include "session.hpp"
1112
#include "types.hpp"
1213

@@ -46,6 +47,24 @@ struct ResponseTxParams final
4647
PortId service_id{};
4748
};
4849

50+
struct ServiceTxMetadata final
51+
{
52+
TransferTxMetadata tx_meta{};
53+
NodeId remote_node_id{};
54+
};
55+
56+
struct ServiceRxMetadata final
57+
{
58+
TransferRxMetadata rx_meta{};
59+
NodeId remote_node_id{};
60+
};
61+
62+
struct ServiceRxTransfer final
63+
{
64+
ServiceRxMetadata metadata{};
65+
ScatteredBuffer payload;
66+
};
67+
4968
/// @brief Defines an abstract interface of a transport layer receive session for service.
5069
///
5170
/// @see IRxSession, ISession

include/libcyphal/transport/transport.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ namespace libcyphal
2121
namespace transport
2222
{
2323

24+
struct ProtocolParams final
25+
{
26+
TransferId transfer_id_modulo{};
27+
std::size_t mtu_bytes{};
28+
NodeId max_nodes{};
29+
};
30+
2431
/// @brief Interface for a transport layer.
2532
///
2633
class ITransport

include/libcyphal/transport/types.hpp

Lines changed: 9 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
#ifndef LIBCYPHAL_TRANSPORT_TYPES_HPP_INCLUDED
77
#define LIBCYPHAL_TRANSPORT_TYPES_HPP_INCLUDED
88

9-
#include "scattered_buffer.hpp"
10-
119
#include "libcyphal/types.hpp"
1210

1311
#include <cetl/pf17/cetlpf.hpp>
@@ -48,59 +46,32 @@ enum class Priority : std::uint8_t
4846
Optional = 7,
4947
};
5048

51-
struct ProtocolParams final
52-
{
53-
TransferId transfer_id_modulo{};
54-
std::size_t mtu_bytes{};
55-
NodeId max_nodes{};
56-
};
49+
/// @brief Defines immutable fragment of raw data (as span of const bytes).
50+
///
51+
using PayloadFragment = cetl::span<const cetl::byte>;
52+
53+
/// @brief Defines a span of immutable raw data fragments.
54+
///
55+
using PayloadFragments = cetl::span<const PayloadFragment>;
5756

5857
struct TransferMetadata final
5958
{
6059
TransferId transfer_id{};
6160
Priority priority{};
6261
};
62+
6363
struct TransferTxMetadata final
6464
{
6565
TransferMetadata base{};
6666
TimePoint deadline;
6767
};
68+
6869
struct TransferRxMetadata final
6970
{
7071
TransferMetadata base{};
7172
TimePoint timestamp;
7273
};
7374

74-
/// @brief Defines a span of immutable fragments of payload.
75-
using PayloadFragments = cetl::span<const cetl::span<const cetl::byte>>;
76-
77-
struct MessageRxMetadata final
78-
{
79-
TransferRxMetadata rx_meta{};
80-
cetl::optional<NodeId> publisher_node_id;
81-
};
82-
struct MessageRxTransfer final
83-
{
84-
MessageRxMetadata metadata{};
85-
ScatteredBuffer payload;
86-
};
87-
88-
struct ServiceTxMetadata final
89-
{
90-
TransferTxMetadata tx_meta{};
91-
NodeId remote_node_id{};
92-
};
93-
struct ServiceRxMetadata final
94-
{
95-
TransferRxMetadata rx_meta{};
96-
NodeId remote_node_id{};
97-
};
98-
struct ServiceRxTransfer final
99-
{
100-
ServiceRxMetadata metadata{};
101-
ScatteredBuffer payload;
102-
};
103-
10475
} // namespace transport
10576
} // namespace libcyphal
10677

include/libcyphal/transport/udp/delegate.hpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ class UdpardMemory final : public ScatteredBuffer::IStorage
152152

153153
// Find first fragment to start from (according to source `offset_bytes`).
154154
//
155-
std::size_t src_offset = 0;
156-
const struct UdpardFragment* frag = &payload_;
155+
std::size_t src_offset = 0;
156+
const UdpardFragment* frag = &payload_;
157157
while ((nullptr != frag) && (offset_bytes >= (src_offset + frag->view.size)))
158158
{
159159
src_offset += frag->view.size;
@@ -190,6 +190,21 @@ class UdpardMemory final : public ScatteredBuffer::IStorage
190190
return total_bytes_copied;
191191
}
192192

193+
void observeFragments(ScatteredBuffer::IFragmentsObserver& observer) const override
194+
{
195+
const UdpardFragment* fragment = &payload_;
196+
while (nullptr != fragment)
197+
{
198+
const auto& frag_view = fragment->view;
199+
if ((nullptr != frag_view.data) && (frag_view.size > 0))
200+
{
201+
observer.onNext({static_cast<const cetl::byte*>(frag_view.data), frag_view.size});
202+
}
203+
204+
fragment = fragment->next;
205+
}
206+
}
207+
193208
private:
194209
// MARK: Data members:
195210

0 commit comments

Comments
 (0)