Skip to content

Commit 8b6a033

Browse files
jefferyyjhsuapojomovsky
authored andcommitted
Add transient local durability support to publisher and subscriptions when using intra-process communication (ros2#2303)
* Add intra process transient local durability support to publisher and subscription Signed-off-by: Jeffery Hsu <[email protected]> * Remove durability_is_transient_local_ from publisher_base Signed-off-by: Jeffery Hsu <[email protected]> * Design changes that move most transient local publish functionalities out of intra process manager into intra process manager Signed-off-by: Jeffery Hsu <[email protected]> * Move transient local publish to a separate function Signed-off-by: Jeffery Hsu <[email protected]> * Remove publisher buffer weak ptr from intra process manager when it associated publisher is removed. Signed-off-by: Jeffery Hsu <[email protected]> * Remove incorrectly placed RCLCPP_PUBLIC Signed-off-by: Jeffery Hsu <[email protected]> * Add missing RCLCPP_PUBLIC Signed-off-by: Jeffery Hsu <[email protected]> * Expand RingBufferImplementation beyond shared_ptr and unique_ptr Signed-off-by: Jeffery Hsu <[email protected]> * Comment and format fix Signed-off-by: Jeffery Hsu <[email protected]> --------- Signed-off-by: Jeffery Hsu <[email protected]>
1 parent d4dd4e4 commit 8b6a033

21 files changed

+1062
-356
lines changed

rclcpp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ set(${PROJECT_NAME}_SRCS
4646
src/rclcpp/context.cpp
4747
src/rclcpp/contexts/default_context.cpp
4848
src/rclcpp/detail/add_guard_condition_to_rcl_wait_set.cpp
49+
src/rclcpp/detail/resolve_intra_process_buffer_type.cpp
4950
src/rclcpp/detail/resolve_parameter_overrides.cpp
5051
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
5152
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp

rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ resolve_intra_process_buffer_type(
4747
return resolved_buffer_type;
4848
}
4949

50+
RCLCPP_PUBLIC
51+
rclcpp::IntraProcessBufferType
52+
resolve_intra_process_buffer_type(
53+
const rclcpp::IntraProcessBufferType buffer_type);
54+
5055
} // namespace detail
5156

5257
} // namespace rclcpp

rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_
1616
#define RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_
1717

18+
#include <vector>
19+
1820
namespace rclcpp
1921
{
2022
namespace experimental
@@ -31,6 +33,8 @@ class BufferImplementationBase
3133
virtual BufferT dequeue() = 0;
3234
virtual void enqueue(BufferT request) = 0;
3335

36+
virtual std::vector<BufferT> get_all_data() = 0;
37+
3438
virtual void clear() = 0;
3539
virtual bool has_data() const = 0;
3640
};

rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <stdexcept>
2020
#include <type_traits>
2121
#include <utility>
22+
#include <vector>
2223

2324
#include "rclcpp/allocator/allocator_common.hpp"
2425
#include "rclcpp/allocator/allocator_deleter.hpp"
@@ -65,6 +66,9 @@ class IntraProcessBuffer : public IntraProcessBufferBase
6566

6667
virtual MessageSharedPtr consume_shared() = 0;
6768
virtual MessageUniquePtr consume_unique() = 0;
69+
70+
virtual std::vector<MessageSharedPtr> get_all_data_shared() = 0;
71+
virtual std::vector<MessageUniquePtr> get_all_data_unique() = 0;
6872
};
6973

7074
template<
@@ -128,6 +132,16 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
128132
return consume_unique_impl<BufferT>();
129133
}
130134

135+
std::vector<MessageSharedPtr> get_all_data_shared() override
136+
{
137+
return get_all_data_shared_impl();
138+
}
139+
140+
std::vector<MessageUniquePtr> get_all_data_unique() override
141+
{
142+
return get_all_data_unique_impl();
143+
}
144+
131145
bool has_data() const override
132146
{
133147
return buffer_->has_data();
@@ -237,6 +251,71 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
237251
{
238252
return buffer_->dequeue();
239253
}
254+
255+
// MessageSharedPtr to MessageSharedPtr
256+
template<typename T = BufferT>
257+
typename std::enable_if<
258+
std::is_same<T, MessageSharedPtr>::value,
259+
std::vector<MessageSharedPtr>
260+
>::type
261+
get_all_data_shared_impl()
262+
{
263+
return buffer_->get_all_data();
264+
}
265+
266+
// MessageUniquePtr to MessageSharedPtr
267+
template<typename T = BufferT>
268+
typename std::enable_if<
269+
std::is_same<T, MessageUniquePtr>::value,
270+
std::vector<MessageSharedPtr>
271+
>::type
272+
get_all_data_shared_impl()
273+
{
274+
std::vector<MessageSharedPtr> result;
275+
auto uni_ptr_vec = buffer_->get_all_data();
276+
result.reserve(uni_ptr_vec.size());
277+
for (MessageUniquePtr & uni_ptr : uni_ptr_vec) {
278+
result.emplace_back(std::move(uni_ptr));
279+
}
280+
return result;
281+
}
282+
283+
// MessageSharedPtr to MessageUniquePtr
284+
template<typename T = BufferT>
285+
typename std::enable_if<
286+
std::is_same<T, MessageSharedPtr>::value,
287+
std::vector<MessageUniquePtr>
288+
>::type
289+
get_all_data_unique_impl()
290+
{
291+
std::vector<MessageUniquePtr> result;
292+
auto shared_ptr_vec = buffer_->get_all_data();
293+
result.reserve(shared_ptr_vec.size());
294+
for (MessageSharedPtr shared_msg : shared_ptr_vec) {
295+
MessageUniquePtr unique_msg;
296+
MessageDeleter * deleter = std::get_deleter<MessageDeleter, const MessageT>(shared_msg);
297+
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
298+
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *shared_msg);
299+
if (deleter) {
300+
unique_msg = MessageUniquePtr(ptr, *deleter);
301+
} else {
302+
unique_msg = MessageUniquePtr(ptr);
303+
}
304+
result.push_back(std::move(unique_msg));
305+
}
306+
return result;
307+
}
308+
309+
// MessageUniquePtr to MessageUniquePtr
310+
template<typename T = BufferT>
311+
typename std::enable_if<
312+
std::is_same<T, MessageUniquePtr>::value,
313+
std::vector<MessageUniquePtr>
314+
>::type
315+
get_all_data_unique_impl()
316+
{
317+
return buffer_->get_all_data();
318+
}
240319
};
241320

