Skip to content

Commit b2c4893

Browse files
Builtin flow controller
Signed-off-by: Eugenio Collado <eugeniocollado@eprosima.com>
1 parent 0bc1b2e commit b2c4893

File tree

9 files changed

+164
-4
lines changed

9 files changed

+164
-4
lines changed

include/fastdds/rtps/attributes/RTPSParticipantAttributes.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,9 @@ class BuiltinAttributes
391391
//! Set to true to avoid multicast traffic on builtin endpoints
392392
bool avoid_builtin_multicast = true;
393393

394+
//! Flow controller name to use for the builtin writers
395+
std::string flow_controller_name = "";
396+
394397
BuiltinAttributes() = default;
395398

396399
virtual ~BuiltinAttributes() = default;
@@ -410,6 +413,7 @@ class BuiltinAttributes
410413
(this->writerHistoryMemoryPolicy == b.writerHistoryMemoryPolicy) &&
411414
(this->writerPayloadSize == b.writerPayloadSize) &&
412415
(this->mutation_tries == b.mutation_tries) &&
416+
(this->flow_controller_name == b.flow_controller_name) &&
413417
(this->avoid_builtin_multicast == b.avoid_builtin_multicast);
414418
}
415419

resources/xsd/fastdds_profiles.xsd

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,8 @@
720720
├ writerHistoryMemoryPolicy [0~1],
721721
├ readerPayloadSize [uint32],
722722
├ writerPayloadSize [uint32],
723-
└ mutation_tries [uint32]-->
723+
├ mutation_tries [uint32],
724+
└ flow_controller_name [string] -->
724725
<xs:complexType name="builtinAttributesType">
725726
<xs:all>
726727
<xs:element name="discovery_config" type="discoverySettingsType" minOccurs="0" maxOccurs="1"/>
@@ -735,6 +736,7 @@
735736
<xs:element name="readerPayloadSize" type="uint32" minOccurs="0" maxOccurs="1"/>
736737
<xs:element name="writerPayloadSize" type="uint32" minOccurs="0" maxOccurs="1"/>
737738
<xs:element name="mutation_tries" type="uint32" minOccurs="0" maxOccurs="1"/>
739+
<xs:element name="flow_controller_name" type="string" minOccurs="0" maxOccurs="1"/>
738740
</xs:all>
739741
</xs:complexType>
740742

