Skip to content

Commit 9cf0594

Browse files
Avoid sending statistics msg with big msgs and no fragmentation (#5743)
* Refs #23001: Add 100kb new Type Signed-off-by: cferreiragonz <[email protected]> * Refs #23001: Regression Test Signed-off-by: cferreiragonz <[email protected]> * Refs #23001: Remove stats buffer before alloc_buffer in SHM Signed-off-by: cferreiragonz <[email protected]> * Uncrustify Signed-off-by: Eugenio Collado <[email protected]> --------- Signed-off-by: cferreiragonz <[email protected]> Signed-off-by: Eugenio Collado <[email protected]> Co-authored-by: Eugenio Collado <[email protected]>
1 parent 300d4ff commit 9cf0594

21 files changed

+1593
-33
lines changed

src/cpp/rtps/messages/RTPSMessageGroup.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,12 @@ void RTPSMessageGroup::send()
308308
}
309309
#endif // if HAVE_SECURITY
310310

311-
eprosima::fastdds::statistics::rtps::add_statistics_submessage(msgToSend);
311+
if (msgToSend->length <
312+
static_cast<uint32_t>(std::numeric_limits<uint16_t>::max() - RTPSMESSAGE_DATA_MIN_LENGTH))
313+
{
314+
// Avoid sending the data message for DATA that are not fragmented and exceed the 65 kB limit
315+
eprosima::fastdds::statistics::rtps::add_statistics_submessage(msgToSend);
316+
}
312317

313318
if (!sender_->send(msgToSend,
314319
max_blocking_time_point_))

test/blackbox/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ set(RTPS_BLACKBOXTESTS_SOURCE ${RTPS_BLACKBOXTESTS_TEST_SOURCE}
6666
types/Data1mb.cxx
6767
types/Data1mbPubSubTypes.cxx
6868
types/Data1mbv1.cxx
69+
types/Data100kb.cxx
70+
types/Data100kbPubSubTypes.cxx
71+
types/Data100kbv1.cxx
6972
types/Data64kb.cxx
7073
types/Data64kbPubSubTypes.cxx
7174
types/Data64kbv1.cxx
@@ -116,6 +119,9 @@ set(BLACKBOXTESTS_SOURCE ${BLACKBOXTESTS_TEST_SOURCE}
116119
types/Data1mb.cxx
117120
types/Data1mbPubSubTypes.cxx
118121
types/Data1mbv1.cxx
122+
types/Data100kb.cxx
123+
types/Data100kbPubSubTypes.cxx
124+
types/Data100kbv1.cxx
119125
types/Data64kb.cxx
120126
types/Data64kbPubSubTypes.cxx
121127
types/Data64kbv1.cxx

test/blackbox/api/dds-pim/PubSubReader.hpp

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ using eprosima::fastrtps::rtps::IPLocator;
7171
using eprosima::fastdds::rtps::UDPTransportDescriptor;
7272
using eprosima::fastdds::rtps::UDPv4TransportDescriptor;
7373
using eprosima::fastdds::rtps::UDPv6TransportDescriptor;
74+
using eprosima::fastdds::rtps::BuiltinTransports;
75+
using eprosima::fastdds::rtps::BuiltinTransportsOptions;
7476

7577
using SampleLostStatusFunctor = std::function<void (const eprosima::fastdds::dds::SampleLostStatus&)>;
7678
using SampleRejectedStatusFunctor = std::function<void (const eprosima::fastdds::dds::SampleRejectedStatus&)>;
@@ -195,7 +197,8 @@ class PubSubReader
195197
do
196198
{
197199
reader_.receive(datareader, ret);
198-
} while (ret);
200+
}
201+
while (ret);
199202
}
200203
}
201204

@@ -373,7 +376,11 @@ class PubSubReader
373376
bool take = true,
374377
bool statistics = false,
375378
bool read = true)
376-
: PubSubReader(topic_name, take, statistics, read)
379+
: PubSubReader(
380+
topic_name,
381+
take,
382+
statistics,
383+
read)
377384
{
378385
filter_expression_ = filter_expression;
379386
expression_parameters_ = expression_parameters;
@@ -1029,15 +1036,15 @@ class PubSubReader
10291036
}
10301037

10311038
PubSubReader& setup_transports(
1032-
eprosima::fastdds::rtps::BuiltinTransports transports)
1039+
BuiltinTransports transports)
10331040
{
10341041
participant_qos_.setup_transports(transports);
10351042
return *this;
10361043
}
10371044

