Skip to content

Commit 5a1ad4f

Browse files
committed
Added writer
1 parent decd68c commit 5a1ad4f

File tree

8 files changed

+319
-8
lines changed

8 files changed

+319
-8
lines changed

examples/routing_service/udp_socket_adapter/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ add_library(${PROJECT_NAME}
3737
"${CMAKE_CURRENT_SOURCE_DIR}/src/SocketInputDiscoveryStreamReader.hpp"
3838
"${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.cxx"
3939
"${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.hpp"
40+
"${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.cxx"
41+
"${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.hpp"
4042
"${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.cxx"
4143
"${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.hpp"
4244
)

examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@
2828
<routing_service name="SocketAdapterToDDS">
2929
<domain_route name="SocketBridge">
3030
<connection name="SocketConnection" plugin_name="AdapterLib::SocketAdapter">
31-
<registered_type name="ShapeType" type_name="ShapeType"/>
31+
<registered_type name="ShapeType" type_name="ShapeType" />
3232
</connection>
3333
<participant name="DDSConnection">
3434
<domain_id>0</domain_id>
35-
<domain_participant_qos base_name="BuiltinQosLib::Generic.Common"/>
36-
37-
<registered_type name="ShapeType" type_name="ShapeType"/>
35+
<domain_participant_qos base_name="BuiltinQosLib::Generic.Common" />
36+
37+
<registered_type name="ShapeType" type_name="ShapeType" />
3838
</participant>
3939
<session name="session">
4040
<route>
@@ -53,7 +53,7 @@
5353
<!-- Receive port for UDP data -->
5454
<element>
5555
<name>receive_port</name>
56-
<value>10203</value>
56+
<value>10204</value>
5757
</element>
5858
<!-- Shape color. This information is not sent to the UDP socket -->
5959
<element>
@@ -70,11 +70,48 @@
7070
You could use Triangle or Circle if you also modify the other
7171
references to Square that are hardcoded in the adapter code -->
7272
<topic_name>Square</topic_name>
73-
<datawriter_qos base_name="BuiltinQosLib::Generic.Common"/>
73+
<datawriter_qos base_name="BuiltinQosLib::Generic.Common" />
7474
</dds_output>
7575
</route>
76+
<route>
77+
<output connection="SocketConnection">
78+
<creation_mode>ON_ROUTE_MATCH</creation_mode>
79+
<!--creation_mode>IMMEDIATE</creation_mode-->
80+
<registered_type_name>ShapeType</registered_type_name>
81+
<stream_name>Square</stream_name>
82+
<property> <!--Set
83+
the IP addresses and ports for sender and destination-->
84+
<value>
85+
<element>
86+
<name>send_address</name>
87+
<value>127.0.0.1</value>
88+
</element>
89+
<element>
90+
<name>send_port</name>
91+
<value>0</value> <!--Set
92+
to 0 for automatic use of available port-->
93+
</element>
94+
<element>
95+
<name>dest_address</name>
96+
<value>127.0.0.1</value>
97+
</element>
98+
<element>
99+
<name>dest_port</name>
100+
<value>10203</value>
101+
</element>
102+
</value>
103+
</property>
104+
</output>
105+
<dds_input participant="DDSConnection">
106+
<creation_mode>ON_DOMAIN_MATCH</creation_mode>
107+
<!--creation_mode>IMMEDIATE</creation_mode-->
108+
<registered_type_name>ShapeType</registered_type_name>
109+
<topic_name>Square</topic_name>
110+
<datareader_qos base_name="BuiltinQosLib::Generic.Common" />
111+
</dds_input>
112+
</route>
76113
</session>
77114
</domain_route>
78115
</routing_service>
79116

80-
</dds>
117+
</dds>

examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include "SocketConnection.hpp"
1414
#include "SocketStreamReader.hpp"
15+
#include "SocketStreamWriter.hpp"
1516

1617
using namespace rti::routing;
1718
using namespace rti::routing::adapter;
@@ -33,19 +34,37 @@ StreamReader *SocketConnection::create_stream_reader(
3334
return new SocketStreamReader(this, info, properties, listener);
3435
}
3536

37+
StreamWriter *SocketConnection::create_stream_writer(
38+
Session *session,
39+
const StreamInfo &info,
40+
const PropertySet &properties)
41+
{
42+
return new SocketStreamWriter(this, info, properties);
43+
}
44+
3645
void SocketConnection::delete_stream_reader(StreamReader *reader)
3746
{
3847
SocketStreamReader *socket_reader =
3948
dynamic_cast<SocketStreamReader *>(reader);
4049
socket_reader->shutdown_socket_reader_thread();
4150
delete reader;
4251
}
52+
void SocketConnection::delete_stream_writer(StreamWriter *writer)
53+
{
54+
SocketStreamWriter *socket_writer = dynamic_cast<SocketStreamWriter *>(writer);
55+
delete writer;
56+
}
4357

