Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/fastdds/rtps/attributes/RTPSParticipantAttributes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ class BuiltinAttributes
//! Set to true to avoid multicast traffic on builtin endpoints
bool avoid_builtin_multicast = true;

//! Flow controller name to use for the builtin writers
std::string flow_controller_name = "";

BuiltinAttributes() = default;

virtual ~BuiltinAttributes() = default;
Expand All @@ -410,6 +413,7 @@ class BuiltinAttributes
(this->writerHistoryMemoryPolicy == b.writerHistoryMemoryPolicy) &&
(this->writerPayloadSize == b.writerPayloadSize) &&
(this->mutation_tries == b.mutation_tries) &&
(this->flow_controller_name == b.flow_controller_name) &&
(this->avoid_builtin_multicast == b.avoid_builtin_multicast);
}

Expand Down
4 changes: 3 additions & 1 deletion resources/xsd/fastdds_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,8 @@
├ writerHistoryMemoryPolicy [0~1],
├ readerPayloadSize [uint32],
├ writerPayloadSize [uint32],
└ mutation_tries [uint32]-->
├ mutation_tries [uint32],
└ flow_controller_name [string] -->
<xs:complexType name="builtinAttributesType">
<xs:all>
<xs:element name="discovery_config" type="discoverySettingsType" minOccurs="0" maxOccurs="1"/>
Expand All @@ -737,6 +738,7 @@
<xs:element name="readerPayloadSize" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="writerPayloadSize" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="mutation_tries" type="uint32" minOccurs="0" maxOccurs="1"/>
<xs:element name="flow_controller_name" type="string" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>

Expand Down
19 changes: 3 additions & 16 deletions src/cpp/fastdds/builtin/type_lookup_service/TypeLookupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <rtps/builtin/data/ParticipantProxyData.hpp>
#include <rtps/builtin/data/ReaderProxyData.hpp>
#include <rtps/builtin/data/WriterProxyData.hpp>
#include <rtps/builtin/discovery/participant/PDP.h>
#include <rtps/participant/RTPSParticipantImpl.hpp>
#include <rtps/reader/StatefulReader.hpp>
#include <rtps/RTPSDomainImpl.hpp>
Expand Down Expand Up @@ -549,36 +550,22 @@ bool TypeLookupManager::create_endpoints()
{
bool ret = true;

const RTPSParticipantAttributes& pattr = participant_->get_attributes();

// Built-in history attributes.
HistoryAttributes hatt;
hatt.initialReservedCaches = 20;
hatt.maximumReservedCaches = 1000;
hatt.payloadMaxSize = TypeLookupManager::typelookup_data_max_size;

WriterAttributes watt;
watt.endpoint.unicastLocatorList = builtin_protocols_->m_metatrafficUnicastLocatorList;
watt.endpoint.multicastLocatorList = builtin_protocols_->m_metatrafficMulticastLocatorList;
watt.endpoint.external_unicast_locators = builtin_protocols_->m_att.metatraffic_external_unicast_locators;
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
WriterAttributes watt = participant_->pdp()->create_builtin_writer_attributes();
watt.endpoint.remoteLocatorList = builtin_protocols_->m_initialPeersList;
watt.matched_readers_allocation = pattr.allocation.participants;
watt.endpoint.topicKind = fastdds::rtps::NO_KEY;
watt.endpoint.reliabilityKind = fastdds::rtps::RELIABLE;
watt.endpoint.durabilityKind = fastdds::rtps::VOLATILE;
watt.mode = fastdds::rtps::ASYNCHRONOUS_WRITER;

ReaderAttributes ratt;
ratt.endpoint.unicastLocatorList = builtin_protocols_->m_metatrafficUnicastLocatorList;
ratt.endpoint.multicastLocatorList = builtin_protocols_->m_metatrafficMulticastLocatorList;
ratt.endpoint.external_unicast_locators = builtin_protocols_->m_att.metatraffic_external_unicast_locators;
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
ReaderAttributes ratt = participant_->pdp()->create_builtin_reader_attributes();
ratt.endpoint.remoteLocatorList = builtin_protocols_->m_initialPeersList;
ratt.matched_writers_allocation = pattr.allocation.participants;
ratt.expects_inline_qos = true;
ratt.endpoint.topicKind = fastdds::rtps::NO_KEY;
ratt.endpoint.reliabilityKind = fastdds::rtps::RELIABLE;
ratt.endpoint.durabilityKind = fastdds::rtps::VOLATILE;

// Built-in request writer
Expand Down
30 changes: 30 additions & 0 deletions src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <fastdds/domain/DomainParticipantImpl.hpp>

#include <algorithm>
#include <chrono>
#include <string>

Expand Down Expand Up @@ -2449,6 +2450,33 @@ ReturnCode_t DomainParticipantImpl::check_qos(
}
}

