Skip to content

Commit fe87167

Browse files
mergify[bot]cferreiragonzEugenioCollado
authored
Improve DS routines (#5764) (#5784)
* Improve DS routines (#5764) * Refs #22814: Data(p) test Signed-off-by: cferreiragonz <[email protected]> * Refs #22814: Data(r/w) test Signed-off-by: cferreiragonz <[email protected]> * Refs #22814: Tristate for ParticipantsAckStatus Signed-off-by: cferreiragonz <[email protected]> * Refs #22814: Send direct messages to new clients Signed-off-by: cferreiragonz <[email protected]> * Refs #22814: Review - Changes Signed-off-by: cferreiragonz <[email protected]> * Refs #22814: Uncrustify Signed-off-by: cferreiragonz <[email protected]> --------- Signed-off-by: cferreiragonz <[email protected]> Signed-off-by: Eugenio Collado <[email protected]> * Fix tests compilation Signed-off-by: Eugenio Collado <[email protected]> * Add GUID prefix and builtin data filter Signed-off-by: cferreiragonz <[email protected]> * Avoid using parametrized test struct Signed-off-by: cferreiragonz <[email protected]> * Move test to DDS suite Signed-off-by: cferreiragonz <[email protected]> --------- Signed-off-by: cferreiragonz <[email protected]> Signed-off-by: Eugenio Collado <[email protected]> Co-authored-by: Carlos Ferreira González <[email protected]> Co-authored-by: Eugenio Collado <[email protected]>
1 parent fd74fc5 commit fe87167

File tree

12 files changed

+520
-37
lines changed

12 files changed

+520
-37
lines changed

include/fastdds/rtps/transport/test_UDPv4TransportDescriptor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ struct test_UDPv4TransportDescriptor : public SocketTransportDescriptor
4141
mutable std::atomic<uint8_t> dropDataMessagesPercentage;
4242
//! Filtering function for dropping data messages
4343
filter drop_data_messages_filter_;
44+
//! Filtering function for dropping builtin data messages
45+
filter drop_builtin_data_messages_filter_;
4446
//! Flag to enable dropping of discovery Participant DATA(P) messages
4547
bool dropParticipantBuiltinTopicData;
4648
//! Flag to enable dropping of discovery Writer DATA(W) messages

src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ namespace fastdds {
3636
namespace rtps {
3737
namespace ddb {
3838

39+
using ParticipantState = DiscoveryParticipantsAckStatus::ParticipantState;
40+
3941
DiscoveryDataBase::DiscoveryDataBase(
4042
fastrtps::rtps::GuidPrefix_t server_guid_prefix,
4143
std::set<fastrtps::rtps::GuidPrefix_t> servers)
@@ -267,8 +269,8 @@ void DiscoveryDataBase::update_change_and_unmatch_(
267269
changes_to_release_.push_back(entity.update_and_unmatch(new_change));
268270
// Manually set relevant participants ACK status of this server, and of the participant that sent the
269271
// change, to 1. This way, we avoid backprogation of the data.
270-
entity.add_or_update_ack_participant(server_guid_prefix_, true);
271-
entity.add_or_update_ack_participant(new_change->writerGUID.guidPrefix, true);
272+
entity.add_or_update_ack_participant(server_guid_prefix_, ParticipantState::ACKED);
273+
entity.add_or_update_ack_participant(new_change->writerGUID.guidPrefix, ParticipantState::ACKED);
272274
}
273275

274276
void DiscoveryDataBase::add_ack_(
@@ -292,7 +294,7 @@ void DiscoveryDataBase::add_ack_(
292294
// database has been updated, so this ACK is not relevant anymore
293295
if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity())
294296
{
295-
it->second.add_or_update_ack_participant(acked_entity, true);
297+
it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED);
296298
}
297299
}
298300
}
@@ -307,7 +309,7 @@ void DiscoveryDataBase::add_ack_(
307309
// database has been updated, so this ACK is not relevant anymore
308310
if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity())
309311
{
310-
it->second.add_or_update_ack_participant(acked_entity, true);
312+
it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED);
311313
}
312314
}
313315
}
@@ -322,7 +324,7 @@ void DiscoveryDataBase::add_ack_(
322324
// database has been updated, so this ACK is not relevant anymore
323325
if (it->second.change()->write_params.sample_identity() == change->write_params.sample_identity())
324326
{
325-
it->second.add_or_update_ack_participant(acked_entity, true);
327+
it->second.add_or_update_ack_participant(acked_entity, ParticipantState::ACKED);
326328
}
327329
}
328330
}
@@ -694,7 +696,7 @@ void DiscoveryDataBase::create_new_participant_from_change_(
694696

695697
// Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way,
696698
// we avoid backprogation of the data.
697-
ret.first->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
699+
ret.first->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);
698700

