Skip to content

Commit df6db3d

Browse files
Flow controller configuration for built-in endpoints (#5680)
* Refactor builtin writers Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com> Refactor builtin readers Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com> Uncrustify Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com> Refactor WLP Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com> * Builtin flow controller Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com> * BlackboxPubSub Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com> * TypeLookUpService tests Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com> * Unicast msgs test Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com> * Check qos unregistered builtin flow controller Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com> * Fix tests Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com> * Applied suggestions Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com> * Modified versions.md Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com> --------- Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com>
1 parent 16fedc8 commit df6db3d

File tree

26 files changed

+682
-140
lines changed

26 files changed

+682
-140
lines changed

include/fastdds/rtps/attributes/RTPSParticipantAttributes.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,9 @@ class BuiltinAttributes
391391
//! Set to true to avoid multicast traffic on builtin endpoints
392392
bool avoid_builtin_multicast = true;
393393

394+
//! Flow controller name to use for the builtin writers
395+
std::string flow_controller_name = "";
396+
394397
BuiltinAttributes() = default;
395398

396399
virtual ~BuiltinAttributes() = default;
@@ -410,6 +413,7 @@ class BuiltinAttributes
410413
(this->writerHistoryMemoryPolicy == b.writerHistoryMemoryPolicy) &&
411414
(this->writerPayloadSize == b.writerPayloadSize) &&
412415
(this->mutation_tries == b.mutation_tries) &&
416+
(this->flow_controller_name == b.flow_controller_name) &&
413417
(this->avoid_builtin_multicast == b.avoid_builtin_multicast);
414418
}
415419

resources/xsd/fastdds_profiles.xsd

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,8 @@
722722
├ writerHistoryMemoryPolicy [0~1],
723723
├ readerPayloadSize [uint32],
724724
├ writerPayloadSize [uint32],
725-
└ mutation_tries [uint32]-->
725+
├ mutation_tries [uint32],
726+
└ flow_controller_name [string] -->
726727
<xs:complexType name="builtinAttributesType">
727728
<xs:all>
728729
<xs:element name="discovery_config" type="discoverySettingsType" minOccurs="0" maxOccurs="1"/>
@@ -737,6 +738,7 @@
737738
<xs:element name="readerPayloadSize" type="uint32" minOccurs="0" maxOccurs="1"/>
738739
<xs:element name="writerPayloadSize" type="uint32" minOccurs="0" maxOccurs="1"/>
739740
<xs:element name="mutation_tries" type="uint32" minOccurs="0" maxOccurs="1"/>
741+
<xs:element name="flow_controller_name" type="string" minOccurs="0" maxOccurs="1"/>
740742
</xs:all>
741743
</xs:complexType>
742744

src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include <rtps/builtin/data/ParticipantProxyData.hpp>
3838
#include <rtps/builtin/data/ReaderProxyData.hpp>
3939
#include <rtps/builtin/data/WriterProxyData.hpp>
40+
#include <rtps/builtin/discovery/participant/PDP.h>
4041
#include <rtps/participant/RTPSParticipantImpl.hpp>
4142
#include <rtps/reader/StatefulReader.hpp>
4243
#include <rtps/RTPSDomainImpl.hpp>
@@ -549,36 +550,22 @@ bool TypeLookupManager::create_endpoints()
549550
{
550551
bool ret = true;
551552

552-
const RTPSParticipantAttributes& pattr = participant_->get_attributes();
553-
554553
// Built-in history attributes.
555554
HistoryAttributes hatt;
556555
hatt.initialReservedCaches = 20;
557556
hatt.maximumReservedCaches = 1000;
558557
hatt.payloadMaxSize = TypeLookupManager::typelookup_data_max_size;
559558

560-
WriterAttributes watt;
561-
watt.endpoint.unicastLocatorList = builtin_protocols_->m_metatrafficUnicastLocatorList;
562-
watt.endpoint.multicastLocatorList = builtin_protocols_->m_metatrafficMulticastLocatorList;
563-
watt.endpoint.external_unicast_locators = builtin_protocols_->m_att.metatraffic_external_unicast_locators;
564-
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
559+
WriterAttributes watt = participant_->pdp()->create_builtin_writer_attributes();
565560
watt.endpoint.remoteLocatorList = builtin_protocols_->m_initialPeersList;
566-
watt.matched_readers_allocation = pattr.allocation.participants;
567561
watt.endpoint.topicKind = fastdds::rtps::NO_KEY;
568-
watt.endpoint.reliabilityKind = fastdds::rtps::RELIABLE;
569562
watt.endpoint.durabilityKind = fastdds::rtps::VOLATILE;
570563
watt.mode = fastdds::rtps::ASYNCHRONOUS_WRITER;
571564

572-
ReaderAttributes ratt;
573-
ratt.endpoint.unicastLocatorList = builtin_protocols_->m_metatrafficUnicastLocatorList;
574-
ratt.endpoint.multicastLocatorList = builtin_protocols_->m_metatrafficMulticastLocatorList;
575-
ratt.endpoint.external_unicast_locators = builtin_protocols_->m_att.metatraffic_external_unicast_locators;
576-
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
565+
ReaderAttributes ratt = participant_->pdp()->create_builtin_reader_attributes();
577566
ratt.endpoint.remoteLocatorList = builtin_protocols_->m_initialPeersList;
578-
ratt.matched_writers_allocation = pattr.allocation.participants;
579567
ratt.expects_inline_qos = true;
580568
ratt.endpoint.topicKind = fastdds::rtps::NO_KEY;
581-
ratt.endpoint.reliabilityKind = fastdds::rtps::RELIABLE;
582569
ratt.endpoint.durabilityKind = fastdds::rtps::VOLATILE;
583570

584571
// Built-in request writer

src/cpp/fastdds/domain/DomainParticipantImpl.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include <fastdds/domain/DomainParticipantImpl.hpp>
2121

22+
#include <algorithm>
2223
#include <chrono>
2324
#include <string>
2425

@@ -2449,6 +2450,33 @@ ReturnCode_t DomainParticipantImpl::check_qos(
24492450
}
24502451
}
24512452