242321
template<typename BufferT>

rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_
1616
#define RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_
1717

18+
#include <memory>
1819
#include <mutex>
1920
#include <stdexcept>
2021
#include <utility>
@@ -110,6 +111,17 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
110111
return request;
111112
}
112113

114+
/// Get all the elements from the ring buffer
115+
/**
116+
* This member function is thread-safe.
117+
*
118+
* \return a vector containing all the elements from the ring buffer
119+
*/
120+
std::vector<BufferT> get_all_data() override
121+
{
122+
return get_all_data_impl();
123+
}
124+
113125
/// Get the next index value for the ring buffer
114126
/**
115127
* This member function is thread-safe.
@@ -189,6 +201,71 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
189201
return size_ == capacity_;
190202
}
191203

204+
/// Traits for checking if a type is std::unique_ptr
205+
template<typename ...>
206+
struct is_std_unique_ptr final : std::false_type {};
207+
template<class T, typename ... Args>
208+
struct is_std_unique_ptr<std::unique_ptr<T, Args...>> final : std::true_type
209+
{
210+
typedef T Ptr_type;
211+
};
212+
213+
/// Get all the elements from the ring buffer
214+
/**
215+
* This member function is thread-safe.
216+
* Two versions for the implementation of the function.
217+
* One for buffer containing unique_ptr and the other for other types
218+
*
219+
* \return a vector containing all the elements from the ring buffer
220+
*/
221+
template<typename T = BufferT, std::enable_if_t<is_std_unique_ptr<T>::value &&
222+
std::is_copy_constructible<
223+
typename is_std_unique_ptr<T>::Ptr_type
224+
>::value,
225+
void> * = nullptr>
226+
std::vector<BufferT> get_all_data_impl()
227+
{
228+
std::lock_guard<std::mutex> lock(mutex_);
229+
std::vector<BufferT> result_vtr;
230+
result_vtr.reserve(size_);
231+
for (size_t id = 0; id < size_; ++id) {
232+
result_vtr.emplace_back(
233+
new typename is_std_unique_ptr<T>::Ptr_type(
234+
*(ring_buffer_[(read_index_ + id) % capacity_])));
235+
}
236+
return result_vtr;
237+
}
238+
239+
template<typename T = BufferT, std::enable_if_t<
240+
std::is_copy_constructible<T>::value, void> * = nullptr>
241+
std::vector<BufferT> get_all_data_impl()
242+
{
243+
std::lock_guard<std::mutex> lock(mutex_);
244+
std::vector<BufferT> result_vtr;
245+
result_vtr.reserve(size_);
246+
for (size_t id = 0; id < size_; ++id) {
247+
result_vtr.emplace_back(ring_buffer_[(read_index_ + id) % capacity_]);
248+
}
249+
return result_vtr;
250+
}
251+
252+
template<typename T = BufferT, std::enable_if_t<!is_std_unique_ptr<T>::value &&
253+
!std::is_copy_constructible<T>::value, void> * = nullptr>
254+
std::vector<BufferT> get_all_data_impl()
255+
{
256+
throw std::logic_error("Underlined type results in invalid get_all_data_impl()");
257+
return {};
258+
}
259+
260+
template<typename T = BufferT, std::enable_if_t<is_std_unique_ptr<T>::value &&
261+
!std::is_copy_constructible<typename is_std_unique_ptr<T>::Ptr_type>::value,
262+
void> * = nullptr>
263+
std::vector<BufferT> get_all_data_impl()
264+
{
265+
throw std::logic_error("Underlined type in unique_ptr results in invalid get_all_data_impl()");
266+
return {};
267+
}
268+
192269
size_t capacity_;
193270

194271
std::vector<BufferT> ring_buffer_;

0 commit comments

Comments
 (0)