699701
// If the DATA(p) it's from this server, it is already in history and we do nothing here
700702
if (change_guid.guidPrefix != server_guid_prefix_)
@@ -796,7 +798,7 @@ void DiscoveryDataBase::update_participant_from_change_(
796798
if (ch->write_params.sample_identity().sequence_number() ==
797799
participant_info.change()->write_params.sample_identity().sequence_number())
798800
{
799-
participant_info.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
801+
participant_info.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);
800802
}
801803

802804
// we release it if it's the same or if it is lower
@@ -846,7 +848,7 @@ void DiscoveryDataBase::create_writers_from_change_(
846848
if (ch->write_params.sample_identity().sequence_number() ==
847849
writer_it->second.change()->write_params.sample_identity().sequence_number())
848850
{
849-
writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
851+
writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);
850852
}
851853

852854
// we release it if it's the same or if it is lower
@@ -894,7 +896,7 @@ void DiscoveryDataBase::create_writers_from_change_(
894896

895897
// Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way,
896898
// we avoid backprogation of the data.
897-
writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
899+
writer_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);
898900

899901
// if topic is virtual, it must iterate over all readers
900902
if (topic_name == virtual_topic_)
@@ -964,7 +966,7 @@ void DiscoveryDataBase::create_readers_from_change_(
964966
if (ch->write_params.sample_identity().sequence_number() ==
965967
reader_it->second.change()->write_params.sample_identity().sequence_number())
966968
{
967-
reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
969+
reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);
968970
}
969971

970972
// we release it if it's the same or if it is lower
@@ -1012,7 +1014,7 @@ void DiscoveryDataBase::create_readers_from_change_(
10121014

10131015
// Manually set to 1 the relevant participants ACK status of the participant that sent the change. This way,
10141016
// we avoid backprogation of the data.
1015-
reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, true);
1017+
reader_it->second.add_or_update_ack_participant(ch->writerGUID.guidPrefix, ParticipantState::ACKED);
10161018

10171019
// if topic is virtual, it must iterate over all readers
10181020
if (topic_name == virtual_topic_)
@@ -1407,37 +1409,42 @@ bool DiscoveryDataBase::process_dirty_topics()
14071409
// Find participants with writer info and participant with reader info in participants_
14081410
parts_reader_it = participants_.find(reader.guidPrefix);
14091411
parts_writer_it = participants_.find(writer.guidPrefix);
1410-
// Find reader info in readers_
1411-
readers_it = readers_.find(reader);
1412-
// Find writer info in writers_
1413-
writers_it = writers_.find(writer);
14141412