2453+
// Check participant's wire protocol (builtin flow controller) configuration
2454+
if (RETCODE_OK == ret_val)
2455+
{
2456+
const std::string& builtin_flow_controller_name = qos.wire_protocol().builtin.flow_controller_name;
2457+
2458+
if (!builtin_flow_controller_name.empty())
2459+
{
2460+
// Get the list of flow controllers
2461+
auto flow_controllers = qos.flow_controllers();
2462+
2463+
// Check if any flow controller matches the builtin flow controller name
2464+
bool found = std::any_of(flow_controllers.begin(), flow_controllers.end(),
2465+
[&builtin_flow_controller_name](const std::shared_ptr<fastdds::rtps::
2466+
FlowControllerDescriptor>& fc)
2467+
{
2468+
return fc && fc->name == builtin_flow_controller_name;
2469+
});
2470+
2471+
if (!found)
2472+
{
2473+
EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "Flow controller name not found in flow controllers list");
2474+
return RETCODE_INCONSISTENT_POLICY;
2475+
}
2476+
}
2477+
}
2478+
2479+
24522480
return ret_val;
24532481
}
24542482

@@ -2504,6 +2532,8 @@ bool DomainParticipantImpl::can_qos_be_updated(
25042532
from.wire_protocol().builtin.writerHistoryMemoryPolicy) ||
25052533
!(to.wire_protocol().builtin.writerPayloadSize == from.wire_protocol().builtin.writerPayloadSize) ||
25062534
!(to.wire_protocol().builtin.mutation_tries == from.wire_protocol().builtin.mutation_tries) ||
2535+
!(to.wire_protocol().builtin.flow_controller_name ==
2536+
from.wire_protocol().builtin.flow_controller_name) ||
25072537
!(to.wire_protocol().builtin.avoid_builtin_multicast ==
25082538
from.wire_protocol().builtin.avoid_builtin_multicast) ||
25092539
!(to.wire_protocol().builtin.discovery_config.discoveryProtocol ==

src/cpp/rtps/builtin/discovery/participant/PDP.cpp

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
#include <rtps/builtin/discovery/participant/PDPEndpoints.hpp>
5050
#include <rtps/builtin/discovery/participant/PDPListener.h>
5151
#include <rtps/builtin/liveliness/WLP.hpp>
52+
#include <rtps/flowcontrol/FlowControllerFactory.hpp>
5253
#include <rtps/history/TopicPayloadPoolRegistry.hpp>
5354
#include <rtps/network/utils/external_locators.hpp>
5455
#include <rtps/participant/RTPSParticipantImpl.hpp>
@@ -1628,38 +1629,58 @@ static void set_builtin_endpoint_locators(
16281629
endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
16291630
}
16301631

1631-
ReaderAttributes PDP::create_builtin_reader_attributes() const
1632+
ReaderAttributes PDP::create_builtin_reader_attributes()
16321633
{
16331634
ReaderAttributes attributes;
16341635

16351636
const RTPSParticipantAttributes& pattr = getRTPSParticipant()->get_attributes();
16361637
set_builtin_matched_allocation(attributes.matched_writers_allocation, pattr);
1637-
set_builtin_endpoint_locators(attributes.endpoint, this, mp_builtin);
16381638

16391639
// Builtin endpoints are always reliable, transient local, keyed topics
16401640
attributes.endpoint.reliabilityKind = RELIABLE;
16411641
attributes.endpoint.durabilityKind = TRANSIENT_LOCAL;
16421642
attributes.endpoint.topicKind = WITH_KEY;
16431643

1644+
attributes.endpoint.endpointKind = READER;
1645+
16441646
// Built-in readers never expect inline qos
16451647
attributes.expects_inline_qos = false;
16461648

1649+
attributes.times.heartbeat_response_delay = pdp_heartbeat_response_delay;
1650+
1651+
set_builtin_endpoint_locators(attributes.endpoint, this, mp_builtin);
1652+
16471653
return attributes;
16481654
}
16491655

1650-
WriterAttributes PDP::create_builtin_writer_attributes() const
1656+
WriterAttributes PDP::create_builtin_writer_attributes()
16511657
{
16521658
WriterAttributes attributes;
16531659

16541660
const RTPSParticipantAttributes& pattr = getRTPSParticipant()->get_attributes();
16551661
set_builtin_matched_allocation(attributes.matched_readers_allocation, pattr);
1656-
set_builtin_endpoint_locators(attributes.endpoint, this, mp_builtin);
16571662

16581663
// Builtin endpoints are always reliable, transient local, keyed topics
16591664
attributes.endpoint.reliabilityKind = RELIABLE;
16601665
attributes.endpoint.durabilityKind = TRANSIENT_LOCAL;
16611666
attributes.endpoint.topicKind = WITH_KEY;
16621667

1668+
attributes.endpoint.endpointKind = WRITER;
1669+
1670+
// We assume that if we have at least one flow controller defined, we use async flow controller
1671+
if (!pattr.flow_controllers.empty())
1672+
{
1673+
attributes.mode = ASYNCHRONOUS_WRITER;
1674+
attributes.flow_controller_name = (pattr.builtin.flow_controller_name !=
1675+
"") ? pattr.builtin.flow_controller_name : fastdds::rtps::async_flow_controller_name;
1676+
}
1677+
1678+
attributes.times.heartbeat_period = pdp_heartbeat_period;
1679+
attributes.times.nack_response_delay = pdp_nack_response_delay;
1680+
attributes.times.nack_supression_duration = pdp_nack_supression_duration;
1681+
1682+
set_builtin_endpoint_locators(attributes.endpoint, this, mp_builtin);
1683+
16631684
return attributes;
16641685
}
16651686

src/cpp/rtps/builtin/discovery/participant/PDP.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -441,9 +441,9 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable
441441
return temp_writer_proxies_;
442442
}
443443

