Skip to content

Commit de841d9

Browse files
authored
Support users holding onto shared pointers in the message memory pool (ros2#2336)
* Support users holding onto shared pointers in the message memory pool Before this commit, the MessageMemoryPool would actually reuse messages in the pool, even if the user had taken additional shared_ptr copies. This commit fixes things so that we properly handle that situation. In particular, we allocate memory during class initialization, and delete it during destruction. We then run the constructor when we hand the pointer out, and the destructor (only) when we return it to the pool. This keeps things consistent. We also add in locks, since in a multi-threaded scenario we need to protect against multiple threads accessing the pool at the same time. With this in place, things work as expected when users hold shared_ptr copies. We also add in a test for this situation. One note about performance: this update preserves the "no-allocations-at-runtime" aspect of the MessagePool. However, there are some tradeoffs with CPU time here, particularly with very large message pools. This could probably be optimized further to do less work when trying to add items back to the free_list, but I view that as a further enhancement. Signed-off-by: Chris Lalancette <[email protected]>
1 parent 9c098e5 commit de841d9

File tree

2 files changed

+105
-35
lines changed

2 files changed

+105
-35
lines changed

rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp

Lines changed: 85 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,17 @@
1515
#ifndef RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_
1616
#define RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_
1717

18+
#include <array>
19+
#include <cstring>
1820
#include <memory>
21+
#include <mutex>
22+
#include <stdexcept>
23+
#include <type_traits>
1924

2025
#include "rosidl_runtime_cpp/traits.hpp"
2126

27+
#include "rclcpp/logger.hpp"
28+
#include "rclcpp/logging.hpp"
2229
#include "rclcpp/macros.hpp"
2330
#include "rclcpp/message_memory_strategy.hpp"
2431
#include "rclcpp/visibility_control.hpp"
@@ -50,13 +57,24 @@ class MessagePoolMemoryStrategy
5057
public:
5158
RCLCPP_SMART_PTR_DEFINITIONS(MessagePoolMemoryStrategy)
5259

53-
/// Default constructor
5460
MessagePoolMemoryStrategy()
55-
: next_array_index_(0)
5661
{
62+
pool_mutex_ = std::make_shared<std::mutex>();
63+
64+
pool_ = std::shared_ptr<std::array<MessageT *, Size>>(
65+
new std::array<MessageT *, Size>,
66+
[](std::array<MessageT *, Size> * arr) {
67+
for (size_t i = 0; i < Size; ++i) {
68+
free((*arr)[i]);
69+
}
70+
delete arr;
71+
});
72+
73+
free_list_ = std::make_shared<CircularArray<Size>>();
74+
5775
for (size_t i = 0; i < Size; ++i) {
58-
pool_[i].msg_ptr_ = std::make_shared<MessageT>();
59-
pool_[i].used = false;
76+
(*pool_)[i] = static_cast<MessageT *>(malloc(sizeof(MessageT)));
77+
free_list_->push_back(i);
6078
}
6179
}
6280

@@ -68,43 +86,85 @@ class MessagePoolMemoryStrategy
6886
*/
6987
std::shared_ptr<MessageT> borrow_message()
7088
{
71-
size_t current_index = next_array_index_;
72-
next_array_index_ = (next_array_index_ + 1) % Size;
73-
if (pool_[current_index].used) {
74-
throw std::runtime_error("Tried to access message that was still in use! Abort.");
89+
std::lock_guard<std::mutex> lock(*pool_mutex_);
90+
if (free_list_->size() == 0) {
91+
throw std::runtime_error("No more free slots in the pool");
7592
}
76-
pool_[current_index].msg_ptr_->~MessageT();
77-
new (pool_[current_index].msg_ptr_.get())MessageT;
7893

79-
pool_[current_index].used = true;
80-
return pool_[current_index].msg_ptr_;
94+
size_t current_index = free_list_->pop_front();
95+
96+
return std::shared_ptr<MessageT>(
97+
new((*pool_)[current_index]) MessageT(),
98+
[pool = this->pool_, pool_mutex = this->pool_mutex_,
99+
free_list = this->free_list_](MessageT * p) {
100+
std::lock_guard<std::mutex> lock(*pool_mutex);
101+
for (size_t i = 0; i < Size; ++i) {
102+
if ((*pool)[i] == p) {
103+
p->~MessageT();
104+
free_list->push_back(i);
105+
break;
106+
}
107+
}
108+
});
81109
}
82110

83111
/// Return a message to the message pool.
84112
/**
85-
* Manage metadata in the message pool ring buffer to release the message.
113+
* This does nothing since the message isn't returned to the pool until the user has dropped
114+
* all references.
86115
* \param[in] msg Shared pointer to the message to return.
87116
*/
88117
void return_message(std::shared_ptr<MessageT> & msg)
89118
{
90-
for (size_t i = 0; i < Size; ++i) {
91-
if (pool_[i].msg_ptr_ == msg) {
92-
pool_[i].used = false;
93-
return;
94-
}
95-
}
96-
throw std::runtime_error("Unrecognized message ptr in return_message.");
119+
(void)msg;
97120
}
98121

99122
protected:
100-
struct PoolMember
123+
template<size_t N>
124+
class CircularArray
101125
{
102-
std::shared_ptr<MessageT> msg_ptr_;
103-
bool used;
126+
public:
127+
void push_back(const size_t v)
128+
{
129+
if (size_ + 1 > N) {
130+
throw std::runtime_error("Tried to push too many items into the array");
131+
}
132+
array_[(front_ + size_) % N] = v;
133+
++size_;
134+
}
135+
136+
size_t pop_front()
137+
{
138+
if (size_ < 1) {
139+
throw std::runtime_error("Tried to pop item from empty array");
140+
}
141+
142+
size_t val = array_[front_];
143+
144+
front_ = (front_ + 1) % N;
145+
--size_;
146+
147+
return val;
148+
}
149+
150+
size_t size() const
151+
{
152+
return size_;
153+
}
154+
155+
private:
156+
size_t front_ = 0;
157+
size_t size_ = 0;
158+
std::array<size_t, N> array_;
104159
};
105160

106-
std::array<PoolMember, Size> pool_;
107-
size_t next_array_index_;
161+
// It's very important that these are shared_ptrs, since users of this class might hold a
162+
// reference to a pool item longer than the lifetime of the class. In that scenario, the
163+
// shared_ptr ensures that the lifetime of these variables outlives this class, and hence ensures
164+
// the custom destructor for each pool item can successfully run.
165+
std::shared_ptr<std::mutex> pool_mutex_;
166+
std::shared_ptr<std::array<MessageT *, Size>> pool_;
167+
std::shared_ptr<CircularArray<Size>> free_list_;
108168
};
109169

110170
} // namespace message_pool_memory_strategy

