@@ -481,9 +481,6 @@ void DiscoveryDataBase::process_pdp_data_queue()
481
481
// Lock(exclusive mode) mutex locally
482
482
std::lock_guard<std::recursive_mutex> guard (mutex_);
483
483
484
- // Swap DATA queues
485
- pdp_data_queue_.Swap ();
486
-
487
484
// Process all messages in the queque
488
485
while (!pdp_data_queue_.Empty ())
489
486
{
@@ -521,9 +518,6 @@ bool DiscoveryDataBase::process_edp_data_queue()
521
518
// Lock(exclusive mode) mutex locally
522
519
std::lock_guard<std::recursive_mutex> guard (mutex_);
523
520
524
- // Swap DATA queues
525
- edp_data_queue_.Swap ();
526
-
527
521
eprosima::fastrtps::rtps::CacheChange_t* change;
528
522
std::string topic_name;
529
523
@@ -730,32 +724,71 @@ void DiscoveryDataBase::update_participant_from_change_(
730
724
{
731
725
fastrtps::rtps::GUID_t change_guid = guid_from_change (ch);
732
726
727
+ assert (ch->kind == eprosima::fastrtps::rtps::ALIVE);
728
+
729
+ // If the change corresponds to a previously removed participant (which hasn't yet been removed from the map since
730
+ // the DATA(Up) is still unacked), update map with new data and behave as if it was a new participant.
731
+ // Remove also the old change from the disposals collection, if it was added just before
732
+ if (participant_info.change ()->kind != eprosima::fastrtps::rtps::ALIVE)
733
+ {
734
+ // Update the change data
735
+ participant_info.participant_change_data (change_data);
736
+
737
+ // Remove old change from disposals if it was added just before to avoid sending data UP
738
+ auto it = std::find (disposals_.begin (), disposals_.end (), participant_info.change ());
739
+ if (it != disposals_.end ())
740
+ {
741
+ disposals_.erase (it);
742
+ }
743
+
744
+ // Update change. This should add the UNALIVE change to changes_to_release_, which should later both remove the
745
+ // change from the writer's history and release the change
746
+ update_change_and_unmatch_ (ch, participant_info);
747
+
748
+ // If it is local and server we have to create virtual endpoints, except for our own server
749
+ if (change_guid.guidPrefix != server_guid_prefix_ && !change_data.is_client () && change_data.is_local ())
750
+ {
751
+ // Match new server and create virtual endpoints
752
+ // NOTE: match after having updated the change, so virtual endpoints are not discarded for having
753
+ // an associated unalive participant
754
+ match_new_server_ (change_guid.guidPrefix );
755
+ }
756
+
757
+ // Treat as a new participant found
758
+ new_updates_++;
759
+ if (change_guid.guidPrefix != server_guid_prefix_)
760
+ {
761
+ server_acked_by_all (false );
762
+ }
763
+ }
764
+
733
765
// Specific case when a Data(P) from an entity A known as remote comes from the very entity A (we have
734
766
// the Data(P) because of other server B, but now it arrives from A itself)
735
767
// The entity A changes to local
736
768
// Must be local data, or else it is a remote endpoint and should not be changed
737
- if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local () &&
769
+ else if (change_guid.guidPrefix != server_guid_prefix_ && change_data.is_local () &&
738
770
DiscoveryDataBase::participant_data_has_changed_ (participant_info, change_data))
739
771
{
772
+ // Update the change data
773
+ participant_info.participant_change_data (change_data);
774
+
775
+ // Update change
776
+ update_change_and_unmatch_ (ch, participant_info);
777
+
740
778
// If the participant changes to server local, virtual endpoints must be added
741
779
// If it is local and server the only possibility is it was a remote server and it must be converted to local
742
780
if (!change_data.is_client ())
743
781
{
782
+ // NOTE: match after having updated the change in order to send the new Data(P)
744
783
match_new_server_ (change_guid.guidPrefix );
745
784
}
746
785
747
- // Update the change data
748
- participant_info.participant_change_data (change_data);
749
-
750
- // Update change
751
- update_change_and_unmatch_ (ch, participant_info);
752
-
753
786
// Treat as a new participant found
754
787
new_updates_++;
755
788
server_acked_by_all (false );
756
789
757
790
// It is possible that this Data(P) is in our history if it has not been acked by all
758
- // In this case we have to resent it with the new update
791
+ // In this case we have to resend it with the new update
759
792
if (!participant_info.is_acked_by_all ())
760
793
{
761
794
add_pdp_to_send_ (ch);
@@ -858,6 +891,29 @@ void DiscoveryDataBase::create_writers_from_change_(
858
891
// The writer was NOT known by the database
859
892
else
860
893
{
894
+ // Check if corresponding participant is known, abort otherwise
895
+ // NOTE: Processing a DATA(w) should always be preceded by the reception and processing of its corresponding
896
+ // participant. However, one may receive a DATA(w) just after the participant has been removed, case in which the
897
+ // former should no longer be processed.
898
+ std::map<eprosima::fastrtps::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator writer_part_it =
899
+ participants_.find (writer_guid.guidPrefix );
900
+ if (writer_part_it == participants_.end ())
901
+ {
902
+ EPROSIMA_LOG_ERROR (DISCOVERY_DATABASE,
903
+ " Writer " << writer_guid << " has no associated participant. Skipping" );
904
+ assert (topic_name != virtual_topic_);
905
+ changes_to_release_.push_back (ch); // Release change so it can be reused
906
+ return ;
907
+ }
908
+ else if (writer_part_it->second .change ()->kind != fastrtps::rtps::ChangeKind_t::ALIVE)
909
+ {
910
+ EPROSIMA_LOG_WARNING (DISCOVERY_DATABASE,
911
+ " Writer " << writer_guid << " is associated to a removed participant. Skipping" );
912
+ assert (topic_name != virtual_topic_);
913
+ changes_to_release_.push_back (ch); // Release change so it can be reused
914
+ return ;
915
+ }
916
+
861
917
// Add entry to writers_
862
918
DiscoveryEndpointInfo tmp_writer (
863
919
ch,
@@ -878,18 +934,7 @@ void DiscoveryDataBase::create_writers_from_change_(
878
934
new_updates_++;
879
935
880
936
// Add entry to participants_[guid_prefix]::writers
881
- std::map<eprosima::fastrtps::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator writer_part_it =
882
- participants_.find (writer_guid.guidPrefix );
883
- if (writer_part_it != participants_.end ())
884
- {
885
- writer_part_it->second .add_writer (writer_guid);
886
- }
887
- else
888
- {
889
- EPROSIMA_LOG_ERROR (DISCOVERY_DATABASE,
890
- " Writer " << writer_guid << " has no associated participant. Skipping" );
891
- return ;
892
- }
937
+ writer_part_it->second .add_writer (writer_guid);
893
938
894
939
// Add writer to writers_by_topic_[topic_name]
895
940
add_writer_to_topic_ (writer_guid, topic_name);
@@ -976,6 +1021,29 @@ void DiscoveryDataBase::create_readers_from_change_(
976
1021
// The reader was NOT known by the database
977
1022
else
978
1023
{
1024
+ // Check if corresponding participant is known, abort otherwise
1025
+ // NOTE: Processing a DATA(r) should always be preceded by the reception and processing of its corresponding
1026
+ // participant. However, one may receive a DATA(r) just after the participant has been removed, case in which the
1027
+ // former should no longer be processed.
1028
+ std::map<eprosima::fastrtps::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator reader_part_it =
1029
+ participants_.find (reader_guid.guidPrefix );
1030
+ if (reader_part_it == participants_.end ())
1031
+ {
1032
+ EPROSIMA_LOG_ERROR (DISCOVERY_DATABASE,
1033
+ " Reader " << reader_guid << " has no associated participant. Skipping" );
1034
+ assert (topic_name != virtual_topic_);
1035
+ changes_to_release_.push_back (ch); // Release change so it can be reused
1036
+ return ;
1037
+ }
1038
+ else if (reader_part_it->second .change ()->kind != fastrtps::rtps::ChangeKind_t::ALIVE)
1039
+ {
1040
+ EPROSIMA_LOG_WARNING (DISCOVERY_DATABASE,
1041
+ " Reader " << reader_guid << " is associated to a removed participant. Skipping" );
1042
+ assert (topic_name != virtual_topic_);
1043
+ changes_to_release_.push_back (ch); // Release change so it can be reused
1044
+ return ;
1045
+ }
1046
+
979
1047
// Add entry to readers_
980
1048
DiscoveryEndpointInfo tmp_reader (
981
1049
ch,
@@ -996,18 +1064,7 @@ void DiscoveryDataBase::create_readers_from_change_(
996
1064
new_updates_++;
997
1065
998
1066
// Add entry to participants_[guid_prefix]::readers
999
- std::map<eprosima::fastrtps::rtps::GuidPrefix_t, DiscoveryParticipantInfo>::iterator reader_part_it =
1000
- participants_.find (reader_guid.guidPrefix );
1001
- if (reader_part_it != participants_.end ())
1002
- {
1003
- reader_part_it->second .add_reader (reader_guid);
1004
- }
1005
- else
1006
- {
1007
- EPROSIMA_LOG_ERROR (DISCOVERY_DATABASE,
1008
- " Reader " << reader_guid << " has no associated participant. Skipping" );
1009
- return ;
1010
- }
1067
+ reader_part_it->second .add_reader (reader_guid);
1011
1068
1012
1069
// Add reader to readers_by_topic_[topic_name]
1013
1070
add_reader_to_topic_ (reader_guid, topic_name);
@@ -1287,7 +1344,7 @@ void DiscoveryDataBase::process_dispose_participant_(
1287
1344
delete_reader_entity_ (reader_guid);
1288
1345
}
1289
1346
1290
- // All participant endoints must be already unmatched in others endopoints relevant_ack maps
1347
+ // All participant endpoints must be already unmatched in others endpoints relevant_ack maps
1291
1348
1292
1349
// Unmatch own participant
1293
1350
unmatch_participant_ (participant_guid.guidPrefix );
@@ -1558,6 +1615,14 @@ bool DiscoveryDataBase::data_queue_empty()
1558
1615
return (pdp_data_queue_.BothEmpty () && edp_data_queue_.BothEmpty ());
1559
1616
}
1560
1617
1618
+ void DiscoveryDataBase::swap_data_queues ()
1619
+ {
1620
+ // Swap EDP before PDP to avoid race condition in which both data P and w/r are received at the same time,
1621
+ // just after having swapped the PDP queue
1622
+ edp_data_queue_.Swap ();
1623
+ pdp_data_queue_.Swap ();
1624
+ }
1625
+
1561
1626
bool DiscoveryDataBase::is_participant (
1562
1627
const eprosima::fastrtps::rtps::GUID_t& guid)
1563
1628
{
0 commit comments