10381045
PubSubReader& setup_transports(
1039-
eprosima::fastdds::rtps::BuiltinTransports transports,
1040-
const eprosima::fastdds::rtps::BuiltinTransportsOptions& options)
1046+
BuiltinTransports transports,
1047+
const BuiltinTransportsOptions& options)
10411048
{
10421049
participant_qos_.setup_transports(transports, options);
10431050
return *this;
@@ -1046,9 +1053,10 @@ class PubSubReader
10461053
PubSubReader& setup_large_data_tcp(
10471054
bool v6 = false,
10481055
const uint16_t& port = 0,
1049-
const uint32_t& tcp_negotiation_timeout = 0)
1056+
const BuiltinTransportsOptions& options = BuiltinTransportsOptions())
10501057
{
10511058
participant_qos_.transport().use_builtin_transports = false;
1059+
participant_qos_.transport().max_msg_size_no_frag = options.maxMessageSize;
10521060

10531061
/* Transports configuration */
10541062
// UDP transport for PDP over multicast
@@ -1066,7 +1074,10 @@ class PubSubReader
10661074
data_transport->check_crc = false;
10671075
data_transport->apply_security = false;
10681076
data_transport->enable_tcp_nodelay = true;
1069-
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
1077+
data_transport->maxMessageSize = options.maxMessageSize;
1078+
data_transport->sendBufferSize = options.sockets_buffer_size;
1079+
data_transport->receiveBufferSize = options.sockets_buffer_size;
1080+
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
10701081
participant_qos_.transport().user_transports.push_back(data_transport);
10711082
}
10721083
else
@@ -1080,7 +1091,10 @@ class PubSubReader
10801091
data_transport->check_crc = false;
10811092
data_transport->apply_security = false;
10821093
data_transport->enable_tcp_nodelay = true;
1083-
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
1094+
data_transport->maxMessageSize = options.maxMessageSize;
1095+
data_transport->sendBufferSize = options.sockets_buffer_size;
1096+
data_transport->receiveBufferSize = options.sockets_buffer_size;
1097+
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
10841098
participant_qos_.transport().user_transports.push_back(data_transport);
10851099
}
10861100

@@ -2354,7 +2368,8 @@ class PubSubReaderWithWaitsets : public PubSubReader<TypeSupport>
23542368
do
23552369
{
23562370
reader_.receive(reader_.datareader_, ret);
2357-
} while (ret);
2371+
}
2372+
while (ret);
23582373
}
23592374
}
23602375

@@ -2390,7 +2405,8 @@ class PubSubReaderWithWaitsets : public PubSubReader<TypeSupport>
23902405
do
23912406
{
23922407
reader_.receive(reader_.datareader_, ret);
2393-
} while (ret);
2408+
}
2409+
while (ret);
23942410
}
23952411
}
23962412
}

test/blackbox/api/dds-pim/PubSubWriter.hpp

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ using eprosima::fastrtps::rtps::IPLocator;
6565
using eprosima::fastdds::rtps::UDPTransportDescriptor;
6666
using eprosima::fastdds::rtps::UDPv4TransportDescriptor;
6767
using eprosima::fastdds::rtps::UDPv6TransportDescriptor;
68+
using eprosima::fastdds::rtps::BuiltinTransports;
69+
using eprosima::fastdds::rtps::BuiltinTransportsOptions;
6870

6971
template<class TypeSupport, typename TypeTraits = PubSubTypeTraits<TypeSupport>>
7072
class PubSubWriter
@@ -986,15 +988,15 @@ class PubSubWriter
986988
}
987989

988990
PubSubWriter& setup_transports(
989-
eprosima::fastdds::rtps::BuiltinTransports transports)
991+
BuiltinTransports transports)
990992
{
991993
participant_qos_.setup_transports(transports);
992994
return *this;
993995
}
994996