src/cpp/fastdds/domain/DomainParticipantImpl.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2090,6 +2090,7 @@ bool DomainParticipantImpl::can_qos_be_updated(
20902090
from.wire_protocol().builtin.writerHistoryMemoryPolicy) ||
20912091
!(to.wire_protocol().builtin.writerPayloadSize == from.wire_protocol().builtin.writerPayloadSize) ||
20922092
!(to.wire_protocol().builtin.mutation_tries == from.wire_protocol().builtin.mutation_tries) ||
2093+
!(to.wire_protocol().builtin.flow_controller_name == from.wire_protocol().builtin.flow_controller_name) ||
20932094
!(to.wire_protocol().builtin.avoid_builtin_multicast ==
20942095
from.wire_protocol().builtin.avoid_builtin_multicast) ||
20952096
!(to.wire_protocol().builtin.discovery_config.discoveryProtocol ==

src/cpp/rtps/builtin/discovery/participant/PDP.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
#include <rtps/builtin/discovery/participant/PDPEndpoints.hpp>
5151
#include <rtps/builtin/discovery/participant/PDPListener.h>
5252
#include <rtps/builtin/liveliness/WLP.hpp>
53+
#include <rtps/flowcontrol/FlowControllerFactory.hpp>
5354
#include <rtps/history/TopicPayloadPoolRegistry.hpp>
5455
#include <rtps/network/utils/external_locators.hpp>
5556
#include <rtps/participant/RTPSParticipantImpl.hpp>
@@ -1700,6 +1701,7 @@ WriterAttributes PDP::static_create_builtin_writer_attributes(
17001701
if (!pattr.flow_controllers.empty())
17011702
{
17021703
attributes.mode = ASYNCHRONOUS_WRITER;
1704+
attributes.flow_controller_name = (pattr.builtin.flow_controller_name != "") ? pattr.builtin.flow_controller_name : fastdds::rtps::async_flow_controller_name;
17031705
}
17041706

17051707
attributes.times.heartbeat_period = pdp_heartbeat_period;

src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1127,7 +1127,6 @@ class FlowControllerImpl : public FlowController
11271127
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
11281128
{
11291129
bool ret_value = false;
1130-
assert(!change->writer_info.is_linked.load());
11311130
// Sync delivery failed. Try to store for asynchronous delivery.
11321131
#if HAVE_STRICT_REALTIME
11331132
std::unique_lock<fastdds::TimedMutex> lock(async_mode.changes_interested_mutex, std::defer_lock);

src/cpp/xmlparser/XMLElementParser.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,7 @@ XMLP_ret XMLParser::getXMLBuiltinAttributes(
593593
<xs:element name="readerHistoryMemoryPolicy" type="historyMemoryPolicyType" minOccurs="0"/>
594594
<xs:element name="writerHistoryMemoryPolicy" type="historyMemoryPolicyType" minOccurs="0"/>
595595
<xs:element name="mutation_tries" type="uint32Type" minOccurs="0"/>
596+
<xs:element name="flow_controller_name" type="stringType" minOccurs="0"/>
596597
</xs:all>
597598
</xs:complexType>
598599
*/
@@ -709,6 +710,14 @@ XMLP_ret XMLParser::getXMLBuiltinAttributes(
709710
return XMLP_ret::XML_ERROR;
710711
}
711712
}
713+
else if (strcmp(name, FLOW_CONTROLLER_NAME) == 0)
714+
{
715+
// flow_controller_name - stringType
716+
if (XMLP_ret::XML_OK != getXMLString(p_aux0, &builtin.flow_controller_name, ident))
717+
{
718+
return XMLP_ret::XML_ERROR;
719+
}
720+
}
712721
else
713722
{
714723
EPROSIMA_LOG_ERROR(XMLPARSER, "Invalid element found into 'builtinAttributesType'. Name: " << name);

test/blackbox/api/dds-pim/PubSubParticipant.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -701,6 +701,14 @@ class PubSubParticipant
701701
return false;
702702
}
703703

704+
PubSubParticipant& flow_controller(
705+
const std::shared_ptr<eprosima::fastdds::rtps::FlowControllerDescriptor>& flow_controller)
706+
{
707+
participant_qos_.flow_controllers().clear();
708+
participant_qos_.flow_controllers().push_back(flow_controller);
709+
return *this;
710+
}
711+
704712
PubSubParticipant& initial_peers(
705713
const eprosima::fastdds::rtps::LocatorList& initial_peers)
706714
{

test/blackbox/common/BlackboxTestsDiscovery.cpp

Lines changed: 133 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,7 +1298,7 @@ TEST_P(Discovery, EndpointCreationMultithreaded)
12981298
endpoint_thr.join();
12991299
}
13001300

1301-
// Regression test for redmine issue 16253
1301+
//! Regression test for redmine issue 16253
13021302
TEST_P(Discovery, AsymmeticIgnoreParticipantFlags)
13031303
{
13041304
if (INTRAPROCESS != GetParam())
@@ -1416,7 +1416,7 @@ TEST_P(Discovery, single_unicast_pdp_response)
14161416
main_wire_protocol.builtin.discovery_config.initial_announcements.count = 1;
14171417
main_wire_protocol.builtin.discovery_config.initial_announcements.period = { 0, 100000000 };
14181418

1419-
// The main participant will use the test transport and a specific announcments configuration
1419+
// The main participant will use the test transport and a specific announcements configuration
14201420
main_participant->disable_builtin_transport().add_user_transport_to_pparams(test_transport)
14211421
.wire_protocol(main_wire_protocol);
14221422

@@ -1474,6 +1474,137 @@ TEST_P(Discovery, single_unicast_pdp_response)
14741474
participants.clear();
14751475
}
14761476

1477+
//! Regression test for redmine issue 22506
1478+
//! Test using a user's flowcontroller limiting the bandwidth and two remote participants waiting for the PDP sample.
1479+
TEST_P(Discovery, single_unicast_pdp_response_flowcontroller)
1480+
{
1481+
// Leverage intraprocess so transport is only used for participant discovery
1482+
if (INTRAPROCESS != GetParam())
1483+
{
1484+
GTEST_SKIP() << "Only makes sense on INTRAPROCESS";
1485+
return;
1486+
}
1487+
1488+
using namespace eprosima::fastdds::dds;
1489+
1490+
// All participants would restrict communication to UDP localhost.
1491+
// The main participant should send a single initial announcement, and have a big announcement period.
1492+
// This is to ensure that we only check the datagrams sent in response to the participant discovery,
1493+
// and not the ones sent in the periodic announcements.
1494+
// The main participant will use the test transport to count the number of unicast messages sent.
1495+
1496+
// This will hold the multicast port. Since the test is not always run in the same domain, we'll need to set
1497+
// its value when the first multicast datagram is sent.
1498+
std::atomic<uint32_t> multicast_port{ 0 };
1499+
// Declare a test transport that will count the number of unicast messages sent
1500+
std::atomic<size_t> num_unicast_sends{ 0 };
1501+
auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
1502+
test_transport->interfaceWhiteList.push_back("127.0.0.1");
1503+
test_transport->locator_filter_ = [&num_unicast_sends, &multicast_port](
1504+
const eprosima::fastdds::rtps::Locator& destination)
1505+
{
1506+
if (IPLocator::isMulticast(destination))
1507+
{
1508+
uint32_t port = 0;
1509+
multicast_port.compare_exchange_strong(port, destination.port);
1510+
}
1511+
else
1512+
{
1513+
num_unicast_sends.fetch_add(1u, std::memory_order_seq_cst);
1514+
}
1515+
1516+
// Do not discard any message
1517+
return false;
1518+
};
1519+
1520+
// Create the main participant
1521+
auto main_participant = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(0, 0, 0, 0);
1522+
WireProtocolConfigQos main_wire_protocol;
1523+
main_wire_protocol.builtin.avoid_builtin_multicast = true;
1524+
main_wire_protocol.builtin.discovery_config.leaseDuration = c_TimeInfinite;
1525+
main_wire_protocol.builtin.discovery_config.leaseDuration_announcementperiod = { 3600, 0 };
1526+
main_wire_protocol.builtin.discovery_config.initial_announcements.count = 1;
1527+
main_wire_protocol.builtin.discovery_config.initial_announcements.period = { 0, 100000000u };
1528+
main_wire_protocol.builtin.flow_controller_name = "TestFlowController";
1529+
// main_wire_protocol.builtin.flow_controller_name = "FastDDSFlowControllerDefault";
1530+
1531+
// Flowcontroller to limit the bandwidth
1532+
auto test_flow_controller = std::make_shared<eprosima::fastdds::rtps::FlowControllerDescriptor>();
1533+
test_flow_controller->name = "TestFlowController";
1534+
test_flow_controller->max_bytes_per_period = 3120; //3120 OK
1535+
test_flow_controller->period_ms = static_cast<uint64_t>(100);
1536+
1537+
// The main participant will use the test transport, specific announcements configuration and a flowcontroller
1538+
main_participant->disable_builtin_transport().add_user_transport_to_pparams(test_transport)
1539+
.wire_protocol(main_wire_protocol)
1540+
.flow_controller(test_flow_controller);
1541+
1542+
// Start the main participant
1543+
ASSERT_TRUE(main_participant->init_participant());
1544+
1545+
// Wait for the initial announcements to be sent
1546+
std::this_thread::sleep_for(std::chrono::seconds(1));
1547+
// This would have set the multicast port
1548+
EXPECT_NE(multicast_port, 0u);
1549+
1550+
// The rest of the participants only send announcements to the main participant
1551+
// Calculate the metatraffic unicast port of the main participant
1552+
uint32_t port = multicast_port + main_wire_protocol.port.offsetd1 - main_wire_protocol.port.offsetd0;
1553+
1554+
// The rest of the participants only send announcements to the main participant
1555+
auto udp_localhost_transport = std::make_shared<test_UDPv4TransportDescriptor>();
1556+
udp_localhost_transport->interfaceWhiteList.push_back("127.0.0.1");
1557+
Locator peer_locator;
1558+
IPLocator::createLocator(LOCATOR_KIND_UDPv4, "127.0.0.1", port, peer_locator);
1559+
WireProtocolConfigQos wire_protocol;
1560+
wire_protocol.builtin.avoid_builtin_multicast = true;
1561+
wire_protocol.builtin.initialPeersList.push_back(peer_locator);
1562+
wire_protocol.builtin.discovery_config.leaseDuration = c_TimeInfinite;
1563+
wire_protocol.builtin.discovery_config.leaseDuration_announcementperiod = { 3600, 0 };
1564+
wire_protocol.builtin.discovery_config.initial_announcements.count = 1;
1565+
wire_protocol.builtin.discovery_config.initial_announcements.period = { 0, 100000000u };
1566+
1567+
std::vector<std::shared_ptr<PubSubParticipant<HelloWorldPubSubType>>> participants;
1568+
for (size_t i = 0; i < 20; ++i)
1569+
{
1570+
auto participant = std::make_shared<PubSubParticipant<HelloWorldPubSubType>>(0, 0, 0, 0);
1571+
// All participants use the same transport
1572+
participant->disable_builtin_transport().add_user_transport_to_pparams(udp_localhost_transport)
1573+
.wire_protocol(wire_protocol);
1574+
participants.push_back(participant);
1575+
}
1576+
1577+
// Start the rest of the participants
1578+
for (auto& participant : participants)
1579+
{
1580+
ASSERT_TRUE(participant->init_participant());
1581+
participant->wait_discovery(std::chrono::seconds(1), 1, true);
1582+
// participant->wait_discovery();
1583+
// std::this_thread::sleep_for(std::chrono::milliseconds(30));
1584+
std::cout << "\n\nnum_unicast_sends: " << num_unicast_sends.load(std::memory_order::memory_order_seq_cst) << " VS " << participants.size() << std::endl;
1585+
}
1586+
1587+
// EXPECT_EQ(num_unicast_sends.load(std::memory_order::memory_order_seq_cst),
1588+
// participants.size());
1589+
1590+
// Destroy main participant
1591+
main_participant.reset();
1592+
for (auto& participant : participants)
1593+
{
1594+
participant->wait_discovery(std::chrono::seconds(1), 0, true);
1595+
std::cout << "\n\nnum_unicast_sends: " << num_unicast_sends.load(std::memory_order::memory_order_seq_cst) << " VS " << participants.size() << std::endl;
1596+
}
1597+
1598+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
1599+
// Check that only two unicast messages per participant were sent
1600+
std::cout << "\n\nnum_unicast_sends: " << num_unicast_sends.load(std::memory_order::memory_order_seq_cst) << " VS " << participants.size() << std::endl;
1601+
EXPECT_EQ(num_unicast_sends.load(std::memory_order::memory_order_seq_cst),
1602+
participants.size() + participants.size());
1603+
1604+
// Clean up
1605+
participants.clear();
1606+
}
1607+
14771608
#ifdef INSTANTIATE_TEST_SUITE_P
14781609
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
14791610
#else

test/mock/rtps/RTPSParticipantAttributes/fastdds/rtps/attributes/RTPSParticipantAttributes.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,9 @@ class BuiltinAttributes
377377
//! Mutation tries if the port is being used.
378378
uint32_t mutation_tries = 100u;
379379

380+
//! Flow controller name to use for the builtin writers
381+
std::string flow_controller_name = "";
382+
380383
//! Set to true to avoid multicast traffic on builtin endpoints
381384
bool avoid_builtin_multicast = true;
382385

@@ -399,6 +402,7 @@ class BuiltinAttributes
399402
(this->writerHistoryMemoryPolicy == b.writerHistoryMemoryPolicy) &&
400403
(this->writerPayloadSize == b.writerPayloadSize) &&
401404
(this->mutation_tries == b.mutation_tries) &&
405+
(this->flow_controller_name == b.flow_controller_name) &&
402406
(this->avoid_builtin_multicast == b.avoid_builtin_multicast);
403407
}
404408

0 commit comments

Comments
 (0)