444-
ReaderAttributes create_builtin_reader_attributes() const;
444+
ReaderAttributes create_builtin_reader_attributes();
445445

446-
WriterAttributes create_builtin_writer_attributes() const;
446+
WriterAttributes create_builtin_writer_attributes();
447447

448448
#if HAVE_SECURITY
449449
void add_builtin_security_attributes(
@@ -679,7 +679,6 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable
679679

680680
};
681681

682-
683682
// configuration values for PDP reliable entities.
684683
extern const dds::Duration_t pdp_heartbeat_period;
685684
extern const dds::Duration_t pdp_nack_response_delay;

src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -310,8 +310,6 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
310310

311311
EPROSIMA_LOG_INFO(RTPS_PDP, "Beginning PDPClient Endpoints creation");
312312

313-
const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->get_attributes();
314-
315313
/***********************************
316314
* PDP READER
317315
***********************************/
@@ -322,17 +320,8 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
322320
hatt.memoryPolicy = mp_builtin->m_att.readerHistoryMemoryPolicy;
323321
endpoints.reader.history_.reset(new ReaderHistory(hatt));
324322

325-
ReaderAttributes ratt;
326-
ratt.expects_inline_qos = false;
327-
ratt.endpoint.endpointKind = READER;
328-
ratt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList;
329-
ratt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList;
330-
ratt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators;
331-
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
332-
ratt.endpoint.topicKind = WITH_KEY;
333-
ratt.endpoint.durabilityKind = TRANSIENT_LOCAL;
334-
ratt.endpoint.reliabilityKind = RELIABLE;
335-
ratt.times.heartbeat_response_delay = pdp_heartbeat_response_delay;
323+
ReaderAttributes ratt = create_builtin_reader_attributes();
324+
336325
#if HAVE_SECURITY
337326
if (is_discovery_protected)
338327
{
@@ -377,18 +366,7 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
377366
hatt.memoryPolicy = mp_builtin->m_att.writerHistoryMemoryPolicy;
378367
endpoints.writer.history_.reset(new WriterHistory(hatt));
379368

380-
WriterAttributes watt;
381-
watt.endpoint.endpointKind = WRITER;
382-
watt.endpoint.durabilityKind = TRANSIENT_LOCAL;
383-
watt.endpoint.reliabilityKind = RELIABLE;
384-
watt.endpoint.topicKind = WITH_KEY;
385-
watt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList;
386-
watt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList;
387-
watt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators;
388-
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
389-
watt.times.heartbeat_period = pdp_heartbeat_period;
390-
watt.times.nack_response_delay = pdp_nack_response_delay;
391-
watt.times.nack_supression_duration = pdp_nack_supression_duration;
369+
WriterAttributes watt = create_builtin_writer_attributes();
392370

393371
#if HAVE_SECURITY
394372
if (is_discovery_protected)
@@ -399,13 +377,6 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
399377
}
400378
#endif // HAVE_SECURITY
401379