995997
PubSubWriter& setup_transports(
996-
eprosima::fastdds::rtps::BuiltinTransports transports,
997-
const eprosima::fastdds::rtps::BuiltinTransportsOptions& options)
998+
BuiltinTransports transports,
999+
const BuiltinTransportsOptions& options)
9981000
{
9991001
participant_qos_.setup_transports(transports, options);
10001002
return *this;
@@ -1003,9 +1005,10 @@ class PubSubWriter
10031005
PubSubWriter& setup_large_data_tcp(
10041006
bool v6 = false,
10051007
const uint16_t& port = 0,
1006-
const uint32_t& tcp_negotiation_timeout = 0)
1008+
const BuiltinTransportsOptions& options = BuiltinTransportsOptions())
10071009
{
10081010
participant_qos_.transport().use_builtin_transports = false;
1011+
participant_qos_.transport().max_msg_size_no_frag = options.maxMessageSize;
10091012

10101013
/* Transports configuration */
10111014
// UDP transport for PDP over multicast
@@ -1015,6 +1018,9 @@ class PubSubWriter
10151018
if (v6)
10161019
{
10171020
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv6TransportDescriptor>();
1021+
pdp_transport->maxMessageSize = options.maxMessageSize;
1022+
pdp_transport->sendBufferSize = options.sockets_buffer_size;
1023+
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
10181024
participant_qos_.transport().user_transports.push_back(pdp_transport);
10191025

10201026
auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv6TransportDescriptor>();
@@ -1023,12 +1029,18 @@ class PubSubWriter
10231029
data_transport->check_crc = false;
10241030
data_transport->apply_security = false;
10251031
data_transport->enable_tcp_nodelay = true;
1026-
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
1032+
data_transport->maxMessageSize = options.maxMessageSize;
1033+
data_transport->sendBufferSize = options.sockets_buffer_size;
1034+
data_transport->receiveBufferSize = options.sockets_buffer_size;
1035+
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
10271036
participant_qos_.transport().user_transports.push_back(data_transport);
10281037
}
10291038
else
10301039
{
10311040
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
1041+
pdp_transport->maxMessageSize = options.maxMessageSize;
1042+
pdp_transport->sendBufferSize = options.sockets_buffer_size;
1043+
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
10321044
participant_qos_.transport().user_transports.push_back(pdp_transport);
10331045

10341046
auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
@@ -1037,7 +1049,10 @@ class PubSubWriter
10371049
data_transport->check_crc = false;
10381050
data_transport->apply_security = false;
10391051
data_transport->enable_tcp_nodelay = true;
1040-
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
1052+
data_transport->maxMessageSize = options.maxMessageSize;
1053+
data_transport->sendBufferSize = options.sockets_buffer_size;
1054+
data_transport->receiveBufferSize = options.sockets_buffer_size;
1055+
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
10411056
participant_qos_.transport().user_transports.push_back(data_transport);
10421057
}
10431058

test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ using eprosima::fastrtps::rtps::IPLocator;
4949
using eprosima::fastrtps::rtps::UDPTransportDescriptor;
5050
using eprosima::fastrtps::rtps::UDPv4TransportDescriptor;
5151
using eprosima::fastrtps::rtps::UDPv6TransportDescriptor;
52+
using eprosima::fastdds::rtps::BuiltinTransports;
53+
using eprosima::fastdds::rtps::BuiltinTransportsOptions;
5254

5355
template<class TypeSupport>
5456
class PubSubReader
@@ -163,7 +165,8 @@ class PubSubReader
163165
do
164166
{
165167
reader_.receive_one(sub, ret);
166-
} while (ret);
168+
}
169+
while (ret);
167170
}
168171
}
169172

@@ -749,15 +752,15 @@ class PubSubReader
749752
}
750753

751754
PubSubReader& setup_transports(
752-
eprosima::fastdds::rtps::BuiltinTransports transports)
755+
BuiltinTransports transports)
753756
{
754757
participant_attr_.rtps.setup_transports(transports);
755758
return *this;
756759
}
757760

758761
PubSubReader& setup_transports(
759-
eprosima::fastdds::rtps::BuiltinTransports transports,
760-
const eprosima::fastdds::rtps::BuiltinTransportsOptions& options)
762+
BuiltinTransports transports,
763+
const BuiltinTransportsOptions& options)
761764
{
762765
participant_attr_.rtps.setup_transports(transports, options);
763766
return *this;
@@ -766,9 +769,10 @@ class PubSubReader
766769
PubSubReader& setup_large_data_tcp(
767770
bool v6 = false,
768771
const uint16_t& port = 0,
769-
const uint32_t& tcp_negotiation_timeout = 0)
772+
const BuiltinTransportsOptions& options = BuiltinTransportsOptions())
770773
{
771774
participant_attr_.rtps.useBuiltinTransports = false;
775+
participant_attr_.rtps.max_msg_size_no_frag = options.maxMessageSize;
772776

773777
/* Transports configuration */
774778
// UDP transport for PDP over multicast
@@ -778,6 +782,9 @@ class PubSubReader
778782
if (v6)
779783
{
780784
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv6TransportDescriptor>();
785+
pdp_transport->maxMessageSize = options.maxMessageSize;
786+
pdp_transport->sendBufferSize = options.sockets_buffer_size;
787+
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
781788
participant_attr_.rtps.userTransports.push_back(pdp_transport);
782789

783790
auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv6TransportDescriptor>();
@@ -786,12 +793,18 @@ class PubSubReader
786793
data_transport->check_crc = false;
787794
data_transport->apply_security = false;
788795
data_transport->enable_tcp_nodelay = true;
789-
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
796+
data_transport->maxMessageSize = options.maxMessageSize;
797+
data_transport->sendBufferSize = options.sockets_buffer_size;
798+
data_transport->receiveBufferSize = options.sockets_buffer_size;
799+
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
790800
participant_attr_.rtps.userTransports.push_back(data_transport);
791801
}
792802
else
793803
{
794804
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
805+
pdp_transport->maxMessageSize = options.maxMessageSize;
806+
pdp_transport->sendBufferSize = options.sockets_buffer_size;
807+
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
795808
participant_attr_.rtps.userTransports.push_back(pdp_transport);
796809

797810
auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
@@ -800,7 +813,10 @@ class PubSubReader
800813
data_transport->check_crc = false;
801814
data_transport->apply_security = false;
802815
data_transport->enable_tcp_nodelay = true;
803-
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
816+
data_transport->maxMessageSize = options.maxMessageSize;
817+
data_transport->sendBufferSize = options.sockets_buffer_size;
818+
data_transport->receiveBufferSize = options.sockets_buffer_size;
819+
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
804820
participant_attr_.rtps.userTransports.push_back(data_transport);
805821
}
806822

test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ using eprosima::fastrtps::rtps::IPLocator;
5353
using eprosima::fastrtps::rtps::UDPTransportDescriptor;
5454
using eprosima::fastrtps::rtps::UDPv4TransportDescriptor;
5555
using eprosima::fastrtps::rtps::UDPv6TransportDescriptor;
56+
using eprosima::fastdds::rtps::BuiltinTransports;
57+
using eprosima::fastdds::rtps::BuiltinTransportsOptions;
5658

5759
template<class TypeSupport>
5860
class PubSubWriter
@@ -760,15 +762,15 @@ class PubSubWriter
760762
}
761763

