Skip to content

Commit cf5257c

Browse files
authored
Merge pull request #130 from jefferyyjhsu/add-transient-local-durability-to-intra-process-pub-sub-irobot/humble
Add transient local durability to intra process pub sub irobot/humble
2 parents b99681f + 3169c07 commit cf5257c

20 files changed

+849
-78
lines changed

rclcpp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ set(${PROJECT_NAME}_SRCS
4646
src/rclcpp/context.cpp
4747
src/rclcpp/contexts/default_context.cpp
4848
src/rclcpp/detail/add_guard_condition_to_rcl_wait_set.cpp
49+
src/rclcpp/detail/resolve_intra_process_buffer_type.cpp
4950
src/rclcpp/detail/resolve_parameter_overrides.cpp
5051
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
5152
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp

rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ resolve_intra_process_buffer_type(
4747
return resolved_buffer_type;
4848
}
4949

50+
RCLCPP_PUBLIC
51+
rclcpp::IntraProcessBufferType
52+
resolve_intra_process_buffer_type(
53+
const rclcpp::IntraProcessBufferType buffer_type);
54+
5055
} // namespace detail
5156

5257
} // namespace rclcpp

rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_
1616
#define RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_
1717

18+
#include <vector>
19+
1820
namespace rclcpp
1921
{
2022
namespace experimental
@@ -31,6 +33,8 @@ class BufferImplementationBase
3133
virtual BufferT dequeue() = 0;
3234
virtual void enqueue(BufferT request) = 0;
3335

36+
virtual std::vector<BufferT> get_all_data() = 0;
37+
3438
virtual void clear() = 0;
3539
virtual bool has_data() const = 0;
3640
};

rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <stdexcept>
2020
#include <type_traits>
2121
#include <utility>
22+
#include <vector>
2223

2324
#include "rclcpp/allocator/allocator_common.hpp"
2425
#include "rclcpp/allocator/allocator_deleter.hpp"
@@ -64,6 +65,9 @@ class IntraProcessBuffer : public IntraProcessBufferBase
6465

6566
virtual MessageSharedPtr consume_shared() = 0;
6667
virtual MessageUniquePtr consume_unique() = 0;
68+
69+
virtual std::vector<MessageSharedPtr> get_all_data_shared() = 0;
70+
virtual std::vector<MessageUniquePtr> get_all_data_unique() = 0;
6771
};
6872

6973
template<
@@ -123,6 +127,16 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
123127
return consume_unique_impl<BufferT>();
124128
}
125129

130+
std::vector<MessageSharedPtr> get_all_data_shared() override
131+
{
132+
return get_all_data_shared_impl();
133+
}
134+
135+
std::vector<MessageUniquePtr> get_all_data_unique() override
136+
{
137+
return get_all_data_unique_impl();
138+
}
139+
126140
bool has_data() const override
127141
{
128142
return buffer_->has_data();
@@ -232,6 +246,71 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
232246
{
233247
return buffer_->dequeue();
234248
}
249+
250+
// MessageSharedPtr to MessageSharedPtr
251+
template<typename T = BufferT>
252+
typename std::enable_if<
253+
std::is_same<T, MessageSharedPtr>::value,
254+
std::vector<MessageSharedPtr>
255+
>::type
256+
get_all_data_shared_impl()
257+
{
258+
return buffer_->get_all_data();
259+
}
260+
261+
// MessageUniquePtr to MessageSharedPtr
262+
template<typename T = BufferT>
263+
typename std::enable_if<
264+
std::is_same<T, MessageUniquePtr>::value,
265+
std::vector<MessageSharedPtr>
266+
>::type
267+
get_all_data_shared_impl()
268+
{
269+
std::vector<MessageSharedPtr> result;
270+
auto uni_ptr_vec = buffer_->get_all_data();
271+
result.reserve(uni_ptr_vec.size());
272+
for (MessageUniquePtr & uni_ptr : uni_ptr_vec) {
273+
result.emplace_back(std::move(uni_ptr));
274+
}
275+
return result;
276+
}
277+
278+
// MessageSharedPtr to MessageUniquePtr
279+
template<typename T = BufferT>
280+
typename std::enable_if<
281+
std::is_same<T, MessageSharedPtr>::value,
282+
std::vector<MessageUniquePtr>
283+
>::type
284+
get_all_data_unique_impl()
285+
{
286+
std::vector<MessageUniquePtr> result;
287+
auto shared_ptr_vec = buffer_->get_all_data();
288+
result.reserve(shared_ptr_vec.size());
289+
for (MessageSharedPtr shared_msg : shared_ptr_vec) {
290+
MessageUniquePtr unique_msg;
291+
MessageDeleter * deleter = std::get_deleter<MessageDeleter, const MessageT>(shared_msg);
292+
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
293+
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *shared_msg);
294+
if (deleter) {
295+
unique_msg = MessageUniquePtr(ptr, *deleter);
296+
} else {
297+
unique_msg = MessageUniquePtr(ptr);
298+
}
299+
result.push_back(std::move(unique_msg));
300+
}
301+
return result;
302+
}
303+
304+
// MessageUniquePtr to MessageUniquePtr
305+
template<typename T = BufferT>
306+
typename std::enable_if<
307+
std::is_same<T, MessageUniquePtr>::value,
308+
std::vector<MessageUniquePtr>
309+
>::type
310+
get_all_data_unique_impl()
311+
{
312+
return buffer_->get_all_data();
313+
}
235314
};
236315

