Skip to content
This repository was archived by the owner on Oct 7, 2021. It is now read-only.

Commit 2ef6b61

Browse files
YuShengdirk-thomas
authored andcommitted
use a common datareader to fetch node info from participants.(#242) (#275)
Remove get_discovered_* interface in rmw_node_names.cpp and rmw_node_info_and_types.cpp. Use a built-in datareader to fetch nodes name from *alive* participants. Signed-off-by: YuSheng <[email protected]> fix lint Signed-off-by: YuSheng <[email protected]> return loan samples when found matched node in rmw_node_info_and_types.cpp Signed-off-by: YuSheng <[email protected]>
1 parent c904e3d commit 2ef6b61

File tree

2 files changed

+113
-66
lines changed

2 files changed

+113
-66
lines changed

rmw_opensplice_cpp/src/rmw_node_info_and_types.cpp

Lines changed: 58 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ __get_key(
8383
const char * node_namespace,
8484
DDS::InstanceHandle_t & key)
8585
{
86+
DDS::DataReader_var loopup_datareader;
87+
DDS::ReturnCode_t status;
8688
auto participant = node_info->participant;
8789
if (!participant) {
8890
RMW_SET_ERROR_MSG("participant handle is null");
@@ -97,42 +99,69 @@ __get_key(
9799
return RMW_RET_OK;
98100
}
99101

100-
DDS::InstanceHandleSeq handles;
101-
if (participant->get_discovered_participants(handles) != DDS::RETCODE_OK) {
102-
RMW_SET_ERROR_MSG("unable to fetch discovered participants.");
102+
// Use opensplice get_discovered_participants will get a list of participants including
103+
// alive or disposed. However, we don't want the disposed participants. So we use a datareader
104+
// to read instance alive samples from the built-in topic: "DCPSParticipant", to retrive the
105+
// alive node info from userdata.
106+
// Reference: https://github.com/ADLINK-IST/opensplice/issues/79#issuecomment-456367434
107+
108+
auto builtinSubscriber = participant->get_builtin_subscriber();
109+
loopup_datareader = builtinSubscriber->lookup_datareader("DCPSParticipant");
110+
auto participantReader =
111+
DDS::ParticipantBuiltinTopicDataDataReader::_narrow(loopup_datareader.in());
112+
113+
DDS::ParticipantBuiltinTopicDataSeq data;
114+
DDS::SampleInfoSeq info;
115+
116+
DDS::Duration_t wait_for_historical_data_timeout = {1, 0};
117+
118+
// Make sure all historical data is delivered in the DataReader
119+
participantReader->wait_for_historical_data(wait_for_historical_data_timeout);
120+
121+
// Take samples from DCPSParticipant topic
122+
status = participantReader->read(data, info, DDS::LENGTH_UNLIMITED,
123+
DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
124+
125+
if (status != DDS::RETCODE_OK) {
126+
RMW_SET_ERROR_MSG("unable to discover participants.");
103127
return RMW_RET_ERROR;
104128
}
105129

106-
for (DDS::ULong i = 0; i < handles.length(); ++i) {
107-
DDS::ParticipantBuiltinTopicData pbtd;
108-
dds_ret = participant->get_discovered_participant_data(pbtd, handles[i]);
109-
if (dds_ret == DDS::RETCODE_OK) {
110-
uint8_t * buf = pbtd.user_data.value.get_buffer(false);
111-
if (buf) {
112-
std::vector<uint8_t> kv(buf, buf + pbtd.user_data.value.length());
113-
auto map = rmw::impl::cpp::parse_key_value(kv);
114-
auto name_found = map.find("name");
115-
auto ns_found = map.find("namespace");
116-
117-
if (name_found != map.end() && ns_found != map.end()) {
118-
std::string name(name_found->second.begin(), name_found->second.end());
119-
std::string ns(ns_found->second.begin(), ns_found->second.end());
120-
RCUTILS_LOG_DEBUG_NAMED(
121-
"rmw_opensplice_cpp",
122-
"Found node %s", name.c_str());
123-
if (strcmp(node_name, name.c_str()) == 0 &&
124-
strcmp(node_namespace, ns.c_str()) == 0)
125-
{
126-
key = DDS_BuiltinTopicKey_to_InstanceHandle(pbtd.key);
127-
return RMW_RET_OK;
128-
}
130+
bool node_found = false;
131+
for (DDS::ULong i = 0; i < data.length(); ++i) {
132+
if (info[i].instance_state == DDS::ALIVE_INSTANCE_STATE) {
133+
uint8_t * buf = data[i].user_data.value.get_buffer(false);
134+
if (buf == nullptr) {
135+
continue;
136+
}
137+
std::vector<uint8_t> kv(buf, buf + data[i].user_data.value.length());
138+
auto map = rmw::impl::cpp::parse_key_value(kv);
139+
auto name_found = map.find("name");
140+
auto ns_found = map.find("namespace");
141+
142+
if (name_found != map.end() && ns_found != map.end()) {
143+
std::string name(name_found->second.begin(), name_found->second.end());
144+
std::string ns(ns_found->second.begin(), ns_found->second.end());
145+
RCUTILS_LOG_DEBUG_NAMED(
146+
"rmw_opensplice_cpp",
147+
"Found node %s", name.c_str());
148+
if (strcmp(node_name, name.c_str()) == 0 &&
149+
strcmp(node_namespace, ns.c_str()) == 0)
150+
{
151+
key = DDS_BuiltinTopicKey_to_InstanceHandle(data[i].key);
152+
node_found = true;
153+
break;
129154
}
130155
}
131-
} else {
132-
RMW_SET_ERROR_MSG("unable to fetch discovered participants data.");
133-
return RMW_RET_ERROR;
134156
}
135157
}
158+
159+
participantReader->return_loan(data, info);
160+
161+
if (node_found == true) {
162+
return RMW_RET_OK;
163+
}
164+
136165
RMW_SET_ERROR_MSG("unable to match node_name/namespace with discovered nodes.");
137166
return RMW_RET_ERROR;
138167
}

rmw_opensplice_cpp/src/rmw_node_names.cpp

Lines changed: 55 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include <vector>
1616
#include <string>
17+
#include <iostream>
1718

1819
#include "rcutils/types/string_array.h"
1920
#include "rcutils/logging_macros.h"
@@ -36,6 +37,8 @@ rmw_get_node_names(
3637
rcutils_string_array_t * node_names,
3738
rcutils_string_array_t * node_namespaces)
3839
{
40+
DDS::DataReader_var loopup_datareader;
41+
DDS::ReturnCode_t status;
3942
if (!node) {
4043
RMW_SET_ERROR_MSG("node handle is null");
4144
return RMW_RET_ERROR;
@@ -71,18 +74,37 @@ rmw_get_node_names(
7174
return RMW_RET_ERROR;
7275
}
7376

74-
DDS::InstanceHandleSeq handles;
75-
if (participant->get_discovered_participants(handles) != DDS::RETCODE_OK) {
76-
RMW_SET_ERROR_MSG("unable to fetch discovered participants.");
77+
// Use opensplice get_discovered_participants will get a list of participants including
78+
// alive or disposed. However, we don't want the disposed participants. So we use a datareader
79+
// to read instance alive samples from the built-in topic: "DCPSParticipant", to retrive the
80+
// alive node info from userdata.
81+
// Reference: https://github.com/ADLINK-IST/opensplice/issues/79#issuecomment-456367434
82+
83+
auto builtinSubscriber = participant->get_builtin_subscriber();
84+
loopup_datareader = builtinSubscriber->lookup_datareader("DCPSParticipant");
85+
auto participantReader =
86+
DDS::ParticipantBuiltinTopicDataDataReader::_narrow(loopup_datareader.in());
87+
88+
DDS::ParticipantBuiltinTopicDataSeq data;
89+
DDS::SampleInfoSeq info;
90+
91+
DDS::Duration_t wait_for_historical_data_timeout = {1, 0};
92+
93+
// Make sure all historical data is delivered in the DataReader
94+
participantReader->wait_for_historical_data(wait_for_historical_data_timeout);
95+
96+
// Take samples from DCPSParticipant topic
97+
status = participantReader->read(data, info, DDS::LENGTH_UNLIMITED,
98+
DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
99+
100+
if (status != DDS::RETCODE_OK) {
101+
RMW_SET_ERROR_MSG("unable to discover participants.");
77102
return RMW_RET_ERROR;
78103
}
79104

80-
// Collect all Node names from the list of instances
81-
82105
rcutils_allocator_t allocator = rcutils_get_default_allocator();
83106

84-
// allocate a temporary list for all Node names according to the maximum that can be expected.
85-
int length = handles.length();
107+
int length = data.length();
86108
rcutils_string_array_t node_list = rcutils_get_zero_initialized_string_array();
87109
rcutils_ret_t rcutils_ret = rcutils_string_array_init(&node_list, length, &allocator);
88110
if (rcutils_ret != RCUTILS_RET_OK) {
@@ -100,41 +122,37 @@ rmw_get_node_names(
100122
}
101123

102124
int n = 0;
103-
for (auto i = 0; i < length; ++i) {
104-
DDS::ParticipantBuiltinTopicData pbtd;
105-
106-
auto dds_ret = participant->get_discovered_participant_data(pbtd, handles[i]);
107-
if (dds_ret == DDS::RETCODE_OK) {
108-
uint8_t * buf = pbtd.user_data.value.get_buffer(false);
109-
if (buf) {
110-
std::vector<uint8_t> kv(buf, buf + pbtd.user_data.value.length());
111-
auto map = rmw::impl::cpp::parse_key_value(kv);
112-
auto name_found = map.find("name");
113-
auto ns_found = map.find("namespace");
114-
115-
if (name_found != map.end() && ns_found != map.end()) {
116-
std::string name(name_found->second.begin(), name_found->second.end());
117-
node_list.data[n] = rcutils_strndup(name.c_str(), name.size(), allocator);
118-
if (!node_list.data[n]) {
119-
RMW_SET_ERROR_MSG("could not allocate memory for node name");
120-
goto fail;
121-
}
122-
123-
std::string ns(ns_found->second.begin(), ns_found->second.end());
124-
ns_list.data[n] = rcutils_strndup(ns.c_str(), ns.size(), allocator);
125-
if (!ns_list.data[n]) {
126-
RMW_SET_ERROR_MSG("could not allocate memory for node name");
127-
goto fail;
128-
}
129-
n++;
125+
for (auto i = 0; i < length; i++) {
126+
if (info[i].instance_state == DDS::ALIVE_INSTANCE_STATE) {
127+
uint8_t * buf = data[i].user_data.value.get_buffer(false);
128+
if (buf == nullptr) {
129+
continue;
130+
}
131+
std::vector<uint8_t> kv(buf, buf + data[i].user_data.value.length());
132+
auto map = rmw::impl::cpp::parse_key_value(kv);
133+
auto name_found = map.find("name");
134+
auto ns_found = map.find("namespace");
135+
if (name_found != map.end() && ns_found != map.end()) {
136+
std::string name(name_found->second.begin(), name_found->second.end());
137+
node_list.data[n] = rcutils_strndup(name.c_str(), name.size(), allocator);
138+
if (!node_list.data[n]) {
139+
RMW_SET_ERROR_MSG("could not allocate memory for node name");
140+
goto fail;
130141
}
142+
143+
std::string ns(ns_found->second.begin(), ns_found->second.end());
144+
ns_list.data[n] = rcutils_strndup(ns.c_str(), ns.size(), allocator);
145+
if (!ns_list.data[n]) {
146+
RMW_SET_ERROR_MSG("could not allocate memory for node name");
147+
goto fail;
148+
}
149+
n++;
131150
}
132-
} else {
133-
RMW_SET_ERROR_MSG("unable to fetch discovered participants data.");
134-
return RMW_RET_ERROR;
135151
}
136152
}
137153

154+
participantReader->return_loan(data, info);
155+
138156
// Allocate the node_names out-buffer according to the number of Node names
139157
rcutils_ret = rcutils_string_array_init(node_names, n, &allocator);
140158
if (rcutils_ret != RCUTILS_RET_OK) {

0 commit comments

Comments
 (0)