Skip to content

Commit 49489ea

Browse files
committed
Review changes. Stream writer and reader improvements
1 parent a1e47f0 commit 49489ea

File tree

5 files changed

+32
-47
lines changed

5 files changed

+32
-47
lines changed

examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
<routing_service name="SocketAdapterToDDS">
1515
<domain_route name="SocketBridge">
1616
<connection name="SocketConnection" plugin_name="AdapterLib::SocketAdapter">
17-
<register_typename="PingType" type_name="PingType" />
17+
<register_type name="PingType" type_ref="PingType" />
1818
</connection>
1919
<participant name="DDSConnection">
2020
<domain_id>1</domain_id>
2121
<domain_participant_qos base_name="BuiltinQosLib::Generic.Common" />
22-
<register_typename="PingType" type_name="PingType" />
22+
<register_type name="PingType" type_ref="PingType" />
2323
</participant>
2424
<session name="session">
2525
<route>
@@ -57,12 +57,12 @@
5757
<routing_service name="DDSToSocketAdapter">
5858
<domain_route name="SocketBridge">
5959
<connection name="SocketConnection" plugin_name="AdapterLib::SocketAdapter">
60-
<register_typename="PingType" type_name="PingType" />
60+
<register_type name="PingType" type_ref="PingType" />
6161
</connection>
6262
<participant name="DDSConnection">
6363
<domain_id>0</domain_id>
6464
<domain_participant_qos base_name="BuiltinQosLib::Generic.Common" />
65-
<register_typename="PingType" type_name="PingType" />
65+
<register_type name="PingType" type_ref="PingType" />
6666
</participant>
6767
<session>
6868
<route>

examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <chrono>
1616
#include <sstream>
1717
#include <thread>
18+
#include <queue>
1819

1920
#include <cstring>
2021
#include <iostream>
@@ -55,7 +56,10 @@ void SocketStreamReader::socket_reading_thread()
5556
continue;
5657
}
5758

58-
received_bytes_ = received_bytes;
59+
{
60+
std::lock_guard<std::mutex> lock(buffer_mutex_);
61+
received_buffers_.emplace(received_buffer_, received_buffer_ + received_bytes);
62+
}
5963

6064
reader_listener_->on_data_available(this);
6165
}
@@ -95,10 +99,21 @@ void SocketStreamReader::take(
9599
std::vector<dds::core::xtypes::DynamicData *> &samples,
96100
std::vector<dds::sub::SampleInfo *> &infos)
97101
{
102+
std::vector<char> buffer;
103+
{
104+
std::unique_lock<std::mutex> lock(buffer_mutex_);
105+
if (received_buffers_.empty()) {
106+
// No data available
107+
samples.clear();
108+
infos.clear();
109+
return;
110+
}
111+
buffer = std::move(received_buffers_.front());
112+
received_buffers_.pop();
113+
}
114+
98115
dds::core::xtypes::DynamicData deserialized_sample(*adapter_type_);
99-
std::vector<char> received_buffer = std::vector<char>(
100-
received_buffer_, received_buffer_ + received_bytes_);
101-
rti::core::xtypes::from_cdr_buffer(deserialized_sample, received_buffer);
116+
rti::core::xtypes::from_cdr_buffer(deserialized_sample, buffer);
102117

103118
samples.resize(1);
104119
infos.resize(1);

examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <fstream>
1717
#include <iostream>
1818
#include <thread>
19+
#include <queue>
1920

2021
#include "SocketConnection.hpp"
2122
#include "UdpSocket.hpp"
@@ -65,8 +66,8 @@ class SocketStreamReader : public rti::routing::adapter::DynamicDataStreamReader
6566
std::ifstream input_socket_stream_;
6667
std::string receive_address_;
6768
int receive_port_;
68-
char received_buffer_[BUFFER_MAX_SIZE]; // Value that's high enough
69-
int received_bytes_;
69+
char received_buffer_[BUFFER_MAX_SIZE];
70+
std::queue<std::vector<char>> received_buffers_;
7071
std::mutex buffer_mutex_;
7172

7273
rti::routing::StreamInfo stream_info_;

examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -83,41 +83,19 @@ int SocketStreamWriter::write(
8383
{
8484
size_t len = 0;
8585
for (const auto sample : samples) {
86-
std::vector<char> buffer;
87-
rti::core::xtypes::to_cdr_buffer(buffer, *sample);
86+
serialization_buffer_.clear();
87+
rti::core::xtypes::to_cdr_buffer(serialization_buffer_, *sample);
8888
// Send the serialized data
8989
len = socket->send_data(
90-
buffer.data(),
91-
buffer.size(),
90+
serialization_buffer_.data(),
91+
serialization_buffer_.size(),
9292
dest_address_.c_str(),
9393
dest_port_);
9494
}
9595

9696
return len;
9797
}
9898

99-
int SocketStreamWriter::write(
100-
const std::vector<dds::core::xtypes::DynamicData *> &samples,
101-
const std::vector<dds::sub::SampleInfo *> &infos,
102-
const SelectorState &selector_state)
103-
{
104-
int len;
105-
len = write(samples, infos);
106-
return len;
107-
}
108-
109-
void SocketStreamWriter::return_loan(
110-
std::vector<dds::core::xtypes::DynamicData *> &samples,
111-
std::vector<dds::sub::SampleInfo *> &infos)
112-
{
113-
for (int i = 0; i < samples.size(); ++i) {
114-
delete samples[i];
115-
delete infos[i];
116-
}
117-
samples.clear();
118-
infos.clear();
119-
}
120-
12199
SocketStreamWriter::~SocketStreamWriter()
122100
{
123101
}

examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,9 @@ class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter
3838
const rti::routing::PropertySet &
3939
);
4040

41-
virtual int
42-
write(const std::vector<dds::core::xtypes::DynamicData *> &,
43-
const std::vector<dds::sub::SampleInfo *> &) final;
44-
4541
virtual int write(
4642
const std::vector<dds::core::xtypes::DynamicData *> &,
47-
const std::vector<dds::sub::SampleInfo *> &,
48-
const rti::routing::adapter::SelectorState &selector_state) final;
49-
50-
virtual void return_loan(
51-
std::vector<dds::core::xtypes::DynamicData *> &,
52-
std::vector<dds::sub::SampleInfo *> &) final;
43+
const std::vector<dds::sub::SampleInfo *> &) final;
5344

5445
~SocketStreamWriter();
5546

@@ -61,7 +52,7 @@ class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter
6152
*/
6253

6354
SocketConnection *socket_connection_;
64-
55+
std::vector<char> serialization_buffer_;
6556
std::unique_ptr<UdpSocket> socket;
6657

6758
int send_port_;

0 commit comments

Comments
 (0)