402-
// We assume that if we have at least one flow controller defined, we use async flow controller
403-
if (!pattr.flow_controllers.empty())
404-
{
405-
watt.mode = ASYNCHRONOUS_WRITER;
406-
watt.flow_controller_name = fastdds::rtps::async_flow_controller_name;
407-
}
408-
409380
RTPSWriter* wout = nullptr;
410381
#if HAVE_SECURITY
411382
EntityId_t writer_entity =

src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,6 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
336336
bool secure)
337337
{
338338
static_cast<void>(secure);
339-
const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->get_attributes();
340339

341340
/***********************************
342341
* PDP READER
@@ -349,18 +348,10 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
349348
endpoints.reader.history_.reset(new ReaderHistory(hatt));
350349

351350
// PDP Reader Attributes
352-
ReaderAttributes ratt;
353-
ratt.expects_inline_qos = false;
354-
ratt.endpoint.endpointKind = READER;
355-
ratt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList;
356-
ratt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList;
357-
ratt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators;
358-
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
359-
ratt.endpoint.topicKind = WITH_KEY;
351+
ReaderAttributes ratt = create_builtin_reader_attributes();
360352
// Change depending on backup mode
361353
ratt.endpoint.durabilityKind = durability_;
362-
ratt.endpoint.reliabilityKind = RELIABLE;
363-
ratt.times.heartbeat_response_delay = pdp_heartbeat_response_delay;
354+
364355
#if HAVE_SECURITY
365356
if (secure)
366357
{
@@ -422,8 +413,8 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
422413
endpoints.writer.history_.reset(new WriterHistory(hatt));
423414

424415
// PDP Writer Attributes
425-
WriterAttributes watt;
426-
watt.endpoint.endpointKind = WRITER;
416+
WriterAttributes watt = create_builtin_writer_attributes();
417+
427418
// VOLATILE durability to highlight that on steady state the history is empty (except for announcement DATAs)
428419
// this setting is incompatible with CLIENTs TRANSIENT_LOCAL PDP readers but not validation is done on builitin
429420
// endpoints
@@ -435,16 +426,8 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
435426
get_writer_persistence_file_name()));
436427
#endif // HAVE_SQLITE3
437428

438-
watt.endpoint.reliabilityKind = RELIABLE;
439-
watt.endpoint.topicKind = WITH_KEY;
440-
watt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList;
441-
watt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList;
442-
watt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators;
443-
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
444-
watt.times.heartbeat_period = pdp_heartbeat_period;
445-
watt.times.nack_response_delay = pdp_nack_response_delay;
446-
watt.times.nack_supression_duration = pdp_nack_supression_duration;
447-
watt.mode = ASYNCHRONOUS_WRITER;
429+
watt.mode = ASYNCHRONOUS_WRITER; //
430+
448431
// Enable separate sending so the filter can be called for each change and reader proxy
449432
watt.separate_sending = true;
450433
#if HAVE_SECURITY

src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -395,12 +395,6 @@ bool PDPSimple::create_dcps_participant_endpoints()
395395
mp_RTPSParticipant->createSenderResources(entry);
396396
}
397397

398-
// We assume that if we have at least one flow controller defined, we use async flow controller
399-
if (!pattr.flow_controllers.empty())
400-
{
401-
watt.mode = ASYNCHRONOUS_WRITER;
402-
watt.flow_controller_name = fastdds::rtps::async_flow_controller_name;
403-
}
404398

405399
RTPSWriter* rtps_writer = nullptr;
406400
if (mp_RTPSParticipant->createWriter(&rtps_writer, watt, writer.history_.get(),

0 commit comments

Comments
 (0)