Skip to content

Commit 38f0644

Browse files
committed
Split example into two. Dynamic work in progress
1 parent 5a1ad4f commit 38f0644

35 files changed

+1453
-7
lines changed

examples/routing_service/udp_socket_adapter/CMakeLists.txt renamed to examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt

File renamed without changes.

examples/routing_service/udp_socket_adapter/README.md renamed to examples/routing_service/udp_socket_adapter_dynamic/README.md

File renamed without changes.
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
<?xml version="1.0"?>
2+
<dds xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:noNamespaceSchemaLocation="http://community.rti.com/schema/7.3.0/rti_routing_service.xsd">
4+
5+
<plugin_library name="AdapterLib">
6+
<adapter_plugin name="SocketAdapter">
7+
<dll>SocketAdapterCpp</dll>
8+
<create_function>SocketAdapter_create_adapter_plugin</create_function>
9+
</adapter_plugin>
10+
</plugin_library>
11+
12+
<!-- Demonstrates a scenario where the SocketAdapter reads squares data from a socket
13+
and the DDSAdapter writes it to a DDS domain which can be subscribed to in
14+
Shapes Demo or using 'rtiddsspy -printSample' -->
15+
<routing_service name="SocketAdapterToDDS">
16+
<domain_route name="SocketBridge">
17+
<connection name="SocketConnection" plugin_name="AdapterLib::SocketAdapter">
18+
<registered_type name="ShapeType" type_name="ShapeType" />
19+
</connection>
20+
<participant name="DDSConnection">
21+
<domain_id>0</domain_id>
22+
<domain_participant_qos base_name="BuiltinQosLib::Generic.Common" />
23+
24+
<registered_type name="ShapeType" type_name="ShapeType" />
25+
</participant>
26+
<session name="session">
27+
<route>
28+
<input connection="SocketConnection">
29+
<creation_mode>ON_DOMAIN_MATCH</creation_mode>
30+
<registered_type_name>ShapeType</registered_type_name>
31+
<stream_name>Square</stream_name>
32+
<!-- You must set these 3 properties -->
33+
<property>
34+
<value>
35+
<!-- Receive address for UDP data -->
36+
<element>
37+
<name>receive_address</name>
38+
<value>127.0.0.1</value>
39+
</element>
40+
<!-- Receive port for UDP data -->
41+
<element>
42+
<name>receive_port</name>
43+
<value>10204</value>
44+
</element>
45+
</value>
46+
</property>
47+
</input>
48+
<dds_output participant="DDSConnection">
49+
<creation_mode>ON_ROUTE_MATCH</creation_mode>
50+
<registered_type_name>ShapeType</registered_type_name>
51+
<!-- Topic used to send data. We only support 1 topic for simplicity.
52+
You could use Triangle or Circle if you also modify the other
53+
references to Square that are hardcoded in the adapter code -->
54+
<topic_name>Square</topic_name>
55+
<datawriter_qos base_name="BuiltinQosLib::Generic.Common" />
56+
</dds_output>
57+
</route>
58+
<route>
59+
<output connection="SocketConnection">
60+
<creation_mode>ON_ROUTE_MATCH</creation_mode>
61+
<!--creation_mode>IMMEDIATE</creation_mode-->
62+
<registered_type_name>ShapeType</registered_type_name>
63+
<stream_name>Square</stream_name>
64+
<property> <!--Set
65+
the IP addresses and ports for sender and destination-->
66+
<value>
67+
<element>
68+
<name>send_address</name>
69+
<value>127.0.0.1</value>
70+
</element>
71+
<element>
72+
<name>send_port</name>
73+
<value>0</value> <!--Set
74+
to 0 for automatic use of available port-->
75+
</element>
76+
<element>
77+
<name>dest_address</name>
78+
<value>127.0.0.1</value>
79+
</element>
80+
<element>
81+
<name>dest_port</name>
82+
<value>10204</value>
83+
</element>
84+
</value>
85+
</property>
86+
</output>
87+
<dds_input participant="DDSConnection">
88+
<creation_mode>ON_DOMAIN_MATCH</creation_mode>
89+
<!--creation_mode>IMMEDIATE</creation_mode-->
90+
<registered_type_name>ShapeType</registered_type_name>
91+
<topic_name>Square</topic_name>
92+
<datareader_qos base_name="BuiltinQosLib::Generic.Common" />
93+
</dds_input>
94+
</route>
95+
</session>
96+
</domain_route>
97+
</routing_service>
98+
99+
</dds>

examples/routing_service/udp_socket_adapter/src/SocketAdapter.cxx renamed to examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx

File renamed without changes.

examples/routing_service/udp_socket_adapter/src/SocketAdapter.hpp renamed to examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp

File renamed without changes.

examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx renamed to examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx

File renamed without changes.

examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp renamed to examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp

File renamed without changes.
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved.
3+
*
4+
* RTI grants Licensee a license to use, modify, compile, and create derivative
5+
* works of the Software. Licensee has the right to distribute object form
6+
* only for use with RTI products. The Software is provided "as is", with no
7+
* warranty of any type, including any warranty for fitness for any purpose.
8+
* RTI is under no obligation to maintain or support the Software. RTI shall
9+
* not be liable for any incidental or consequential damages arising out of the
10+
* use or inability to use the software.
11+
*/
12+
13+
#include "SocketInputDiscoveryStreamReader.hpp"
14+
15+
using namespace rti::routing;
16+
using namespace rti::routing::adapter;
17+
18+
SocketInputDiscoveryStreamReader::SocketInputDiscoveryStreamReader(
19+
const PropertySet &,
20+
StreamReaderListener *input_stream_discovery_listener)
21+
{
22+
input_stream_discovery_listener_ = input_stream_discovery_listener;
23+
}
24+
25+
void SocketInputDiscoveryStreamReader::dispose(
26+
const rti::routing::StreamInfo &stream_info)
27+
{
28+
/**
29+
* This guard is essential since the take() and return_loan() operations
30+
* triggered by calling on_data_available() execute on an internal Routing
31+
* Service thread. The custom dispose() operation doesn't run on that
32+
* thread. Since the take() and return_loan() operations also need to access
33+
* the data_samples_ list this protection is required.
34+
*/
35+
std::lock_guard<std::mutex> guard(data_samples_mutex_);
36+
37+
std::unique_ptr<rti::routing::StreamInfo> stream_info_disposed(
38+
new StreamInfo(
39+
stream_info.stream_name(),
40+
stream_info.type_info().type_name()));
41+
stream_info_disposed.get()->disposed(true);
42+
43+
this->data_samples_.push_back(std::move(stream_info_disposed));
44+
input_stream_discovery_listener_->on_data_available(this);
45+
}
46+
47+
void SocketInputDiscoveryStreamReader::take(
48+
std::vector<rti::routing::StreamInfo *> &stream)
49+
{
50+
/**
51+
* This guard is essential since the take() and return_loan() operations
52+
* triggered by calling on_data_available() execute on an internal Routing
53+
* Service thread. The custom dispose() operation doesn't run on that
54+
* thread. Since the take() and return_loan() operations also need to access
55+
* the data_samples_ list this protection is required.
56+
*/
57+
std::lock_guard<std::mutex> guard(data_samples_mutex_);
58+
std::transform(
59+
data_samples_.begin(),
60+
data_samples_.end(),
61+
std::back_inserter(stream),
62+
[](const std::unique_ptr<rti::routing::StreamInfo> &element) {
63+
return element.get();
64+
});
65+
}
66+
67+
void SocketInputDiscoveryStreamReader::return_loan(
68+
std::vector<rti::routing::StreamInfo *> &stream)
69+
{
70+
/**
71+
* This guard is essential since the take() and return_loan() operations
72+
* triggered by calling on_data_available() execute on an internal Routing
73+
* Service thread. The custom dispose() operation doesn't run on that
74+
* thread. Since the take() and return_loan() operations also need to access
75+
* the data_samples_ list this protection is required.
76+
*/
77+
std::lock_guard<std::mutex> guard(data_samples_mutex_);
78+
79+
/**
80+
* For discovery streams there will never be any outstanding return_loan().
81+
* Thus we can be sure that each take() will be followed by a call to
82+
* return_loan(), before the next take() executes.
83+
*/
84+
this->data_samples_.erase(
85+
data_samples_.begin(),
86+
data_samples_.begin() + stream.size());
87+
stream.clear();
88+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved.
3+
*
4+
* RTI grants Licensee a license to use, modify, compile, and create derivative
5+
* works of the Software. Licensee has the right to distribute object form
6+
* only for use with RTI products. The Software is provided "as is", with no
7+
* warranty of any type, including any warranty for fitness for any purpose.
8+
* RTI is under no obligation to maintain or support the Software. RTI shall
9+
* not be liable for any incidental or consequential damages arising out of the
10+
* use or inability to use the software.
11+
*/
12+
13+
#ifndef SOCKETDISCOVERYSTREAMREADER_HPP
14+
#define SOCKETDISCOVERYSTREAMREADER_HPP
15+
16+
#include <fstream>
17+
#include <mutex>
18+
19+
#include <rti/routing/adapter/AdapterPlugin.hpp>
20+
#include <rti/routing/adapter/DiscoveryStreamReader.hpp>
21+
22+
/**
23+
* This class implements a DiscoveryStreamReader, a special kind of StreamReader
24+
* that provide discovery information about the available streams and their
25+
* types.
26+
*/
27+
28+
class SocketInputDiscoveryStreamReader
29+
: public rti::routing::adapter::DiscoveryStreamReader {
30+
public:
31+
SocketInputDiscoveryStreamReader(
32+
const rti::routing::PropertySet &,
33+
rti::routing::adapter::StreamReaderListener
34+
*input_stream_discovery_listener);
35+
36+
void take(std::vector<rti::routing::StreamInfo *> &) final;
37+
38+
void return_loan(std::vector<rti::routing::StreamInfo *> &) final;
39+
40+
/**
41+
* @brief Custom operation defined to indicate disposing off an <input>
42+
* when the SocketStreamReader has finished reading from the socket.
43+
* The SocketInputDiscoveryStreamReader will then create a new
44+
* discovery sample indicating that the stream has been disposed.
45+
* This will cause the Routing Service to start tearing down the Routes
46+
* associated with <input> having the corresponding <registered_type_name>
47+
* and <stream_name>.
48+
*
49+
* @param stream_info \b in. Reference to a StreamInfo object which should
50+
* be used when creating a new StreamInfo sample with disposed set to true
51+
*/
52+
void dispose(const rti::routing::StreamInfo &stream_info);
53+
54+
private:
55+
std::mutex data_samples_mutex_;
56+
std::vector<std::unique_ptr<rti::routing::StreamInfo>> data_samples_;
57+
rti::routing::adapter::StreamReaderListener
58+
*input_stream_discovery_listener_;
59+
};
60+
61+
#endif

0 commit comments

Comments
 (0)