4458
DiscoveryStreamReader *SocketConnection::input_stream_discovery_reader()
4559
{
4660
return &input_discovery_reader_;
4761
}
4862

63+
DiscoveryStreamReader *SocketConnection::output_stream_discovery_reader()
64+
{
65+
return nullptr;
66+
}
67+
4968
void SocketConnection::dispose_discovery_stream(
5069
const rti::routing::StreamInfo &stream_info)
5170
{

examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,23 @@ class SocketConnection : public rti::routing::adapter::Connection {
3737
const rti::routing::PropertySet &properties,
3838
rti::routing::adapter::StreamReaderListener *listener) final;
3939

40-
// This function will also stop the receiving socket thread
40+
rti::routing::adapter::StreamWriter *create_stream_writer(
41+
rti::routing::adapter::Session *session,
42+
const rti::routing::StreamInfo &info,
43+
const rti::routing::PropertySet &properties) final;
44+
4145
void delete_stream_reader(
4246
rti::routing::adapter::StreamReader *reader) final;
4347

48+
void delete_stream_writer(
49+
rti::routing::adapter::StreamWriter *writer) final;
50+
4451
rti::routing::adapter::DiscoveryStreamReader *
4552
input_stream_discovery_reader() final;
4653

54+
rti::routing::adapter::DiscoveryStreamReader *
55+
output_stream_discovery_reader() final;
56+
4757
/**
4858
* @brief This function is called by the SocketStreamReader to indicate
4959
* that it's time to dispose the route. The dispose set by the
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved.
3+
*
4+
* RTI grants Licensee a license to use, modify, compile, and create derivative
5+
* works of the Software. Licensee has the right to distribute object form
6+
* only for use with RTI products. The Software is provided "as is", with no
7+
* warranty of any type, including any warranty for fitness for any purpose.
8+
* RTI is under no obligation to maintain or support the Software. RTI shall
9+
* not be liable for any incidental or consequential damages arising out of the
10+
* use or inability to use the software.
11+
*/
12+
13+
#include <algorithm>
14+
#include <cctype>
15+
#include <sstream>
16+
#include <thread>
17+
#include <chrono>
18+
19+
#include <iostream>
20+
#include <cstring>
21+
#ifdef _WIN32
22+
#include <winsock2.h>
23+
#pragma comment(lib, "ws2_32.lib")
24+
#else
25+
#include <sys/types.h>
26+
#include <sys/socket.h>
27+
#include <netinet/in.h>
28+
#include <arpa/inet.h>
29+
#include <unistd.h>
30+
#endif
31+
32+
#include <dds/dds.hpp>
33+
#include <rti/rti.hpp>
34+
#include <rti/routing/Logger.hpp>
35+
#include "SocketStreamWriter.hpp"
36+
#include "SocketStreamReader.hpp" //use ShapeType from here
37+
#include <rti/core/Exception.hpp>
38+
39+
using namespace dds::core::xtypes;
40+
using namespace rti::routing;
41+
using namespace rti::routing::adapter;
42+
43+
SocketStreamWriter::SocketStreamWriter(
44+
SocketConnection *connection,
45+
const StreamInfo &info,
46+
const PropertySet &properties
47+
)
48+
: stream_info_(info.stream_name(), info.type_info().type_name())
49+
{
50+
51+
socket_connection_ = connection;
52+
53+
adapter_type_ =
54+
static_cast<DynamicType *>(info.type_info().type_representation());
55+
56+
57+
// Parse the properties provided in the xml configuration file
58+
for (const auto &property : properties) {
59+
if (property.first == SEND_ADDRESS_STRING) {
60+
send_address_ = property.second;
61+
}
62+
else if (property.first == SEND_PORT_STRING) {
63+
send_port_ = std::stoi(property.second);
64+
}
65+
else if (property.first == DEST_ADDRESS_STRING)
66+
{
67+
dest_address_ = property.second;
68+
}
69+
else if (property.first == DEST_PORT_STRING)
70+
{
71+
dest_port_ = std::stoi(property.second);
72+
}
73+
}
74+
75+
socket = std::unique_ptr<UdpSocket>(new UdpSocket(
76+
send_address_.c_str(),
77+
send_port_));
78+
}
79+
80+
int SocketStreamWriter::write(
81+
const std::vector<dds::core::xtypes::DynamicData *> &samples,
82+
const std::vector<dds::sub::SampleInfo *> &infos)
83+
{
84+
size_t len = 0;
85+
86+
ShapeType shapes;
87+
uint32_t tempObject=0;
88+
89+
for (const auto sample : samples) {
90+
//send sample out UDP interface
91+
if (sample->member_exists_in_type("shapesize"))
92+
{
93+
shapes.shapesize = sample->value<int32_t>("shapesize");
94+
shapes.x = sample->value<int32_t>("x");
95+
shapes.y = sample->value<int32_t>("y");
96+
len = +socket->send_data((char*)&shapes, sizeof(shapes), dest_address_.c_str(), dest_port_);
97+
}
98+
else
99+
{
100+
Logger::instance().local("Received Sample that is not valid ShapeType");
101+
}
102+
}
103+
return len;
104+
}
105+
106+
int SocketStreamWriter::write(
107+
const std::vector<dds::core::xtypes::DynamicData *> &samples,
108+
const std::vector<dds::sub::SampleInfo *> &infos,
109+
const SelectorState &selector_state)
110+
{
111+
int len;
112+
len = write(samples, infos);
113+
return len;
114+
}
115+
116+
void SocketStreamWriter::return_loan(
117+
std::vector<dds::core::xtypes::DynamicData *> &samples,
118+
std::vector<dds::sub::SampleInfo *> &infos)
119+
{
120+
for (int i = 0; i < samples.size(); ++i) {
121+
delete samples[i];
122+
delete infos[i];
123+
}
124+
samples.clear();
125+
infos.clear();
126+
}
127+
128+
SocketStreamWriter::~SocketStreamWriter()
129+
{
130+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* (c) 2019 Copyright, Real-Time Innovations, Inc. All rights reserved.
3+
*
4+
* RTI grants Licensee a license to use, modify, compile, and create derivative
5+
* works of the Software. Licensee has the right to distribute object form
6+
* only for use with RTI products. The Software is provided "as is", with no
7+
* warranty of any type, including any warranty for fitness for any purpose.
8+
* RTI is under no obligation to maintain or support the Software. RTI shall
9+
* not be liable for any incidental or consequential damages arising out of the
10+
* use or inability to use the software.
11+
*/
12+
13+
#ifndef SOCKETSTREAMWRITER_HPP
14+
#define SOCKETSTREAMWRITER_HPP
15+
16+
#include <fstream>
17+
#include <iostream>
18+
#include <thread>
19+
#include <cstring>
20+
21+
#include "SocketConnection.hpp"
22+
#include "UdpSocket.hpp"
23+
24+
#include <rti/routing/adapter/AdapterPlugin.hpp>
25+
#include <rti/routing/adapter/StreamReader.hpp>
26+
27+
#define BUFFER_MAX_SIZE 1024
28+
#define SEND_ADDRESS_STRING "send_address"
29+
#define SEND_PORT_STRING "send_port"
30+
#define DEST_ADDRESS_STRING "dest_address"
31+
#define DEST_PORT_STRING "dest_port"
32+
33+
class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter {
34+
public:
35+
explicit SocketStreamWriter(
36+
SocketConnection *connection,
37+
const rti::routing::StreamInfo &info,
38+
const rti::routing::PropertySet &
39+
);
40+
41+
virtual int
42+
write(const std::vector<dds::core::xtypes::DynamicData *> &,
43+
const std::vector<dds::sub::SampleInfo *> &) final;
44+
45+
virtual int write(
46+
const std::vector<dds::core::xtypes::DynamicData *> &,
47+
const std::vector<dds::sub::SampleInfo *> &,
48+
const rti::routing::adapter::SelectorState &selector_state) final;
49+
50+
virtual void return_loan(
51+
std::vector<dds::core::xtypes::DynamicData *> &,
52+
std::vector<dds::sub::SampleInfo *> &) final;
53+
54+
~SocketStreamWriter();
55+
56+
57+
private:
58+
/**
59+
* @brief Function used by socketreader_thread_ to read samples from the
60+
* socket.
61+
*/
62+
63+
64+
SocketConnection *socket_connection_;
65+
66+
std::unique_ptr<UdpSocket> socket;
67+
68+
int send_port_;
69+
int dest_port_;
70+
71+
std::string send_address_;
72+
std::string dest_address_;
73+
rti::routing::StreamInfo stream_info_;
74+
dds::core::xtypes::DynamicType *adapter_type_;
75+
76+
struct doNothing {
77+
RTI_INT32 CountUp;
78+
RTI_INT32 CountDown;
79+
RTI_INT32 Pause;
80+
//dds::core::optional<RTI_UINT32>ObjectId;
81+
RTI_UINT32 ObjectId;
82+
};
83+
struct ShapeType {
84+
int x;
85+
int y;
86+
int shapesize;
87+
};
88+
89+
};
90+
91+
#endif

examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,15 @@ void UdpSocket::receive_data(
9797
&client_addr_len);
9898

9999
return;
100+
}
101+
102+
int UdpSocket::send_data(char* tx_buffer, int tx_length, const char* destAddr, int destPort)
103+
{
104+
sockaddr_in dest_addr;
105+
dest_addr.sin_family = AF_INET;
106+
dest_addr.sin_port = htons(destPort);
107+
dest_addr.sin_addr.s_addr = inet_addr(destAddr);
108+
109+
size_t length = sendto(sockfd, tx_buffer, tx_length, 0, (struct sockaddr*)&dest_addr, sizeof(dest_addr));
110+
return (int)length;
100111
}

0 commit comments

Comments
 (0)