Skip to content

Commit 8d33a5b

Browse files
authored
Remove out the responsibility for bandwidth limitation from RTPSMessageGroup (#6214)
* Refs #23867. Support several flowcontroller factories Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23867. Add new IRTPSMessageGroupLimitation Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23867. Fixes Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23867. Add more info to IRTPSMessageGroupLimitation Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23867. Add doxygen documentation Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23867. Apply suggestions Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23867. Little refactor Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23867. fix tests Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23867. Fix compilation error on Windows Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> --------- Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
1 parent 613789a commit 8d33a5b

File tree

13 files changed

+379
-572
lines changed

13 files changed

+379
-572
lines changed

src/cpp/fastdds/types.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#ifndef FASTDDS_TYPES_HPP
2+
#define FASTDDS_TYPES_HPP
3+
4+
#include <rtps/flowcontrol/FlowControllerFactory.hpp>
5+
6+
namespace eprosima {
7+
namespace fastdds {
8+
9+
using FlowControllerFactoryType = fastdds::rtps::FlowControllerFactory;
10+
11+
} // namespace fastdds
12+
} // namespace eprosima
13+
14+
#endif // FASTDDS_TYPES_HPP

src/cpp/rtps/flowcontrol/FlowControllerFactory.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class FlowControllerFactory
3838
*
3939
* @param participant Pointer to the participant owner of this object.
4040
*/
41-
void init(
41+
virtual void init(
4242
fastdds::rtps::RTPSParticipantImpl* participant);
4343

4444
/*!
@@ -60,7 +60,7 @@ class FlowControllerFactory
6060
const std::string& flow_controller_name,
6161
const fastdds::rtps::WriterAttributes& writer_attributes);
6262

63-
private:
63+
protected:
6464

6565
fastdds::rtps::RTPSParticipantImpl* participant_ = nullptr;
6666

src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <fastdds/utils/TimedConditionVariable.hpp>
1414
#include <fastdds/utils/TimedMutex.hpp>
1515

16+
#include <rtps/messages/IRTPSMessageGroupLimitation.hpp>
1617
#include <rtps/messages/RTPSMessageGroup.hpp>
1718
#include <rtps/participant/RTPSParticipantImpl.hpp>
1819
#include <rtps/writer/BaseWriter.hpp>
@@ -277,19 +278,20 @@ struct FlowControllerSyncPublishMode : public FlowControllerPureSyncPublishMode,
277278
};
278279

279280
//! Sends all samples asynchronously but with bandwidth limitation.
280-
struct FlowControllerLimitedAsyncPublishMode : public FlowControllerAsyncPublishMode
281+
struct FlowControllerLimitedAsyncPublishMode : public FlowControllerAsyncPublishMode, public IRTPSMessageGroupLimitation
281282
{
282283
FlowControllerLimitedAsyncPublishMode(
283284
RTPSParticipantImpl* participant,
284285
const FlowControllerDescriptor* descriptor)
285286
: FlowControllerAsyncPublishMode(participant, descriptor)
287+
, sent_bytes_limitation_(static_cast<uint32_t>(descriptor->max_bytes_per_period))
286288
{
287289
assert(nullptr != descriptor);
288290
assert(0 < descriptor->max_bytes_per_period);
289291

290292
max_bytes_per_period = descriptor->max_bytes_per_period;
291293
period_ms = std::chrono::milliseconds(descriptor->period_ms);
292-
group.set_sent_bytes_limitation(static_cast<uint32_t>(max_bytes_per_period));
294+
group.set_limitation(this);
293295
}
294296

295297
bool fast_check_is_there_slot_for_change(
@@ -311,7 +313,7 @@ struct FlowControllerLimitedAsyncPublishMode : public FlowControllerAsyncPublish
311313

312314
}
313315

314-
bool ret = (max_bytes_per_period - group.get_current_bytes_processed()) > size_to_check;
316+
bool ret = (max_bytes_per_period - current_sent_bytes_) > size_to_check;
315317

316318
if (!ret)
317319
{
@@ -345,7 +347,7 @@ struct FlowControllerLimitedAsyncPublishMode : public FlowControllerAsyncPublish
345347
{
346348
last_period_ = std::chrono::steady_clock::now();
347349
force_wait_ = false;
348-
group.reset_current_bytes_processed();
350+
current_sent_bytes_ = 0;
349351
}
350352

351353
return reset_limit;
@@ -365,15 +367,39 @@ struct FlowControllerLimitedAsyncPublishMode : public FlowControllerAsyncPublish
365367
}
366368
}
367369

370+
void add_sent_bytes_by_group(
371+
uint32_t bytes,
372+
RTPSMessageSenderInterface&) override
373+
{
374+
current_sent_bytes_ += bytes;
375+
}
376+
377+
bool data_exceeds_limitation(
378+
CacheChange_t&,
379+
uint32_t size_to_add,
380+
uint32_t pending_to_send,
381+
RTPSMessageSenderInterface&) override
382+
{
383+
return
384+
// either limitation has already been reached
385+
(sent_bytes_limitation_ <= (current_sent_bytes_ + pending_to_send)) ||
386+
// or adding size_to_add will exceed limitation
387+
(size_to_add > (sent_bytes_limitation_ - (current_sent_bytes_ + pending_to_send)));
388+
}
389+
368390
int32_t max_bytes_per_period = 0;
369391

370392
std::chrono::milliseconds period_ms;
371393

372394
private:
373395

374-
bool force_wait_ = false;
396+
bool force_wait_ {false};
397+
398+
std::chrono::steady_clock::time_point last_period_ {std::chrono::steady_clock::now()};
375399

376-
std::chrono::steady_clock::time_point last_period_ = std::chrono::steady_clock::now();
400+
uint32_t sent_bytes_limitation_ {0};
401+
402+
uint32_t current_sent_bytes_ {0};
377403
};
378404

379405

@@ -1068,7 +1094,7 @@ class FlowControllerImpl : public FlowController
10681094
return get_max_payload_impl();
10691095
}
10701096

1071-
private:
1097+
protected:
10721098

10731099
/*!
10741100
* Initialize asynchronous thread.
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* @file IRTPSMessageGroupLimitation.h
17+
*
18+
*/
19+
20+
#ifndef FASTDDS_RTPS_MESSAGES__IRTPSMESSAGEGROUPLIMITATION_HPP
21+
#define FASTDDS_RTPS_MESSAGES__IRTPSMESSAGEGROUPLIMITATION_HPP
22+
23+
#include <cstdint>
24+
25+
namespace eprosima {
26+
namespace fastdds {
27+
namespace rtps {
28+
29+
struct CacheChange_t;
30+
class RTPSMessageSenderInterface;
31+
32+
/*!
33+
* Interface to implement mechanisms that limit the data sent by an RTPSMessageGroup under certain conditions,
34+
* taking into account the bytes already sent by said group.
35+
*/
36+
class IRTPSMessageGroupLimitation
37+
{
38+
public:
39+
40+
/*!
41+
* RTPSMessageGroup uses this method to announce the number of bytes just sent by the group.
42+
*
43+
* @param bytes Number of bytes just sent by the group.
44+
* @param sender RTPSMessageSenderInterface used for delivering sent bytes.
45+
*/
46+
virtual void add_sent_bytes_by_group(
47+
uint32_t bytes,
48+
RTPSMessageSenderInterface& sender) = 0;
49+
50+
/*!
51+
* RTPSMessageGroup uses this method to query whether adding a new change to the group would exceed the
52+
* limitation or not.
53+
*
54+
* @param change Change to be added.
55+
* @param size_to_add Size in bytes that adding the change would imply.
56+
* @param pending_to_send Number of bytes pending to be sent by the group.
57+
* @param sender RTPSMessageSenderInterface that the group will use for data delivery.
58+
*
59+
* @return True if adding the change would exceed the limitation, false otherwise.
60+
*/
61+
virtual bool data_exceeds_limitation(
62+
CacheChange_t& change,
63+
uint32_t size_to_add,
64+
uint32_t pending_to_send,
65+
RTPSMessageSenderInterface& sender) = 0;
66+
};
67+
68+
} // namespace rtps
69+
} // namespace fastdds
70+
} // namespace eprosima
71+
72+
#endif // FASTDDS_RTPS_MESSAGES__IRTPSMESSAGEGROUPLIMITATION_HPP
73+

src/cpp/rtps/messages/RTPSMessageGroup.cpp

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -69,21 +69,6 @@ class CacheChangeInlineQoSWriter final : public InlineQosWriter
6969

7070
};
7171

72-
static bool data_exceeds_limitation(
73-
uint32_t size_to_add,
74-
uint32_t limitation,
75-
uint32_t total_sent,
76-
uint32_t pending_to_send)
77-
{
78-
return
79-
// Limitation has been set and
80-
(0 < limitation) &&
81-
// either limitation has already been reached
82-
((limitation <= (total_sent + pending_to_send)) ||
83-
// or adding size_to_add will exceed limitation
84-
(size_to_add > (limitation - (total_sent + pending_to_send))));
85-
}
86-
8772
static bool append_message(
8873
RTPSParticipantImpl* participant,
8974
CDRMessage_t* full_msg,
@@ -260,6 +245,7 @@ RTPSMessageGroup::RTPSMessageGroup(
260245
, max_blocking_time_point_(max_blocking_time_point)
261246
, send_buffer_(!internal_buffer ? participant->get_send_buffer(max_blocking_time_point) : nullptr)
262247
, internal_buffer_(internal_buffer)
248+
, config_send_buffer_size_(participant->get_min_network_send_buffer_size())
263249
{
264250
// Avoid warning when neither SECURITY nor DEBUG is used
265251
(void)participant;
@@ -406,7 +392,18 @@ void RTPSMessageGroup::send()
406392
{
407393
throw timeout();
408394
}
409-
current_sent_bytes_ += buffers_bytes_;
395+
396+
current_send_buffer_size_ += buffers_bytes_;
397+
if (current_send_buffer_size_ > config_send_buffer_size_)
398+
{
399+
++num_of_exceeded_send_buffer_size;
400+
current_send_buffer_size_ = current_send_buffer_size_ - config_send_buffer_size_;
401+
}
402+
403+
if (nullptr != limitation_)
404+
{
405+
limitation_->add_sent_bytes_by_group(buffers_bytes_, *sender_);
406+
}
410407
}
411408
}
412409
}
@@ -572,8 +569,7 @@ bool RTPSMessageGroup::add_data(
572569

573570
// Check limitation
574571
uint32_t data_size = change.serializedPayload.length;
575-
if (data_exceeds_limitation(data_size, sent_bytes_limitation_, current_sent_bytes_,
576-
buffers_bytes_))
572+
if (nullptr != limitation_ && limitation_->data_exceeds_limitation(change, data_size, buffers_bytes_, *sender_))
577573
{
578574
flush_and_reset();
579575
throw limit_exceeded();
@@ -692,8 +688,7 @@ bool RTPSMessageGroup::add_data_frag(
692688
uint32_t fragment_size = fragment_number < change.getFragmentCount() ? change.getFragmentSize() :
693689
change.serializedPayload.length - fragment_start;
694690
// Check limitation
695-
if (data_exceeds_limitation(fragment_size, sent_bytes_limitation_, current_sent_bytes_,
696-
buffers_bytes_))
691+
if (nullptr != limitation_ && limitation_->data_exceeds_limitation(change, fragment_size, buffers_bytes_, *sender_))
697692
{
698693
flush_and_reset();
699694
throw limit_exceeded();

src/cpp/rtps/messages/RTPSMessageGroup.hpp

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include <fastdds/rtps/transport/NetworkBuffer.hpp>
3333
#include <fastdds/utils/collections/ResourceLimitedVector.hpp>
3434

35+
#include <rtps/messages/IRTPSMessageGroupLimitation.hpp>
3536
#include <rtps/messages/RTPSMessageCreator.hpp>
3637

3738
namespace eprosima {
@@ -232,20 +233,20 @@ class RTPSMessageGroup
232233
return std::numeric_limits<uint16_t>::max() - data_frag_header_size_ - max_inline_qos_size_ - 3;
233234
}
234235

235-
void set_sent_bytes_limitation(
236-
uint32_t limit)
236+
void set_limitation(
237+
IRTPSMessageGroupLimitation* limitation)
237238
{
238-
sent_bytes_limitation_ = limit;
239+
limitation_ = limitation;
239240
}
240241

241-
void reset_current_bytes_processed()
242+
uint32_t current_buffer_bytes() const
242243
{
243-
current_sent_bytes_ = 0;
244+
return buffers_bytes_;
244245
}
245246

246-
inline uint32_t get_current_bytes_processed() const
247+
uint32_t num_exceeded_send_buffer_size() const
247248
{
248-
return current_sent_bytes_ + buffers_bytes_;
249+
return num_of_exceeded_send_buffer_size;
249250
}
250251

251252
private:
@@ -326,17 +327,17 @@ class RTPSMessageGroup
326327
void add_stats_submsg();
327328
#endif // FASTDDS_STATISTICS
328329

329-
RTPSMessageSenderInterface* sender_ = nullptr;
330+
RTPSMessageSenderInterface* sender_ {nullptr};
330331

331-
Endpoint* endpoint_ = nullptr;
332+
Endpoint* endpoint_ {nullptr};
332333

333-
CDRMessage_t* header_msg_ = nullptr;
334+
CDRMessage_t* header_msg_ {nullptr};
334335

335-
CDRMessage_t* submessage_msg_ = nullptr;
336+
CDRMessage_t* submessage_msg_ {nullptr};
336337

337338
GuidPrefix_t current_dst_;
338339

339-
RTPSParticipantImpl* participant_ = nullptr;
340+
RTPSParticipantImpl* participant_ {nullptr};
340341

341342
#if HAVE_SECURITY
342343

@@ -348,11 +349,7 @@ class RTPSMessageGroup
348349

349350
std::unique_ptr<RTPSMessageGroup_t> send_buffer_;
350351

351-
bool internal_buffer_ = false;
352-
353-
uint32_t sent_bytes_limitation_ = 0;
354-
355-
uint32_t current_sent_bytes_ = 0;
352+
bool internal_buffer_ {false};
356353

357354
// Next buffer that will be sent
358355
eprosima::fastdds::rtps::NetworkBuffer pending_buffer_;
@@ -363,6 +360,12 @@ class RTPSMessageGroup
363360
// Vector of payloads of which the RTPSMessageGroup is the owner
364361
ResourceLimitedVector<eprosima::fastdds::rtps::SerializedPayload_t>* payloads_to_send_ = nullptr;
365362

363+
const uint32_t config_send_buffer_size_ {0};
364+
365+
uint32_t current_send_buffer_size_ {0};
366+
367+
uint32_t num_of_exceeded_send_buffer_size {0};
368+
366369
// Bytes to send in the next list of buffers
367370
uint32_t buffers_bytes_ = 0;
368371

@@ -371,6 +374,8 @@ class RTPSMessageGroup
371374

372375
// Fixed padding to be used whenever needed
373376
const octet padding_[3] = {0, 0, 0};
377+
378+
IRTPSMessageGroupLimitation* limitation_ {nullptr};
374379
};
375380

376381
} // namespace rtps

src/cpp/rtps/participant/RTPSParticipantImpl.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
#include <fastdds/rtps/history/IPayloadPool.hpp>
4949
#include <fastdds/rtps/transport/SenderResource.hpp>
5050

51-
#include "../flowcontrol/FlowControllerFactory.hpp"
51+
#include <fastdds/types.hpp>
5252
#include <fastdds/utils/TypePropagation.hpp>
5353
#include <rtps/builtin/data/ReaderProxyData.hpp>
5454
#include <rtps/builtin/data/WriterProxyData.hpp>
@@ -746,7 +746,7 @@ class RTPSParticipantImpl
746746
/*
747747
* Flow controller factory.
748748
*/
749-
FlowControllerFactory flow_controller_factory_;
749+
FlowControllerFactoryType flow_controller_factory_;
750750

751751
#if HAVE_SECURITY
752752
security::ParticipantSecurityAttributes security_attributes_;

0 commit comments

Comments
 (0)