// Check participant's wire protocol (builtin flow controller) configuration
if (RETCODE_OK == ret_val)
{
const std::string& builtin_flow_controller_name = qos.wire_protocol().builtin.flow_controller_name;

if (!builtin_flow_controller_name.empty())
{
// Get the list of flow controllers
auto flow_controllers = qos.flow_controllers();

// Check if any flow controller matches the builtin flow controller name
bool found = std::any_of(flow_controllers.begin(), flow_controllers.end(),
[&builtin_flow_controller_name](const std::shared_ptr<fastdds::rtps::
FlowControllerDescriptor>& fc)
{
return fc && fc->name == builtin_flow_controller_name;
});

if (!found)
{
EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "Flow controller name not found in flow controllers list");
return RETCODE_INCONSISTENT_POLICY;
}
}
}


return ret_val;
}

Expand Down Expand Up @@ -2504,6 +2532,8 @@ bool DomainParticipantImpl::can_qos_be_updated(
from.wire_protocol().builtin.writerHistoryMemoryPolicy) ||
!(to.wire_protocol().builtin.writerPayloadSize == from.wire_protocol().builtin.writerPayloadSize) ||
!(to.wire_protocol().builtin.mutation_tries == from.wire_protocol().builtin.mutation_tries) ||
!(to.wire_protocol().builtin.flow_controller_name ==
from.wire_protocol().builtin.flow_controller_name) ||
!(to.wire_protocol().builtin.avoid_builtin_multicast ==
from.wire_protocol().builtin.avoid_builtin_multicast) ||
!(to.wire_protocol().builtin.discovery_config.discoveryProtocol ==
Expand Down
29 changes: 25 additions & 4 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include <rtps/builtin/discovery/participant/PDPEndpoints.hpp>
#include <rtps/builtin/discovery/participant/PDPListener.h>
#include <rtps/builtin/liveliness/WLP.hpp>
#include <rtps/flowcontrol/FlowControllerFactory.hpp>
#include <rtps/history/TopicPayloadPoolRegistry.hpp>
#include <rtps/network/utils/external_locators.hpp>
#include <rtps/participant/RTPSParticipantImpl.hpp>
Expand Down Expand Up @@ -1628,38 +1629,58 @@ static void set_builtin_endpoint_locators(
endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
}

ReaderAttributes PDP::create_builtin_reader_attributes() const
ReaderAttributes PDP::create_builtin_reader_attributes()
{
ReaderAttributes attributes;

const RTPSParticipantAttributes& pattr = getRTPSParticipant()->get_attributes();
set_builtin_matched_allocation(attributes.matched_writers_allocation, pattr);
set_builtin_endpoint_locators(attributes.endpoint, this, mp_builtin);

// Builtin endpoints are always reliable, transient local, keyed topics
attributes.endpoint.reliabilityKind = RELIABLE;
attributes.endpoint.durabilityKind = TRANSIENT_LOCAL;
attributes.endpoint.topicKind = WITH_KEY;

attributes.endpoint.endpointKind = READER;

// Built-in readers never expect inline qos
attributes.expects_inline_qos = false;

attributes.times.heartbeat_response_delay = pdp_heartbeat_response_delay;

set_builtin_endpoint_locators(attributes.endpoint, this, mp_builtin);

return attributes;
}

WriterAttributes PDP::create_builtin_writer_attributes() const
WriterAttributes PDP::create_builtin_writer_attributes()
{
WriterAttributes attributes;

const RTPSParticipantAttributes& pattr = getRTPSParticipant()->get_attributes();
set_builtin_matched_allocation(attributes.matched_readers_allocation, pattr);
set_builtin_endpoint_locators(attributes.endpoint, this, mp_builtin);

// Builtin endpoints are always reliable, transient local, keyed topics
attributes.endpoint.reliabilityKind = RELIABLE;
attributes.endpoint.durabilityKind = TRANSIENT_LOCAL;
attributes.endpoint.topicKind = WITH_KEY;

attributes.endpoint.endpointKind = WRITER;

// We assume that if we have at least one flow controller defined, we use async flow controller
if (!pattr.flow_controllers.empty())
{
attributes.mode = ASYNCHRONOUS_WRITER;
attributes.flow_controller_name = (pattr.builtin.flow_controller_name !=
"") ? pattr.builtin.flow_controller_name : fastdds::rtps::async_flow_controller_name;
}

attributes.times.heartbeat_period = pdp_heartbeat_period;
attributes.times.nack_response_delay = pdp_nack_response_delay;
attributes.times.nack_supression_duration = pdp_nack_supression_duration;

set_builtin_endpoint_locators(attributes.endpoint, this, mp_builtin);

return attributes;
}

Expand Down
5 changes: 2 additions & 3 deletions src/cpp/rtps/builtin/discovery/participant/PDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,9 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable
return temp_writer_proxies_;
}

ReaderAttributes create_builtin_reader_attributes() const;
ReaderAttributes create_builtin_reader_attributes();

WriterAttributes create_builtin_writer_attributes() const;
WriterAttributes create_builtin_writer_attributes();

#if HAVE_SECURITY
void add_builtin_security_attributes(
Expand Down Expand Up @@ -679,7 +679,6 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable

};