762764
PubSubWriter& setup_transports(
763-
eprosima::fastdds::rtps::BuiltinTransports transports)
765+
BuiltinTransports transports)
764766
{
765767
participant_attr_.rtps.setup_transports(transports);
766768
return *this;
767769
}
768770

769771
PubSubWriter& setup_transports(
770-
eprosima::fastdds::rtps::BuiltinTransports transports,
771-
const eprosima::fastdds::rtps::BuiltinTransportsOptions& options)
772+
BuiltinTransports transports,
773+
const BuiltinTransportsOptions& options)
772774
{
773775
participant_attr_.rtps.setup_transports(transports, options);
774776
return *this;
@@ -777,9 +779,10 @@ class PubSubWriter
777779
PubSubWriter& setup_large_data_tcp(
778780
bool v6 = false,
779781
const uint16_t& port = 0,
780-
const uint32_t& tcp_negotiation_timeout = 0)
782+
const BuiltinTransportsOptions& options = BuiltinTransportsOptions())
781783
{
782784
participant_attr_.rtps.useBuiltinTransports = false;
785+
participant_attr_.rtps.max_msg_size_no_frag = options.maxMessageSize;
783786

784787
/* Transports configuration */
785788
// UDP transport for PDP over multicast
@@ -789,6 +792,9 @@ class PubSubWriter
789792
if (v6)
790793
{
791794
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv6TransportDescriptor>();
795+
pdp_transport->maxMessageSize = options.maxMessageSize;
796+
pdp_transport->sendBufferSize = options.sockets_buffer_size;
797+
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
792798
participant_attr_.rtps.userTransports.push_back(pdp_transport);
793799

794800
auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv6TransportDescriptor>();
@@ -797,12 +803,18 @@ class PubSubWriter
797803
data_transport->check_crc = false;
798804
data_transport->apply_security = false;
799805
data_transport->enable_tcp_nodelay = true;
800-
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
806+
data_transport->maxMessageSize = options.maxMessageSize;
807+
data_transport->sendBufferSize = options.sockets_buffer_size;
808+
data_transport->receiveBufferSize = options.sockets_buffer_size;
809+
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
801810
participant_attr_.rtps.userTransports.push_back(data_transport);
802811
}
803812
else
804813
{
805814
auto pdp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
815+
pdp_transport->maxMessageSize = options.maxMessageSize;
816+
pdp_transport->sendBufferSize = options.sockets_buffer_size;
817+
pdp_transport->receiveBufferSize = options.sockets_buffer_size;
806818
participant_attr_.rtps.userTransports.push_back(pdp_transport);
807819

808820
auto data_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
@@ -811,7 +823,10 @@ class PubSubWriter
811823
data_transport->check_crc = false;
812824
data_transport->apply_security = false;
813825
data_transport->enable_tcp_nodelay = true;
814-
data_transport->tcp_negotiation_timeout = tcp_negotiation_timeout;
826+
data_transport->maxMessageSize = options.maxMessageSize;
827+
data_transport->sendBufferSize = options.sockets_buffer_size;
828+
data_transport->receiveBufferSize = options.sockets_buffer_size;
829+
data_transport->tcp_negotiation_timeout = options.tcp_negotiation_timeout;
815830
participant_attr_.rtps.userTransports.push_back(data_transport);
816831
}
817832

0 commit comments

Comments
 (0)