14151413
// Check in `participants_` whether the client with the reader has acknowledge the PDP of the client
14161414
// with the writer.
14171415
if (parts_reader_it != participants_.end())
14181416
{
14191417
if (parts_reader_it->second.is_matched(writer.guidPrefix))
14201418
{
1419+
// Find reader info in readers_
1420+
readers_it = readers_.find(reader);
14211421
// Check the status of the writer in `readers_[reader]::relevant_participants_builtin_ack_status`.
14221422
if (readers_it != readers_.end() &&
14231423
readers_it->second.is_relevant_participant(writer.guidPrefix) &&
1424-
!readers_it->second.is_matched(writer.guidPrefix))
1424+
!readers_it->second.is_waiting_ack(writer.guidPrefix))
14251425
{
14261426
// If the status is 0, add DATA(r) to a `edp_publications_to_send_` (if it's not there).
14271427
if (add_edp_subscriptions_to_send_(readers_it->second.change()))
14281428
{
14291429
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind DATA(r) to send: "
14301430
<< readers_it->second.change()->instanceHandle);
1431+
readers_it->second.add_or_update_ack_participant(writer.guidPrefix,
1432+
ParticipantState::WAITING_ACK);
14311433
}
14321434
}
14331435
}
14341436
else if (parts_reader_it->second.is_relevant_participant(writer.guidPrefix))
14351437
{
1436-
// Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there).
1437-
if (add_pdp_to_send_(parts_reader_it->second.change()))
1438+
if (!parts_reader_it->second.is_waiting_ack(writer.guidPrefix))
14381439
{
1439-
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind readers' DATA(p) to send: "
1440-
<< parts_reader_it->second.change()->instanceHandle);
1440+
// Add DATA(p) of the client with the writer to `pdp_to_send_` (if it's not there).
1441+
if (add_pdp_to_send_(parts_reader_it->second.change()))
1442+
{
1443+
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding readers' DATA(p) to send: "
1444+
<< parts_reader_it->second.change()->instanceHandle);
1445+
parts_reader_it->second.add_or_update_ack_participant(writer.guidPrefix,
1446+
ParticipantState::WAITING_ACK);
1447+
}
14411448
}
14421449
// Set topic as not-clearable.
14431450
is_clearable = false;
@@ -1450,26 +1457,35 @@ bool DiscoveryDataBase::process_dirty_topics()
14501457
{
14511458
if (parts_writer_it->second.is_matched(reader.guidPrefix))
14521459
{
1460+
// Find writer info in writers_
1461+
writers_it = writers_.find(writer);
14531462
// Check the status of the reader in `writers_[writer]::relevant_participants_builtin_ack_status`.
14541463
if (writers_it != writers_.end() &&
14551464
writers_it->second.is_relevant_participant(reader.guidPrefix) &&
1456-
!writers_it->second.is_matched(reader.guidPrefix))
1465+
!writers_it->second.is_waiting_ack(reader.guidPrefix))
14571466
{
14581467
// If the status is 0, add DATA(w) to a `edp_subscriptions_to_send_` (if it's not there).
14591468
if (add_edp_publications_to_send_(writers_it->second.change()))
14601469
{
14611470
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind DATA(w) to send: "
14621471
<< writers_it->second.change()->instanceHandle);
1472+
writers_it->second.add_or_update_ack_participant(reader.guidPrefix,
1473+
ParticipantState::WAITING_ACK);
14631474
}
14641475
}
14651476
}
14661477
else if (parts_writer_it->second.is_relevant_participant(reader.guidPrefix))
14671478
{
1468-
// Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there).
1469-
if (add_pdp_to_send_(parts_writer_it->second.change()))
1479+
if (!parts_writer_it->second.is_waiting_ack(reader.guidPrefix))
14701480
{
1471-
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Addind writers' DATA(p) to send: "
1472-
<< parts_writer_it->second.change()->instanceHandle);
1481+
// Add DATA(p) of the client with the reader to `pdp_to_send_` (if it's not there).
1482+
if (add_pdp_to_send_(parts_writer_it->second.change()))
1483+
{
1484+
EPROSIMA_LOG_INFO(DISCOVERY_DATABASE, "Adding writers' DATA(p) to send: "
1485+
<< parts_writer_it->second.change()->instanceHandle);
1486+
parts_writer_it->second.add_or_update_ack_participant(reader.guidPrefix,
1487+
ParticipantState::WAITING_ACK);
1488+
}
14731489
}
14741490
// Set topic as not-clearable.
14751491
is_clearable = false;
@@ -2463,7 +2479,7 @@ bool DiscoveryDataBase::from_json(
24632479
// Populate GuidPrefix_t
24642480
std::istringstream(it_ack.key()) >> prefix_aux_ack;
24652481

2466-
dpi.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<bool>());
2482+
dpi.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<ParticipantState>());
24672483
}
24682484

24692485
// Add Participant
@@ -2501,7 +2517,7 @@ bool DiscoveryDataBase::from_json(
25012517
// Populate GuidPrefix_t
25022518
std::istringstream(it_ack.key()) >> prefix_aux_ack;
25032519

2504-
dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<bool>());
2520+
dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<ParticipantState>());
25052521
}
25062522

25072523
// Add Participant
@@ -2561,7 +2577,7 @@ bool DiscoveryDataBase::from_json(
25612577
// Populate GuidPrefix_t
25622578
std::istringstream(it_ack.key()) >> prefix_aux_ack;
25632579

2564-
dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<bool>());
2580+
dei.add_or_update_ack_participant(prefix_aux_ack, it_ack.value().get<ParticipantState>());
25652581
}
25662582

25672583
// Add Participant

src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.cpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ namespace ddb {
3434

3535
void DiscoveryParticipantsAckStatus::add_or_update_participant(
3636
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p,
37-
bool status = false)
37+
ParticipantState status = ParticipantState::PENDING_SEND)
3838
{
3939
relevant_participants_map_[guid_p] = status;
4040
}
@@ -45,13 +45,24 @@ void DiscoveryParticipantsAckStatus::remove_participant(
4545
relevant_participants_map_.erase(guid_p);
4646
}
4747