// configuration values for PDP reliable entities.
extern const dds::Duration_t pdp_heartbeat_period;
extern const dds::Duration_t pdp_nack_response_delay;
Expand Down
35 changes: 3 additions & 32 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,6 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(

EPROSIMA_LOG_INFO(RTPS_PDP, "Beginning PDPClient Endpoints creation");

const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->get_attributes();

/***********************************
* PDP READER
***********************************/
Expand All @@ -322,17 +320,8 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
hatt.memoryPolicy = mp_builtin->m_att.readerHistoryMemoryPolicy;
endpoints.reader.history_.reset(new ReaderHistory(hatt));

ReaderAttributes ratt;
ratt.expects_inline_qos = false;
ratt.endpoint.endpointKind = READER;
ratt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList;
ratt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList;
ratt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators;
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
ratt.endpoint.topicKind = WITH_KEY;
ratt.endpoint.durabilityKind = TRANSIENT_LOCAL;
ratt.endpoint.reliabilityKind = RELIABLE;
ratt.times.heartbeat_response_delay = pdp_heartbeat_response_delay;
ReaderAttributes ratt = create_builtin_reader_attributes();

#if HAVE_SECURITY
if (is_discovery_protected)
{
Expand Down Expand Up @@ -377,18 +366,7 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
hatt.memoryPolicy = mp_builtin->m_att.writerHistoryMemoryPolicy;
endpoints.writer.history_.reset(new WriterHistory(hatt));

WriterAttributes watt;
watt.endpoint.endpointKind = WRITER;
watt.endpoint.durabilityKind = TRANSIENT_LOCAL;
watt.endpoint.reliabilityKind = RELIABLE;
watt.endpoint.topicKind = WITH_KEY;
watt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList;
watt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList;
watt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators;
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
watt.times.heartbeat_period = pdp_heartbeat_period;
watt.times.nack_response_delay = pdp_nack_response_delay;
watt.times.nack_supression_duration = pdp_nack_supression_duration;
WriterAttributes watt = create_builtin_writer_attributes();

#if HAVE_SECURITY
if (is_discovery_protected)
Expand All @@ -399,13 +377,6 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
}
#endif // HAVE_SECURITY

// We assume that if we have at least one flow controller defined, we use async flow controller
if (!pattr.flow_controllers.empty())
{
watt.mode = ASYNCHRONOUS_WRITER;
watt.flow_controller_name = fastdds::rtps::async_flow_controller_name;
}

RTPSWriter* wout = nullptr;
#if HAVE_SECURITY
EntityId_t writer_entity =
Expand Down
29 changes: 6 additions & 23 deletions src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
bool secure)
{
static_cast<void>(secure);
const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->get_attributes();

/***********************************
* PDP READER
Expand All @@ -349,18 +348,10 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
endpoints.reader.history_.reset(new ReaderHistory(hatt));

// PDP Reader Attributes
ReaderAttributes ratt;
ratt.expects_inline_qos = false;
ratt.endpoint.endpointKind = READER;
ratt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList;
ratt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList;
ratt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators;
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
ratt.endpoint.topicKind = WITH_KEY;
ReaderAttributes ratt = create_builtin_reader_attributes();
// Change depending on backup mode
ratt.endpoint.durabilityKind = durability_;
ratt.endpoint.reliabilityKind = RELIABLE;
ratt.times.heartbeat_response_delay = pdp_heartbeat_response_delay;

#if HAVE_SECURITY
if (secure)
{
Expand Down Expand Up @@ -422,8 +413,8 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
endpoints.writer.history_.reset(new WriterHistory(hatt));

// PDP Writer Attributes
WriterAttributes watt;
watt.endpoint.endpointKind = WRITER;
WriterAttributes watt = create_builtin_writer_attributes();

// VOLATILE durability to highlight that on steady state the history is empty (except for announcement DATAs)
// this setting is incompatible with CLIENTs TRANSIENT_LOCAL PDP readers but not validation is done on builitin
// endpoints
Expand All @@ -435,16 +426,8 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
get_writer_persistence_file_name()));
#endif // HAVE_SQLITE3

watt.endpoint.reliabilityKind = RELIABLE;
watt.endpoint.topicKind = WITH_KEY;
watt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList;
watt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList;
watt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators;
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
watt.times.heartbeat_period = pdp_heartbeat_period;
watt.times.nack_response_delay = pdp_nack_response_delay;
watt.times.nack_supression_duration = pdp_nack_supression_duration;
watt.mode = ASYNCHRONOUS_WRITER;
watt.mode = ASYNCHRONOUS_WRITER; //

// Enable separate sending so the filter can be called for each change and reader proxy
watt.separate_sending = true;
#if HAVE_SECURITY
Expand Down
6 changes: 0 additions & 6 deletions src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,12 +395,6 @@ bool PDPSimple::create_dcps_participant_endpoints()
mp_RTPSParticipant->createSenderResources(entry);
}

// We assume that if we have at least one flow controller defined, we use async flow controller
if (!pattr.flow_controllers.empty())
{
watt.mode = ASYNCHRONOUS_WRITER;
watt.flow_controller_name = fastdds::rtps::async_flow_controller_name;
}

RTPSWriter* rtps_writer = nullptr;
if (mp_RTPSParticipant->createWriter(&rtps_writer, watt, writer.history_.get(),
Expand Down
Loading
Loading