Skip to content

Commit dcc4dfd

Browse files
bjtulynnquning18
authored andcommitted
platform: bugfix for shm
1. refine shm callback logic 2. remove read old message issue 3. fix shm thread exit issue 4. add topic white list feature
1 parent c4ea9ac commit dcc4dfd

File tree

13 files changed

+173
-162
lines changed

13 files changed

+173
-162
lines changed

ros/ros_comm/roscpp/include/ros/shm_manager.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ class ShmManager
8787

8888
struct ItemShm
8989
{
90+
boost::interprocess::managed_shared_memory* segment;
9091
sharedmem_transport::SharedMemorySegment* segment_mgr;
9192
sharedmem_transport::SharedMemoryBlock* descriptors_sub;
9293
uint32_t queue_size;
@@ -109,6 +110,7 @@ class ShmManager
109110
mutable std::mutex mutex_;
110111
bool started_;
111112
boost::interprocess::interprocess_mutex shm_sub_mutex_;
113+
std::map<std::string, bool> shm_skip_first_msg_;
112114
};
113115

114116
}

ros/ros_comm/roscpp/include/ros/subscription.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,6 @@ class ROSCPP_DECL Subscription : public boost::enable_shared_from_this<Subscript
107107
*/
108108
uint32_t handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link);
109109

110-
uint32_t handleMessage(int32_t msg_index, const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header);
111-
112110
const std::string datatype();
113111
const std::string md5sum();
114112

@@ -259,6 +257,19 @@ class ROSCPP_DECL Subscription : public boost::enable_shared_from_this<Subscript
259257
int32_t last_read_index_;
260258

261259
boost::interprocess::interprocess_mutex shm_sub_mutex_;
260+
261+
SubscriptionCallbackHelperPtr helper_;
262+
263+
public:
264+
SubscriptionCallbackHelperPtr& get_helper()
265+
{
266+
return helper_;
267+
}
268+
269+
std::vector<PublisherLinkPtr> get_publisher_links()
270+
{
271+
return publisher_links_;
272+
}
262273
};
263274

264275
}

