Skip to content

Commit 977ac28

Browse files
Support topic instances (#753)
* rmw_fastrtps_shared: adapt shared type support Signed-off-by: Mario Dominguez <[email protected]> * rmw_fastrtps_cpp: update type support and implementation for keys adoption Signed-off-by: Mario Dominguez <[email protected]> * rmw_fastrtps_dynamic_cpp: update type support and implementation for keys adoption Signed-off-by: Mario Dominguez <[email protected]> * rmw_fastrtps_shared_cpp: new apply_qos_resource_limits_for_keys() Signed-off-by: Mario Dominguez <[email protected]> * rmw_fastrtps_cpp: add apply_qos_resource_limits_for_keys() Signed-off-by: Mario Dominguez <[email protected]> * rmw_fastrtps_dynamic_cpp: add apply_qos_resource_limits_for_keys() Signed-off-by: Mario Dominguez <[email protected]> * Apply review changes Signed-off-by: Mario Dominguez <[email protected]> * Linters Signed-off-by: Mario Dominguez <[email protected]> * Rev changes 2 Signed-off-by: Mario Dominguez <[email protected]> * More linters Signed-off-by: Miguel Company <[email protected]> * Typo Signed-off-by: Mario Dominguez <[email protected]> * Add assert(ihandle) Signed-off-by: Mario Dominguez <[email protected]> * Align argument Signed-off-by: Mario Dominguez <[email protected]> * Fix build after rebase Signed-off-by: Miguel Company <[email protected]> * Make `get_key_hash_from_ros_message` more modern and readable. Signed-off-by: Miguel Company <[email protected]> --------- Signed-off-by: Mario Dominguez <[email protected]> Signed-off-by: Miguel Company <[email protected]> Co-authored-by: Mario Dominguez <[email protected]>
1 parent f2b3204 commit 977ac28

File tree

19 files changed

+524
-10
lines changed

19 files changed

+524
-10
lines changed

rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,18 @@ class TypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport
4444

4545
explicit TypeSupport(const rosidl_message_type_support_t * type_supports);
4646

47+
bool get_key_hash_from_ros_message(
48+
void * ros_message,
49+
eprosima::fastdds::rtps::InstanceHandle_t * ihandle,
50+
bool force_md5,
51+
const void * impl) const override;
52+
4753
protected:
4854
void set_members(const message_type_support_callbacks_t * members);
4955

5056
private:
5157
const message_type_support_callbacks_t * members_;
58+
const message_type_support_key_callbacks_t * key_callbacks_;
5259
bool has_data_;
5360
};
5461

rmw_fastrtps_cpp/src/publisher.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,15 @@ rmw_fastrtps_cpp::create_publisher(
259259
return nullptr;
260260
}
261261

262+
// Apply resource limits QoS if the type is keyed
263+
if (fastdds_type->is_compute_key_provided &&
264+
!participant_info->leave_middleware_default_qos)
265+
{
266+
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
267+
writer_qos.history(),
268+
writer_qos.resource_limits());
269+
}
270+
262271
// Creates DataWriter with a mask enabling publication_matched calls for the listener
263272
info->data_writer_ = publisher->create_datawriter(
264273
info->topic_,

rmw_fastrtps_cpp/src/rmw_client.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,15 @@ rmw_create_client(
328328
return nullptr;
329329
}
330330

331+
// Apply resource limits QoS if the type is keyed
332+
if (response_fastdds_type->is_compute_key_provided &&
333+
!participant_info->leave_middleware_default_qos)
334+
{
335+
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
336+
reader_qos.history(),
337+
reader_qos.resource_limits());
338+
}
339+
331340
// Creates DataReader
332341
info->response_reader_ = subscriber->create_datareader(
333342
response_topic_desc,
@@ -386,6 +395,15 @@ rmw_create_client(
386395
return nullptr;
387396
}
388397

398+
// Apply resource limits QoS if the type is keyed
399+
if (request_fastdds_type->is_compute_key_provided &&
400+
!participant_info->leave_middleware_default_qos)
401+
{
402+
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
403+
writer_qos.history(),
404+
writer_qos.resource_limits());
405+
}
406+
389407
// Creates DataWriter with a mask enabling publication_matched calls for the listener
390408
info->request_writer_ = publisher->create_datawriter(
391409
info->request_topic_,

rmw_fastrtps_cpp/src/rmw_service.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,15 @@ rmw_create_service(
324324
return nullptr;
325325
}
326326

327+
// Apply resource limits QoS if the type is keyed
328+
if (request_fastdds_type->is_compute_key_provided &&
329+
!participant_info->leave_middleware_default_qos)
330+
{
331+
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
332+
reader_qos.history(),
333+
reader_qos.resource_limits());
334+
}
335+
327336
// Creates DataReader
328337
info->request_reader_ = subscriber->create_datareader(
329338
request_topic_desc,
@@ -386,6 +395,15 @@ rmw_create_service(
386395
return nullptr;
387396
}
388397

398+
// Apply resource limits QoS if the type is keyed
399+
if (response_fastdds_type->is_compute_key_provided &&
400+
!participant_info->leave_middleware_default_qos)
401+
{
402+
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
403+
writer_qos.history(),
404+
writer_qos.resource_limits());
405+
}
406+
389407
// Creates DataWriter with a mask enabling publication_matched calls for the listener
390408
info->response_writer_ = publisher->create_datawriter(
391409
info->response_topic_,

rmw_fastrtps_cpp/src/subscription.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,15 @@ __create_dynamic_subscription(
392392
return nullptr;
393393
}
394394

395+
// Apply resource limits QoS if the type is keyed
396+
if (fastdds_type->is_compute_key_provided &&
397+
!participant_info->leave_middleware_default_qos)
398+
{
399+
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
400+
reader_qos.history(),
401+
reader_qos.resource_limits());
402+
}
403+
395404
info->datareader_qos_ = reader_qos;
396405