48+
bool DiscoveryParticipantsAckStatus::is_waiting_ack(
49+
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const
50+
{
51+
auto it = relevant_participants_map_.find(guid_p);
52+
if (it != relevant_participants_map_.end())
53+
{
54+
return it->second >= ParticipantState::WAITING_ACK;
55+
}
56+
return false;
57+
}
58+
4859
bool DiscoveryParticipantsAckStatus::is_matched(
4960
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const
5061
{
5162
auto it = relevant_participants_map_.find(guid_p);
5263
if (it != relevant_participants_map_.end())
5364
{
54-
return it->second;
65+
return it->second == ParticipantState::ACKED;
5566
}
5667
return false;
5768
}
@@ -60,7 +71,7 @@ void DiscoveryParticipantsAckStatus::unmatch_all()
6071
{
6172
for (auto it = relevant_participants_map_.begin(); it != relevant_participants_map_.end(); ++it)
6273
{
63-
it->second = false;
74+
it->second = ParticipantState::PENDING_SEND;
6475
}
6576
}
6677

@@ -89,7 +100,7 @@ bool DiscoveryParticipantsAckStatus::is_acked_by_all() const
89100
{
90101
for (auto it = relevant_participants_map_.begin(); it != relevant_participants_map_.end(); ++it)
91102
{
92-
if (!it->second)
103+
if (it->second != ParticipantState::ACKED)
93104
{
94105
return false;
95106
}

src/cpp/rtps/builtin/discovery/database/DiscoveryParticipantsAckStatus.hpp

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,25 @@ class DiscoveryParticipantsAckStatus
4545

4646
~DiscoveryParticipantsAckStatus() = default;
4747

48+
enum class ParticipantState : uint8_t
49+
{
50+
PENDING_SEND, // Data(p) has not been sent yet
51+
WAITING_ACK, // Data(p) has already been sent but ACK has not been received
52+
ACKED // Data(p) has been acked
53+
};
54+
4855
void add_or_update_participant(
4956
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p,
50-
bool status);
57+
ParticipantState status);
5158

5259
void remove_participant(
5360
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p);
5461

5562
void unmatch_all();
5663

64+
bool is_waiting_ack(
65+
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const;
66+
5767
bool is_matched(
5868
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const;
5969

@@ -69,9 +79,31 @@ class DiscoveryParticipantsAckStatus
6979

7080
private:
7181

72-
std::map<eprosima::fastrtps::rtps::GuidPrefix_t, bool> relevant_participants_map_;
82+
std::map<eprosima::fastrtps::rtps::GuidPrefix_t, ParticipantState> relevant_participants_map_;
7383
};
7484

85+
inline std::ostream& operator <<(
86+
std::ostream& os,
87+
DiscoveryParticipantsAckStatus::ParticipantState child)
88+
{
89+
switch (child)
90+
{
91+
case DiscoveryParticipantsAckStatus::ParticipantState::PENDING_SEND:
92+
os << "PENDING_SEND";
93+
break;
94+
case DiscoveryParticipantsAckStatus::ParticipantState::WAITING_ACK:
95+
os << "WAITING_ACK";
96+
break;
97+
case DiscoveryParticipantsAckStatus::ParticipantState::ACKED:
98+
os << "ACKED";
99+
break;
100+
default:
101+
os << "UNKNOWN";
102+
break;
103+
}
104+
return os;
105+
}
106+
75107
} /* namespace ddb */
76108
} /* namespace rtps */
77109
} /* namespace fastdds */

src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ DiscoverySharedInfo::DiscoverySharedInfo(
3636
: change_(change)
3737
{
3838
// the server already knows every message
39-
add_or_update_ack_participant(known_participant, true);
39+
add_or_update_ack_participant(known_participant, DiscoveryParticipantsAckStatus::ParticipantState::ACKED);
4040
}
4141

4242
eprosima::fastrtps::rtps::CacheChange_t* DiscoverySharedInfo::update_and_unmatch(

src/cpp/rtps/builtin/discovery/database/DiscoverySharedInfo.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class DiscoverySharedInfo
5656

5757
void add_or_update_ack_participant(
5858
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p,
59-
bool status = false)
59+
DiscoveryParticipantsAckStatus::ParticipantState status = DiscoveryParticipantsAckStatus::ParticipantState::PENDING_SEND)
6060
{
6161
EPROSIMA_LOG_INFO(
6262
DISCOVERY_DATABASE,
@@ -72,6 +72,12 @@ class DiscoverySharedInfo
7272
relevant_participants_builtin_ack_status_.remove_participant(guid_p);
7373
}
7474

75+
bool is_waiting_ack(
76+
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const
77+
{
78+
return relevant_participants_builtin_ack_status_.is_waiting_ack(guid_p);
79+
}
80+
7581
bool is_matched(
7682
const eprosima::fastrtps::rtps::GuidPrefix_t& guid_p) const
7783
{

src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1661,7 +1661,6 @@ void PDPServer::send_announcement(
16611661
EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "Error sending announcement from server to clients");
16621662
}
16631663
}
1664-
16651664
}
16661665

16671666
bool PDPServer::read_backup(

0 commit comments

Comments
 (0)