ros/ros_comm/roscpp/include/ros/subscription_queue.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ class ROSCPP_DECL SubscriptionQueue : public CallbackInterface, public boost::en
6363
private:
6464
struct Item
6565
{
66-
int32_t msg_index;
6766
bool default_transport;
6867
SubscriptionCallbackHelperPtr helper;
6968
MessageDeserializerPtr deserializer;
@@ -80,9 +79,9 @@ class ROSCPP_DECL SubscriptionQueue : public CallbackInterface, public boost::en
8079
SubscriptionQueue(const std::string& topic, int32_t queue_size, bool allow_concurrent_callbacks);
8180
~SubscriptionQueue();
8281

83-
void push(int32_t msg_index, bool default_transport, const SubscriptionCallbackHelperPtr& helper, const MessageDeserializerPtr& deserializer,
84-
bool has_tracked_object, const VoidConstWPtr& tracked_object, bool nonconst_need_copy,
85-
ros::Time receipt_time = ros::Time(), bool* was_full = 0);
82+
void push(bool default_transport, const SubscriptionCallbackHelperPtr& helper, const MessageDeserializerPtr& deserializer,
83+
bool has_tracked_object, const VoidConstWPtr& tracked_object, bool nonconst_need_copy,
84+
ros::Time receipt_time = ros::Time(), bool* was_full = 0);
8685
void clear();
8786

8887
virtual CallbackInterface::CallResult call();

ros/ros_comm/roscpp/include/sharedmem_transport/sharedmem_block.h

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,17 @@ class SharedMemoryBlock
116116
*/
117117
bool write_to_block(uint8_t* dest, const ros::SerializedMessage& msg);
118118

119-
/**
120-
* \brief Read from block
121-
*
122-
* @param msg: msg waited to be deserialized to
123-
* Return read result, true or false
124-
*/
125-
bool read_from_block(uint32_t& msg_size);
119+
/**
120+
* \brief Read from block
121+
*
122+
* @param src: block address
123+
* @param msg: msg waited to be deserialized to
124+
* @param helper: subscription callback helper
125+
* @param header_ptr: header pointer
126+
* Return read result, true or false
127+
*/
128+
bool read_from_block(uint8_t* src, ros::VoidConstPtr& msg,
129+
ros::SubscriptionCallbackHelperPtr& helper, ros::M_stringPtr& header_ptr);
126130

127131
/**
128132
* \brief Get real alloc_size

ros/ros_comm/roscpp/include/sharedmem_transport/sharedmem_segment.h

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "ros/message_deserializer.h"
4040
#include "ros/subscription_callback_helper.h"
4141
#include "ros/forwards.h"
42+
#include "ros/datatypes.h"
4243

4344
namespace sharedmem_transport
4445
{
@@ -100,17 +101,21 @@ class SharedMemorySegment
100101
SharedMemoryBlock* descriptors_pub, uint8_t** addr_pub, int32_t& last_index);
101102

102103
/**
103-
* \brief Read data from block for sharedmem subscriber
104-
*
105-
* @param last_read_index: index which is used to record last readable block
106-
* @param descriptors_sub: descriptors maped address from sharedmem subscriber
107-
* @param topic: topic name
108-
* @param msg_buffer: msg buffer
109-
* @param msg_size: msg size
110-
* Return read result, true or false
111-
*/
112-
bool read_data(int32_t& last_read_index, SharedMemoryBlock* descriptors_sub,
113-
const std::string& topic, int32_t& msg_buffer, uint32_t& msg_size);
104+
* \brief Read data from block for sharedmem subscriber
105+
*
106+
* @param msg: msg waited to be read
107+
* @param last_read_index: index which is used to record last readable block
108+
* @param descriptors_sub: descriptors maped address from sharedmem subscriber
109+
* @param addr_sub: block maped address from sharedmem subscriber
110+
* @param helper: subscription callback helper
111+
* @param topic: topic name
112+
* @param header_ptr: header pointer
113+
* Return read result, true or false
114+
*/
115+
bool read_data(ros::VoidConstPtr& msg, int32_t& last_read_index,
116+
SharedMemoryBlock* descriptors_sub, uint8_t** addr_sub,
117+
ros::SubscriptionCallbackHelperPtr& helper, const std::string& topic,
118+
ros::M_stringPtr& header_ptr);
114119

115120
/**
116121
* \brief Set _wrote_num, after publisher wrote data to block

ros/ros_comm/roscpp/src/libros/init.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,7 @@ bool configParse(std::string file)
667667
return false;
668668
}
669669

670+
// For transport_mode
670671
if (node["transport_mode"])
671672
{
672673
g_config_comm.transport_mode = node["transport_mode"].as<int>();
@@ -680,6 +681,28 @@ bool configParse(std::string file)
680681
ROS_WARN_STREAM("Key transport_mode doesn't exist");
681682
}
682683

684+
// For topic_white_list
685+
if (node["topic_white_list"])
686+
{
687+
if (node["topic_white_list"].IsSequence())
688+
{
689+
std::string topic;
690+
for (int i = 0; i < node["topic_white_list"].size(); i++)
691+
{
692+
topic = node["topic_white_list"][i].as<std::string>();
693+
g_config_comm.topic_white_list.insert(topic);
694+
}
695+
}
696+
else
697+
{
698+
ROS_WARN_STREAM("Key topic_white_list is not Sequece");
699+
}
700+
}
701+
else
702+
{
703+
ROS_WARN_STREAM("Key topic_white_list doesn't exist");
704+
}
705+
683706
return true ;
684707

685708
} catch (YAML::Exception& e) {

ros/ros_comm/roscpp/src/libros/sharedmem_transport/sharedmem_block.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,16 @@ bool SharedMemoryBlock::write_to_block(uint8_t* dest, const ros::SerializedMessa
112112
return result;
113113
}
114114

115-
116-
bool SharedMemoryBlock::read_from_block(uint32_t& msg_size)
115+
bool SharedMemoryBlock::read_from_block(uint8_t* src, ros::VoidConstPtr& msg,
116+
ros::SubscriptionCallbackHelperPtr& helper, ros::M_stringPtr& header_ptr)
117117
{
118-
msg_size = (uint32_t)_msg_size;
118+
// Deserialze msg from block
119+
ros::SubscriptionCallbackHelperDeserializeParams params ;
120+
params.buffer = src;
121+
params.length = (uint32_t)_msg_size;
122+
params.connection_header = header_ptr;
123+
msg = helper->deserialize(params);
124+
119125
return true;
120126
}
121127

ros/ros_comm/roscpp/src/libros/sharedmem_transport/sharedmem_segment.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,10 @@ bool SharedMemorySegment::write_data(const ros::SerializedMessage& msg,
184184
return true;
185185
}
186186

187-
bool SharedMemorySegment::read_data(int32_t& last_read_index,
188-
SharedMemoryBlock* descriptors_sub, const std::string& topic,
189-
int32_t& msg_buffer, uint32_t& msg_size)
187+
bool SharedMemorySegment::read_data(ros::VoidConstPtr& msg, int32_t& last_read_index,
188+
SharedMemoryBlock* descriptors_sub, uint8_t** addr_sub,
189+
ros::SubscriptionCallbackHelperPtr& helper, const std::string& topic,
190+
ros::M_stringPtr& header_ptr)
190191
{
191192
ROS_DEBUG("Read radical start!");
192193

@@ -237,9 +238,8 @@ bool SharedMemorySegment::read_data(int32_t& last_read_index,
237238
// Get descriptor current pointer
238239
SharedMemoryBlock* descriptors_curr = descriptors_sub + block_index;
239240

240-
// Read from block, get msg_buffer and msg_size
241-
msg_buffer = block_index;
242-
bool result = descriptors_curr->read_from_block(msg_size);
241+
// Read from block
242+
bool result = descriptors_curr->read_from_block(addr_sub[block_index], msg, helper, header_ptr);
243243

244244
// Release reserve block, after we have read from block
245245
descriptors_curr->release_reserve_for_radical_read();

ros/ros_comm/roscpp/src/libros/sharedmem_transport/sharedmem_util.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ bool SharedMemoryUtil::init_sharedmem(const char* topic_name,
7373

7474
if (!segment_mgr || !descriptors)
7575
{
76+
delete segment;
7677
return false;
7778
}
7879

ros/ros_comm/roscpp/src/libros/shm_manager.cpp

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,16 @@ void ShmManager::shutdown()
100100
{
101101
delete [](sh->second.addr_sub);
102102
}
103+
104+
if (sh->second.segment)
105+
{
106+
delete sh->second.segment;
107+
}
108+
109+
if (sh->second.segment_mgr)
110+
{
111+
delete sh->second.segment_mgr;
112+
}
103113
}
104114

105115
ROS_DEBUG("roscpp shm manager exit!");
@@ -126,7 +136,8 @@ void ShmManager::threadFunc()
126136

127137
if (g_config_comm.topic_white_list.find(topic) !=
128138
g_config_comm.topic_white_list.end() ||
129-
shm_map_.find(topic) != shm_map_.end())
139+
shm_map_.find(topic) != shm_map_.end() ||
140+
(*it)->get_publisher_links().size() == 0)
130141
{
131142
continue;
132143
}
@@ -158,6 +169,7 @@ void ShmManager::threadFunc()
158169
}
159170

160171
ItemShm item;
172+
item.segment = segment;
161173
item.segment_mgr = segment_mgr;
162174
item.descriptors_sub = descriptors_sub;
163175
item.queue_size = queue_size;
@@ -173,6 +185,18 @@ void ShmManager::threadFunc()
173185

174186
shm_map_[topic] = item;
175187
}
188+
189+
if (shm_skip_first_msg_.find(topic) == shm_skip_first_msg_.end())
190+
{
191+
if (segment == NULL)
192+
{
193+
shm_skip_first_msg_[topic] = false;
194+
}
195+
else
196+
{
197+
shm_skip_first_msg_[topic] = true;
198+
}
199+
}
176200
}
177201
}
178202

@@ -192,40 +216,55 @@ void ShmManager::threadFunc()
192216
bool exit = false;
193217
int32_t read_index = -1;
194218
bool is_new_msg = false;
195-
int32_t msg_index ;
196-
uint32_t msg_size ;
197219
M_stringPtr header_ptr(new M_string());
220+
ros::VoidConstPtr msg;
221+
SerializedMessage m;
222+
223+
(*header_ptr)["topic"] = shm_map_[topic].topic_name;
224+
(*header_ptr)["md5sum"] = shm_map_[topic].md5sum;
225+
(*header_ptr)["message_definition"] = shm_map_[topic].message_definition;
226+
(*header_ptr)["callerid"] = shm_map_[topic].callerid;
227+
(*header_ptr)["type"] = shm_map_[topic].datatype;
198228

199229
while (!exit && started_ && ros::ok())
200230
{
201231
{
202-
is_new_msg = shm_map_[topic].segment_mgr->read_data(read_index,
203-
shm_map_[topic].descriptors_sub, topic, msg_index, msg_size);
232+
if (!((shm_map_[topic].shm_sub_ptr)->get_helper()))
233+
{
234+
continue;
235+
}
236+
237+
is_new_msg = shm_map_[topic].segment_mgr->read_data(msg, read_index,
238+
shm_map_[topic].descriptors_sub, shm_map_[topic].addr_sub,
239+
(shm_map_[topic].shm_sub_ptr)->get_helper(), topic, header_ptr);
204240

205241
// Block needs to be allocated
206-
if (read_index == sharedmem_transport::ROS_SHM_SEGMENT_WROTE_NUM)
242+
if (read_index == sharedmem_transport::ROS_SHM_SEGMENT_WROTE_NUM ||
243+
shm_map_[topic].shm_sub_ptr->get_publisher_links().size() == 0)
207244
{
208245
if (shm_map_[topic].addr_sub)
209246
{
210247
delete [](shm_map_[topic].addr_sub);
211248
}
212249
shm_map_.erase(topic);
250+
shm_skip_first_msg_.erase(topic);
251+
is_new_msg = false;
213252
exit = true;
214253
}
215254

216255
// New message is coming
217-
if (is_new_msg && msg_size != 0)
256+
if (is_new_msg)
218257
{
219-
(*header_ptr)["topic"] = shm_map_[topic].topic_name;
220-
(*header_ptr)["md5sum"] = shm_map_[topic].md5sum;
221-
(*header_ptr)["message_definition"] = shm_map_[topic].message_definition;
222-
(*header_ptr)["callerid"] = shm_map_[topic].callerid;
223-
(*header_ptr)["type"] = shm_map_[topic].datatype;
224-
boost::shared_array<uint8_t> buffer(shm_map_[topic].addr_sub[msg_index],
225-
[](uint8_t * buffer) { return buffer; });
226-
227-
shm_map_[topic].shm_sub_ptr->handleMessage(msg_index, SerializedMessage(buffer, msg_size),
228-
true, true, header_ptr);
258+
if (shm_skip_first_msg_[topic] == true)
259+
{
260+
// Skip first message
261+
shm_skip_first_msg_[topic] = false;
262+
continue;
263+
}
264+
265+
m.message = msg;
266+
m.type_info = &((shm_map_[topic].shm_sub_ptr)->get_helper()->getTypeInfo());
267+
shm_map_[topic].shm_sub_ptr->handleMessage(m, false, true, header_ptr, NULL);
229268
}
230269
}
231270
}

0 commit comments

Comments
 (0)