Skip to content

Commit 960935b

Browse files
authored
Merge pull request OpenDDS#5092 from mitza-oci/relay-control
Updated RtpsRelayControl and its supporting code in RtpsRelay
2 parents afaabf4 + 1ea7720 commit 960935b

File tree

3 files changed

+21
-10
lines changed

3 files changed

+21
-10
lines changed

tools/rtpsrelay/RelayConfigControlListener.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ void RelayConfigControlListener::on_data_available(DDS::DataReader_ptr reader)
1919
RelayConfig control;
2020
DDS::SampleInfo info;
2121
while (control_reader->take_next_sample(control, info) == DDS::RETCODE_OK) {
22-
if (info.valid_data) {
22+
if (info.valid_data && info.instance_state == DDS::ALIVE_INSTANCE_STATE) {
2323
for (const auto& p : control.config()) {
2424
TheServiceParticipant->config_store()->set(p.first, p.second);
2525
}

tools/rtpsrelay/RtpsRelay.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -978,14 +978,24 @@ int run(int argc, ACE_TCHAR* argv[])
978978
return EXIT_FAILURE;
979979
}
980980

981+
DDS::SubscriberQos subscriber_partitioned_qos;
982+
relay_participant->get_default_subscriber_qos(subscriber_partitioned_qos);
983+
subscriber_partitioned_qos.partition.name = publisher_qos.partition.name;
984+
DDS::Subscriber_var partition_subscriber = relay_participant->create_subscriber(subscriber_partitioned_qos, nullptr,
985+
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
986+
if (!partition_subscriber) {
987+
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: failed to create Relay partition subscriber\n"));
988+
return EXIT_FAILURE;
989+
}
990+
981991
DDS::DataReaderQos relay_config_control_qos;
982-
relay_subscriber->get_default_datareader_qos(relay_config_control_qos);
992+
partition_subscriber->get_default_datareader_qos(relay_config_control_qos);
983993
relay_config_control_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
984994

985995
DDS::DataReaderListener_var relay_config_control_listener(new RelayConfigControlListener);
986996
DDS::DataReader_var relay_config_control_reader =
987-
relay_subscriber->create_datareader(cft_config_ctrl, relay_config_control_qos,
988-
relay_config_control_listener, DDS::DATA_AVAILABLE_STATUS);
997+
partition_subscriber->create_datareader(cft_config_ctrl, relay_config_control_qos,
998+
relay_config_control_listener, DDS::DATA_AVAILABLE_STATUS);
989999
if (!relay_config_control_reader) {
9901000
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: failed to create %C data reader\n", RELAY_CONFIG_CONTROL_TOPIC_NAME.c_str()));
9911001
return EXIT_FAILURE;

tools/rtpsrelay/control/RtpsRelayControl.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ struct ShutdownHandler : ACE_Event_Handler {
6565

6666
int write_if_ready(const RelayConfig& config,
6767
RelayConfigDataWriter_var& relay_config_data_writer,
68-
const DDS::WaitSet_var& waiter)
68+
const DDS::WaitSet_var& waiter,
69+
bool& write_pending)
6970
{
7071
DDS::PublicationMatchedStatus matches{};
7172
if (relay_config_data_writer->get_publication_matched_status(matches) != ::DDS::RETCODE_OK) {
@@ -94,9 +95,7 @@ int write_if_ready(const RelayConfig& config,
9495
waiter->detach_condition(status_cond);
9596
}
9697
}
97-
DDS::Publisher_var pub{relay_config_data_writer->get_publisher()};
98-
pub->delete_datawriter(relay_config_data_writer);
99-
relay_config_data_writer = nullptr;
98+
write_pending = false;
10099
}
101100
return EXIT_SUCCESS;
102101
}
@@ -217,6 +216,7 @@ int run(int argc, ACE_TCHAR* argv[])
217216

218217
DDS::WaitSet_var waiter = new DDS::WaitSet;
219218

219+
bool write_pending{false};
220220
RelayConfigDataWriter_var relay_config_data_writer;
221221
if (!config.config().empty()) {
222222
if (config.relay_id().empty()) {
@@ -252,6 +252,7 @@ int run(int argc, ACE_TCHAR* argv[])
252252
DDS::StatusCondition_var cond = relay_config_data_writer->get_statuscondition();
253253
cond->set_enabled_statuses(DDS::PUBLICATION_MATCHED_STATUS);
254254
waiter->attach_condition(cond);
255+
write_pending = true;
255256
}
256257

257258
DDS::Subscriber_var subscriber{participant->create_subscriber(subscriber_qos, nullptr, 0)};
@@ -292,8 +293,8 @@ int run(int argc, ACE_TCHAR* argv[])
292293
waiter->attach_condition(shutdown_guard);
293294

294295
for (auto done{false}; !done;) {
295-
if (relay_config_data_writer) {
296-
if (write_if_ready(config, relay_config_data_writer, waiter) != EXIT_SUCCESS) {
296+
if (relay_config_data_writer && write_pending) {
297+
if (write_if_ready(config, relay_config_data_writer, waiter, write_pending) != EXIT_SUCCESS) {
297298
return EXIT_FAILURE;
298299
}
299300
}

0 commit comments

Comments
 (0)