397406
// create_datareader
@@ -654,6 +663,15 @@ __create_subscription(
654663
return nullptr;
655664
}
656665

666+
// Apply resource limits QoS if the type is keyed
667+
if (fastdds_type->is_compute_key_provided &&
668+
!participant_info->leave_middleware_default_qos)
669+
{
670+
rmw_fastrtps_shared_cpp::apply_qos_resource_limits_for_keys(
671+
reader_qos.history(),
672+
reader_qos.resource_limits());
673+
}
674+
657675
info->datareader_qos_ = reader_qos;
658676

659677
// create_datareader

rmw_fastrtps_cpp/src/type_support_common.cpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ TypeSupport::TypeSupport(
3131
is_compute_key_provided = false;
3232
max_size_bound_ = false;
3333
is_plain_ = false;
34+
key_is_unbounded_ = false;
35+
key_max_serialized_size_ = 0;
36+
members_ = nullptr;
37+
key_callbacks_ = nullptr;
38+
has_data_ = false;
3439
}
3540

3641
void TypeSupport::set_members(const message_type_support_callbacks_t * members)
@@ -60,6 +65,16 @@ void TypeSupport::set_members(const message_type_support_callbacks_t * members)
6065
max_serialized_type_size = 4 + data_size;
6166
// Account for RTPS submessage alignment
6267
max_serialized_type_size = (max_serialized_type_size + 3) & ~3;
68+
69+
if (nullptr != members->key_callbacks) {
70+
key_callbacks_ = members->key_callbacks;
71+
is_compute_key_provided = true;
72+
73+
key_max_serialized_size_ = key_callbacks_->max_serialized_size_key(key_is_unbounded_);
74+
if (!key_is_unbounded_) {
75+
key_buffer_.reserve(key_max_serialized_size_);
76+
}
77+
}
6378
}
6479

6580
size_t TypeSupport::getEstimatedSerializedSize(const void * ros_message, const void * impl) const
@@ -132,6 +147,55 @@ bool TypeSupport::deserializeROSmessage(
132147
return true;
133148
}
134149

150+
bool TypeSupport::get_key_hash_from_ros_message(
151+
void * ros_message,
152+
eprosima::fastdds::rtps::InstanceHandle_t * ihandle,
153+
bool force_md5,
154+
const void * [[maybe_unused]] impl) const
155+
{
156+
assert(ros_message);
157+
assert(ihandle);
158+
159+
// retrieve estimated serialized size in case key is unbounded
160+
if (key_is_unbounded_) {
161+
key_max_serialized_size_ = (std::max) (
162+
key_max_serialized_size_,
163+
key_callbacks_->get_serialized_size_key(ros_message));
164+
key_buffer_.reserve(key_max_serialized_size_);
165+
}
166+
167+
eprosima::fastcdr::FastBuffer fast_buffer(
168+
reinterpret_cast<char *>(key_buffer_.data()),
169+
key_max_serialized_size_);
170+
171+
eprosima::fastcdr::Cdr ser(
172+
fast_buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1);
173+
174+
key_callbacks_->cdr_serialize_key(ros_message, ser);
175+
176+
const size_t max_serialized_key_length = 16;
177+
178+
auto ser_length = ser.get_serialized_data_length();
179+
180+
// check for md5
181+
if (force_md5 || key_max_serialized_size_ > max_serialized_key_length) {
182+
md5_.init();
183+
md5_.update(key_buffer_.data(), static_cast<unsigned int>(ser_length));
184+
md5_.finalize();
185+
186+
for (uint8_t i = 0; i < max_serialized_key_length; ++i) {
187+
ihandle->value[i] = md5_.digest[i];
188+
}
189+
} else {
190+
memset(ihandle->value, 0, max_serialized_key_length);
191+
for (uint8_t i = 0; i < ser_length; ++i) {
192+
ihandle->value[i] = key_buffer_[i];
193+
}
194+
}
195+
196+
return true;
197+
}
198+
135199
MessageTypeSupport::MessageTypeSupport(
136200
const message_type_support_callbacks_t * members,
137201
const rosidl_message_type_support_t * type_supports)