237316
template<typename BufferT>

rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_
1616
#define RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_
1717

18+
#include <memory>
1819
#include <mutex>
1920
#include <stdexcept>
2021
#include <utility>
@@ -97,6 +98,17 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
9798
return request;
9899
}
99100

101+
/// Get all the elements from the ring buffer
102+
/**
103+
* This member function is thread-safe.
104+
*
105+
* \return a vector containing all the elements from the ring buffer
106+
*/
107+
std::vector<BufferT> get_all_data() override
108+
{
109+
return get_all_data_impl();
110+
}
111+
100112
/// Get the next index value for the ring buffer
101113
/**
102114
* This member function is thread-safe.
@@ -173,6 +185,71 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
173185
return size_ == capacity_;
174186
}
175187

188+
/// Traits for checking if a type is std::unique_ptr
189+
template<typename ...>
190+
struct is_std_unique_ptr final : std::false_type {};
191+
template<class T, typename ... Args>
192+
struct is_std_unique_ptr<std::unique_ptr<T, Args...>> final : std::true_type
193+
{
194+
typedef T Ptr_type;
195+
};
196+
197+
/// Get all the elements from the ring buffer
198+
/**
199+
* This member function is thread-safe.
200+
* Two versions for the implementation of the function.
201+
* One for buffer containing unique_ptr and the other for other types
202+
*
203+
* \return a vector containing all the elements from the ring buffer
204+
*/
205+
template<typename T = BufferT, std::enable_if_t<is_std_unique_ptr<T>::value &&
206+
std::is_copy_constructible<
207+
typename is_std_unique_ptr<T>::Ptr_type
208+
>::value,
209+
void> * = nullptr>
210+
std::vector<BufferT> get_all_data_impl()
211+
{
212+
std::lock_guard<std::mutex> lock(mutex_);
213+
std::vector<BufferT> result_vtr;
214+
result_vtr.reserve(size_);
215+
for (size_t id = 0; id < size_; ++id) {
216+
result_vtr.emplace_back(
217+
new typename is_std_unique_ptr<T>::Ptr_type(
218+
*(ring_buffer_[(read_index_ + id) % capacity_])));
219+
}
220+
return result_vtr;
221+
}
222+
223+
template<typename T = BufferT, std::enable_if_t<
224+
std::is_copy_constructible<T>::value, void> * = nullptr>
225+
std::vector<BufferT> get_all_data_impl()
226+
{
227+
std::lock_guard<std::mutex> lock(mutex_);
228+
std::vector<BufferT> result_vtr;
229+
result_vtr.reserve(size_);
230+
for (size_t id = 0; id < size_; ++id) {
231+
result_vtr.emplace_back(ring_buffer_[(read_index_ + id) % capacity_]);
232+
}
233+
return result_vtr;
234+
}
235+
236+
template<typename T = BufferT, std::enable_if_t<!is_std_unique_ptr<T>::value &&
237+
!std::is_copy_constructible<T>::value, void> * = nullptr>
238+
std::vector<BufferT> get_all_data_impl()
239+
{
240+
throw std::logic_error("Underlined type results in invalid get_all_data_impl()");
241+
return {};
242+
}
243+
244+
template<typename T = BufferT, std::enable_if_t<is_std_unique_ptr<T>::value &&
245+
!std::is_copy_constructible<typename is_std_unique_ptr<T>::Ptr_type>::value,
246+
void> * = nullptr>
247+
std::vector<BufferT> get_all_data_impl()
248+
{
249+
throw std::logic_error("Underlined type in unique_ptr results in invalid get_all_data_impl()");
250+
return {};
251+
}
252+
176253
size_t capacity_;
177254

178255
std::vector<BufferT> ring_buffer_;

0 commit comments

Comments
 (0)