Skip to content

Commit 148619e

Browse files
authored
Add environment variable to control override of DomainParticipantQos (#41)
* Add environment variable to control override of DomainParticipantQos * Add entry for RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY to README * Add note about relation to RMW_CONNEXT_DISABLE_FAST_ENDPOINT_DISCOVERY * Remove accidentally committed debug statement Signed-off-by: Andrea Sorbini <[email protected]>
1 parent ac2dd6f commit 148619e

File tree

5 files changed

+262
-139
lines changed

5 files changed

+262
-139
lines changed

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ variables.
153153
- [RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY](#RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY)
154154
- [RMW_CONNEXT_INITIAL_PEERS](#RMW_CONNEXT_INITIAL_PEERS)
155155
- [RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE](#RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE)
156+
- [RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY](#RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY)
156157
- [RMW_CONNEXT_REQUEST_REPLY_MAPPING](#RMW_CONNEXT_REQUEST_REPLY_MAPPING)
157158
- [RMW_CONNEXT_UDP_INTERFACE](#RMW_CONNEXT_UDP_INTERFACE)
158159
- [RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE](#RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE)
@@ -276,6 +277,26 @@ In particular, when this mode is enabled, `rmw_connextdds` will revert to adding
276277
a suffix (`_`) to the end of the names of the attributes of the ROS2 data types
277278
propagated via DDS discovery.
278279

280+
### RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY
281+
282+
Control how `rmw_connextdds` will override the default DomainParticipantQos obtained
283+
from Connext.
284+
285+
If this variable is unspecified, or set to `all`, then `rmw_connextdds` will modify
286+
the default DomainParticipantQos with settings derived from ROS 2 options (e.g.
287+
"localhost only", or "node enclave"), and some additional optimizations meant to
288+
improve the out of the box experiene (e.g. speed up endpoint discovery, and increase
289+
the size of type information shared via discovery).
290+
291+
If the variable is set to `basic`, then only those settings associated with ROS 2
292+
options will be modified.
293+
294+
If the variable is set to `never`, then no settings will be modified and the
295+
DomainParticipantQos will be used as is.
296+
297+
Note that values `basic` and `never` will disable the same endpoint discovery
298+
optimizations controlled by [RMW_CONNEXT_DISABLE_FAST_ENDPOINT_DISCOVERY](#RMW_CONNEXT_DISABLE_FAST_ENDPOINT_DISCOVERY).
299+
279300
### RMW_CONNEXT_REQUEST_REPLY_MAPPING
280301

281302
The [DDS-RPC specification](https://www.omg.org/spec/DDS-RPC/About-DDS-RPC/)

rmw_connextdds_common/include/rmw_connextdds/context.hpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,25 @@ struct rmw_context_impl_t
9696
bool optimize_large_data{true};
9797
#endif /* RMW_CONNEXT_DEFAULT_LARGE_DATA_OPTIMIZATIONS */
9898

99+
enum class participant_qos_override_policy_t
100+
{
101+
// Always override the default DomainParticipantQoS obtained at runtime from
102+
// Connext with RMW-specific configuration. This will include settings derived
103+
// from ROS 2 configuration parameters (e.g. "localhost_only", or "enclave"),
104+
// but also some additional configurations that the RMW performs arbitrarly
105+
// to improve the out of the box experience. Note that some of these customizations
106+
// can also be disabled individually (e.g. fast endpoint discovery).
107+
All,
108+
// Only perform basic modifications on the default DomainParticipantQos value
109+
// based on ROS 2 configuration parameters (e.g. "localhost only", and "enclave").
110+
// All other RMW-specific customizations will not be applied.
111+
Basic,
112+
// Use the default DomainParticipantQoS returned by Connext without any modification.
113+
Never,
114+
};
115+
116+
participant_qos_override_policy_t participant_qos_override_policy;
117+
99118
enum class endpoint_qos_override_policy_t
100119
{
101120
// Use default QoS policy got from the DDS qos profile file applying topic filters
@@ -113,6 +132,8 @@ struct rmw_context_impl_t
113132
endpoint_qos_override_policy_t endpoint_qos_override_policy;
114133
std::regex endpoint_qos_override_policy_topics_regex;
115134

135+
struct DDS_StringSeq initial_peers = DDS_SEQUENCE_INITIALIZER;
136+
116137
/* Participant reference count*/
117138
size_t node_count{0};
118139
std::mutex initialization_mutex;

rmw_connextdds_common/include/rmw_connextdds/static_config.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,12 @@
9494

9595
#ifndef RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY
9696
#define RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY "RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY"
97-
#endif /* RMW_CONNEXT_ENV_ALLOW_TOPIC_QOS_PROFILES */
97+
#endif /* RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY */
9898

99+
#ifndef RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY
100+
#define RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY \
101+
"RMW_CONNEXT_PARTICIPANT_QOS_OVERRIDE_POLICY"
102+
#endif /* RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY */
99103

100104
/******************************************************************************
101105
* DDS Implementation

rmw_connextdds_common/src/common/rmw_context.cpp

Lines changed: 151 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -91,35 +91,14 @@ rmw_connextdds_initialize_participant_qos(
9191
return RMW_RET_ERROR;
9292
}
9393

94-
/* Lookup and configure initial peer from environment */
95-
const char * initial_peers = nullptr;
96-
const char * lookup_rc =
97-
rcutils_get_env(RMW_CONNEXT_ENV_INITIAL_PEERS, &initial_peers);
98-
99-
if (nullptr != lookup_rc || nullptr == initial_peers) {
100-
RMW_CONNEXT_LOG_ERROR_A_SET(
101-
"failed to lookup from environment: "
102-
"var=%s, "
103-
"rc=%s ",
104-
RMW_CONNEXT_ENV_INITIAL_PEERS,
105-
lookup_rc)
106-
return RMW_RET_ERROR;
107-
}
108-
109-
if ('\0' != initial_peers[0]) {
110-
rmw_ret_t rc = rmw_connextdds_parse_string_list(
111-
initial_peers,
112-
&dp_qos.discovery.initial_peers,
113-
',' /* delimiter */,
114-
true /* trim_elements */,
115-
false /* allow_empty_elements */,
116-
false /* append_values */);
117-
if (RMW_RET_OK != rc) {
118-
RMW_CONNEXT_LOG_ERROR_A(
119-
"failed to parse initial peers: '%s'", initial_peers)
120-
return rc;
94+
if (ctx->participant_qos_override_policy ==
95+
rmw_context_impl_t::participant_qos_override_policy_t::All &&
96+
DDS_StringSeq_get_length(&ctx->initial_peers) > 0)
97+
{
98+
if (!DDS_StringSeq_copy(&dp_qos.discovery.initial_peers, &ctx->initial_peers)) {
99+
RMW_CONNEXT_LOG_ERROR_SET("failed to copy initial peers sequence")
100+
return RMW_RET_ERROR;
121101
}
122-
RMW_CONNEXT_LOG_DEBUG_A("initial DDS peers: %s", initial_peers)
123102
}
124103

125104
return RMW_RET_OK;
@@ -175,60 +154,13 @@ rmw_context_impl_t::initialize_node(
175154
return RMW_RET_OK;
176155
}
177156

178-
179157
rmw_ret_t
180158
rmw_context_impl_t::initialize_participant(const bool localhost_only)
181159
{
182160
RMW_CONNEXT_LOG_DEBUG("initializing DDS DomainParticipant")
183161

184162
this->localhost_only = localhost_only;
185163

186-
/* Lookup RMW_CONNEXT_ENV_ALLOW_TOPIC_QOS_PROFILES env variable.*/
187-
const char * endpoint_qos_policy = nullptr;
188-
const char * lookup_rc = rcutils_get_env(
189-
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY, &endpoint_qos_policy);
190-
191-
if (nullptr != lookup_rc || nullptr == endpoint_qos_policy) {
192-
RMW_CONNEXT_LOG_ERROR_A_SET(
193-
"failed to lookup from environment: "
194-
"var=%s, "
195-
"rc=%s ",
196-
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY,
197-
lookup_rc)
198-
return RMW_RET_ERROR;
199-
}
200-
201-
this->endpoint_qos_override_policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Always;
202-
const char dds_topic_policy_prefix[] = "dds_topics: ";
203-
const char never_policy[] = "never";
204-
const char always_policy[] = "always";
205-
if (
206-
0 == strncmp(
207-
endpoint_qos_policy, dds_topic_policy_prefix, sizeof(dds_topic_policy_prefix) - 1u))
208-
{
209-
this->endpoint_qos_override_policy =
210-
rmw_context_impl_t::endpoint_qos_override_policy_t::DDSTopics;
211-
try {
212-
this->endpoint_qos_override_policy_topics_regex =
213-
&endpoint_qos_policy[sizeof(dds_topic_policy_prefix) - 1u];
214-
} catch (std::regex_error & err) {
215-
RMW_CONNEXT_LOG_ERROR_A_SET(
216-
"regex expression provided in {%s} environment variable is invalid: %s\n",
217-
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY,
218-
err.what());
219-
return RMW_RET_ERROR;
220-
}
221-
} else if (0 == strcmp(endpoint_qos_policy, never_policy)) {
222-
this->endpoint_qos_override_policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Never;
223-
} else if (endpoint_qos_policy[0] != '\0' && strcmp(endpoint_qos_policy, always_policy) != 0) {
224-
RMW_CONNEXT_LOG_ERROR_A_SET(
225-
"Environment variable {%s} has an unexpected value {%s}. "
226-
"Allowed values are {always}, {never} or {dds_topics: <regex_expression>}.\n",
227-
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY,
228-
endpoint_qos_policy);
229-
return RMW_RET_ERROR;
230-
}
231-
232164
if (nullptr == RMW_Connext_gv_DomainParticipantFactory) {
233165
RMW_CONNEXT_LOG_ERROR("DDS DomainParticipantFactory not initialized")
234166
return RMW_RET_ERROR;
@@ -708,6 +640,69 @@ rmw_api_connextdds_init_options_fini(rmw_init_options_t * init_options)
708640
return ret;
709641
}
710642

643+
static
644+
rmw_ret_t
645+
rmw_connextdds_parse_participant_qos_override_policy(
646+
const char * const user_input,
647+
rmw_context_impl_t::participant_qos_override_policy_t & policy)
648+
{
649+
static const char pfx_never[] = "never";
650+
static const char pfx_all[] = "all";
651+
static const char pfx_basic[] = "basic";
652+
653+
policy = rmw_context_impl_t::participant_qos_override_policy_t::All;
654+
655+
if (0 == strcmp(user_input, pfx_never)) {
656+
policy = rmw_context_impl_t::participant_qos_override_policy_t::Never;
657+
} else if (0 == strcmp(user_input, pfx_basic)) {
658+
policy = rmw_context_impl_t::participant_qos_override_policy_t::Basic;
659+
} else if (user_input[0] != '\0' && strcmp(user_input, pfx_all) != 0) {
660+
RMW_CONNEXT_LOG_ERROR_A_SET(
661+
"unexpected value for participant qos override policy. "
662+
"Allowed values are {all}, {basic}, or {never}: %s",
663+
user_input);
664+
return RMW_RET_ERROR;
665+
}
666+
667+
return RMW_RET_OK;
668+
}
669+
670+
static
671+
rmw_ret_t
672+
rmw_connextdds_parse_endpoint_qos_override_policy(
673+
const char * const user_input,
674+
rmw_context_impl_t::endpoint_qos_override_policy_t & policy,
675+
std::regex & policy_regex)
676+
{
677+
static const char pfx_dds_topics[] = "dds_topics: ";
678+
static const size_t pfx_dds_topics_len = sizeof(pfx_dds_topics) - 1u;
679+
static const char pfx_never[] = "never";
680+
static const char pfx_always[] = "always";
681+
682+
policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Always;
683+
684+
if (0 == strncmp(user_input, pfx_dds_topics, pfx_dds_topics_len)) {
685+
policy = rmw_context_impl_t::endpoint_qos_override_policy_t::DDSTopics;
686+
try {
687+
policy_regex = &user_input[pfx_dds_topics_len];
688+
} catch (std::regex_error & err) {
689+
RMW_CONNEXT_LOG_ERROR_A_SET(
690+
"failed to parse regex for endpoint qos override policy: %s",
691+
err.what());
692+
return RMW_RET_ERROR;
693+
}
694+
} else if (0 == strcmp(user_input, pfx_never)) {
695+
policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Never;
696+
} else if (user_input[0] != '\0' && strcmp(user_input, pfx_always) != 0) {
697+
RMW_CONNEXT_LOG_ERROR_A_SET(
698+
"unexpected value for endpoint qos override policy. "
699+
"Allowed values are {always}, {never} or {dds_topics: <regex_expression>}: %s",
700+
user_input);
701+
return RMW_RET_ERROR;
702+
}
703+
704+
return RMW_RET_OK;
705+
}
711706

712707
rmw_ret_t
713708
rmw_api_connextdds_init(
@@ -810,6 +805,56 @@ rmw_api_connextdds_init(
810805
}
811806
ctx->use_default_publish_mode = '\0' != use_default_publish_mode_env[0];
812807

808+
// Check if the user specified a custom override policy for participant qos.
809+
const char * participant_qos_policy = nullptr;
810+
lookup_rc = rcutils_get_env(
811+
RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY, &participant_qos_policy);
812+
813+
if (nullptr != lookup_rc || nullptr == participant_qos_policy) {
814+
RMW_CONNEXT_LOG_ERROR_A_SET(
815+
"failed to lookup from environment: "
816+
"var=%s, "
817+
"rc=%s ",
818+
RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY,
819+
lookup_rc)
820+
return RMW_RET_ERROR;
821+
}
822+
823+
rc = rmw_connextdds_parse_participant_qos_override_policy(
824+
participant_qos_policy, ctx->participant_qos_override_policy);
825+
if (RMW_RET_OK != rc) {
826+
RMW_CONNEXT_LOG_ERROR_A_SET(
827+
"failed to parse value for environment variable {%s}",
828+
RMW_CONNEXT_ENV_PARTICIPANT_QOS_OVERRIDE_POLICY);
829+
return RMW_RET_ERROR;
830+
}
831+
832+
// Check if the user specified a custom override policy for endpoint qos.
833+
const char * endpoint_qos_policy = nullptr;
834+
lookup_rc = rcutils_get_env(
835+
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY, &endpoint_qos_policy);
836+
837+
if (nullptr != lookup_rc || nullptr == endpoint_qos_policy) {
838+
RMW_CONNEXT_LOG_ERROR_A_SET(
839+
"failed to lookup from environment: "
840+
"var=%s, "
841+
"rc=%s ",
842+
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY,
843+
lookup_rc)
844+
return RMW_RET_ERROR;
845+
}
846+
847+
rc = rmw_connextdds_parse_endpoint_qos_override_policy(
848+
endpoint_qos_policy,
849+
ctx->endpoint_qos_override_policy,
850+
ctx->endpoint_qos_override_policy_topics_regex);
851+
if (RMW_RET_OK != rc) {
852+
RMW_CONNEXT_LOG_ERROR_A_SET(
853+
"failed to parse value for environment variable {%s}",
854+
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY);
855+
return RMW_RET_ERROR;
856+
}
857+
813858
// Check if we should run in "compatibility mode" with Cyclone DDS.
814859
const char * cyclone_compatible_env = nullptr;
815860
lookup_rc = rcutils_get_env(
@@ -922,6 +967,37 @@ rmw_api_connextdds_init(
922967
ctx->optimize_large_data = '\0' == disable_optimize_large_data_env[0];
923968
#endif /* RMW_CONNEXT_DEFAULT_LARGE_DATA_OPTIMIZATIONS */
924969

970+
/* Lookup and configure initial peer from environment */
971+
const char * initial_peers = nullptr;
972+
lookup_rc =
973+
rcutils_get_env(RMW_CONNEXT_ENV_INITIAL_PEERS, &initial_peers);
974+
975+
if (nullptr != lookup_rc || nullptr == initial_peers) {
976+
RMW_CONNEXT_LOG_ERROR_A_SET(
977+
"failed to lookup from environment: "
978+
"var=%s, "
979+
"rc=%s ",
980+
RMW_CONNEXT_ENV_INITIAL_PEERS,
981+
lookup_rc)
982+
return RMW_RET_ERROR;
983+
}
984+
985+
if ('\0' != initial_peers[0]) {
986+
rmw_ret_t rc = rmw_connextdds_parse_string_list(
987+
initial_peers,
988+
&ctx->initial_peers,
989+
',' /* delimiter */,
990+
true /* trim_elements */,
991+
false /* allow_empty_elements */,
992+
false /* append_values */);
993+
if (RMW_RET_OK != rc) {
994+
RMW_CONNEXT_LOG_ERROR_A(
995+
"failed to parse initial peers: '%s'", initial_peers)
996+
return rc;
997+
}
998+
RMW_CONNEXT_LOG_DEBUG_A("initial DDS peers: %s", initial_peers)
999+
}
1000+
9251001
if (nullptr == RMW_Connext_gv_DomainParticipantFactory) {
9261002
RMW_CONNEXT_LOG_DEBUG("initializing DDS DomainParticipantFactory")
9271003

0 commit comments

Comments
 (0)