rmw_fastrtps_dynamic_cpp/src/MessageTypeSupport_impl.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(
6161
} else {
6262
this->max_serialized_type_size++;
6363
}
64+
65+
if (this->members_->has_any_key_member_) {
66+
this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(members);
67+
this->is_compute_key_provided = true;
68+
this->key_buffer_.reserve(this->key_max_serialized_size_);
69+
}
70+
6471
// Account for RTPS submessage alignment
6572
this->max_serialized_type_size = (this->max_serialized_type_size + 3) & ~3;
6673
}

rmw_fastrtps_dynamic_cpp/src/ServiceTypeSupport_impl.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ RequestTypeSupport<ServiceMembersType, MessageMembersType>::RequestTypeSupport(
6262
} else {
6363
this->max_serialized_type_size++;
6464
}
65+
66+
if (this->members_->has_any_key_member_) {
67+
this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(this->members_);
68+
this->is_compute_key_provided = true;
69+
this->key_buffer_.reserve(this->key_max_serialized_size_);
70+
}
71+
6572
// Account for RTPS submessage alignment
6673
this->max_serialized_type_size = (this->max_serialized_type_size + 3) & ~3;
6774
}
@@ -98,6 +105,13 @@ ResponseTypeSupport<ServiceMembersType, MessageMembersType>::ResponseTypeSupport
98105
} else {
99106
this->max_serialized_type_size++;
100107
}
108+
109+
if (this->members_->has_any_key_member_) {
110+
this->key_max_serialized_size_ = this->calculateMaxSerializedKeySize(this->members_);
111+
this->is_compute_key_provided = true;
112+
this->key_buffer_.reserve(this->key_max_serialized_size_);
113+
}
114+
101115
// Account for RTPS submessage alignment
102116
this->max_serialized_type_size = (this->max_serialized_type_size + 3) & ~3;
103117
}

rmw_fastrtps_dynamic_cpp/src/TypeSupport.hpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@ class TypeSupportProxy : public rmw_fastrtps_shared_cpp::TypeSupport
138138

139139
bool deserializeROSmessage(
140140
eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override;
141+
142+
bool get_key_hash_from_ros_message(
143+
void * ros_message, eprosima::fastdds::rtps::InstanceHandle_t * ihandle, bool force_md5,
144+
const void * impl) const override;
141145
};
142146

143147
class BaseTypeSupport : public rmw_fastrtps_shared_cpp::TypeSupport
@@ -185,6 +189,12 @@ class TypeSupport : public BaseTypeSupport
185189
bool deserializeROSmessage(
186190
eprosima::fastcdr::Cdr & deser, void * ros_message, const void * impl) const override;
187191

192+
bool get_key_hash_from_ros_message(
193+
void * ros_message,
194+
eprosima::fastdds::rtps::InstanceHandle_t * ihandle,
195+
bool force_md5,
196+
const void * impl) const override;
197+
188198
protected:
189199
// Meant for messages typesupport
190200
explicit TypeSupport(const void * ros_type_support);
@@ -195,24 +205,58 @@ class TypeSupport : public BaseTypeSupport
195205
const void * ros_message_type_supports);
196206

197207
size_t calculateMaxSerializedSize(const MembersType * members, size_t current_alignment);
208+
size_t calculateMaxSerializedKeySize(const MembersType * members);
198209

199210
const MembersType * members_;
200211

201212
private:
213+
size_t calculateMaxSerializedSize(
214+
const MembersType * members,
215+
size_t current_alignment,
216+
bool compute_key,
217+
bool & is_key_unbounded);
218+
202219
size_t getEstimatedSerializedSize(
203220
const MembersType * members,
204221
const void * ros_message,
205222
size_t current_alignment) const;
206223

224+
size_t getEstimatedSerializedKeySize(
225+
const MembersType * members,
226+
const void * ros_message) const;
227+
228+
size_t getEstimatedSerializedSize(
229+
const MembersType * members,
230+
const void * ros_message,
231+
size_t current_alignment,
232+
bool compute_key) const;
233+
207234
bool serializeROSmessage(
208235
eprosima::fastcdr::Cdr & ser,
209236
const MembersType * members,
210237
const void * ros_message) const;
211238

239+
bool serializeKeyROSmessage(
240+
eprosima::fastcdr::Cdr & ser,
241+
const MembersType * members,
242+
const void * ros_message) const;
243+
244+
bool serializeROSmessage(
245+
eprosima::fastcdr::Cdr & ser,
246+
const MembersType * members,
247+
const void * ros_message,
248+
bool compute_key) const;
249+
212250
bool deserializeROSmessage(
213251
eprosima::fastcdr::Cdr & deser,
214252
const MembersType * members,
215253
void * ros_message) const;
254+
255+
bool get_key_hash_from_ros_message(
256+
const MembersType * members,
257+
void * ros_message,
258+
eprosima::fastdds::rtps::InstanceHandle_t * ihandle,
259+
bool force_md5) const;
216260
};
217261

218262
} // namespace rmw_fastrtps_dynamic_cpp

0 commit comments

Comments
 (0)