rclcpp/test/rclcpp/strategies/test_message_pool_memory_strategy.cpp

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,28 @@ TEST_F(TestMessagePoolMemoryStrategy, borrow_too_many) {
5656
// Size is 1, borrowing second time should fail
5757
RCLCPP_EXPECT_THROW_EQ(
5858
message_memory_strategy_->borrow_message(),
59-
std::runtime_error("Tried to access message that was still in use! Abort."));
59+
std::runtime_error("No more free slots in the pool"));
6060
EXPECT_NO_THROW(message_memory_strategy_->return_message(message));
6161
}
6262

63-
TEST_F(TestMessagePoolMemoryStrategy, return_unrecognized) {
64-
auto message = message_memory_strategy_->borrow_message();
65-
ASSERT_NE(nullptr, message);
63+
TEST_F(TestMessagePoolMemoryStrategy, borrow_hold_reference) {
64+
{
65+
auto message = message_memory_strategy_->borrow_message();
66+
ASSERT_NE(nullptr, message);
6667

67-
auto unrecognized = std::make_shared<test_msgs::msg::Empty>();
68-
// Unrecognized does not belong to pool
69-
RCLCPP_EXPECT_THROW_EQ(
70-
message_memory_strategy_->return_message(unrecognized),
71-
std::runtime_error("Unrecognized message ptr in return_message."));
72-
EXPECT_NO_THROW(message_memory_strategy_->return_message(message));
68+
// Return it.
69+
EXPECT_NO_THROW(message_memory_strategy_->return_message(message));
70+
71+
// But we are still holding the reference, so we expect that there is still no room in the pool.
72+
RCLCPP_EXPECT_THROW_EQ(
73+
message_memory_strategy_->borrow_message(),
74+
std::runtime_error("No more free slots in the pool"));
75+
}
76+
77+
// Now that we've dropped the reference (left the scope), we expect to be able to borrow again.
78+
79+
auto message2 = message_memory_strategy_->borrow_message();
80+
ASSERT_NE(nullptr, message2);
81+
82+
EXPECT_NO_THROW(message_memory_strategy_->return_message(message2));
7383
}

0 commit comments

Comments
 (0)