Skip to content

Commit c1a6403

Browse files
Add dynamic_data_skip_serialization example (#642)
Add dynamic_data_skip_serialization example for C++11 and Python. This is an upcoming feature in Connext DDS 7.3.0.
1 parent 0ac2239 commit c1a6403

File tree

9 files changed

+896
-0
lines changed

9 files changed

+896
-0
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Example Code: Skip Data Serialization for Data Recording
2+
3+
## Concept
4+
5+
There are scenarios where the need to deserialize or inspect data is not necessary.
6+
A common example is recording data. In this case, the application can simply
7+
record the binary data as it is received, and then replay it later. This provides
8+
a significant performance improvement.
9+
10+
The DynamicData API provides a mode that allows sending or receiving data in its
11+
CDR format, without serializing or deserializing it.
12+
13+
Note that while Connext provides a Recording Service, there may be situations
14+
where a custom Recording application may be useful.
15+
16+
## Example Description
17+
18+
This example implements a simple recording application that uses the DynamicData
19+
API to receive data in CDR format and directly record it in a file. The example
20+
also provides a replay option that reads the data buffers from the file
21+
and publishes them back. For convenience, an option to publish a few samples
22+
to test the record and replay functionality is also provided.
23+
24+
The key parts of the example are implemented in the ``record()`` and
25+
``replay()`` functions in the example source code for each language.
26+
27+
The example is very simple and uses a single type and topic, but it could be
28+
extended to use discovered types (see the
29+
[built-in topics example](../builtin_topics/)) to implement a more
30+
general-purpose recording application.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#
2+
# (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved.
3+
#
4+
# RTI grants Licensee a license to use, modify, compile, and create derivative
5+
# works of the Software. Licensee has the right to distribute object form
6+
# only for use with RTI products. The Software is provided "as is", with no
7+
# warranty of any type, including any warranty for fitness for any purpose.
8+
# RTI is under no obligation to maintain or support the Software. RTI shall
9+
# not be liable for any incidental or consequential damages arising out of the
10+
# use or inability to use the software.
11+
#
12+
cmake_minimum_required(VERSION 3.11)
13+
project(rtiexamples-dynamic-data-skip-serialization)
14+
list(APPEND CMAKE_MODULE_PATH
15+
"${CMAKE_CURRENT_SOURCE_DIR}/../../../../resources/cmake/Modules"
16+
)
17+
include(ConnextDdsConfigureCmakeUtils)
18+
connextdds_configure_cmake_utils()
19+
20+
# Find the RTI Connext DDS libraries
21+
if(NOT RTIConnextDDS_FOUND)
22+
find_package(RTIConnextDDS
23+
"7.3.0"
24+
REQUIRED
25+
COMPONENTS
26+
core
27+
)
28+
endif()
29+
30+
set(CMAKE_CXX_STANDARD 11)
31+
set(PLATFORM_MODERN_CXX_STANDARD 11)
32+
33+
add_executable(recorder_cxx2
34+
"${CMAKE_CURRENT_SOURCE_DIR}/util.cxx"
35+
"${CMAKE_CURRENT_SOURCE_DIR}/recorder.cxx"
36+
)
37+
38+
target_link_libraries(recorder_cxx2
39+
PUBLIC
40+
RTIConnextDDS::cpp2_api
41+
)
42+
43+
target_include_directories(recorder_cxx2
44+
PRIVATE
45+
"${CMAKE_CURRENT_BINARY_DIR}/src"
46+
)
47+
48+
set_target_properties(recorder_cxx2
49+
PROPERTIES
50+
OUTPUT_NAME "recorder")
51+
52+
53+
if(CMAKE_SYSTEM_NAME MATCHES "Linux" AND CMAKE_CXX_COMPILER_ID MATCHES "GNU")
54+
set_target_properties(recorder_cxx2
55+
PROPERTIES
56+
LINK_FLAGS -Wl,--no-as-needed)
57+
endif()
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Example Code: Skip Data Serialization for Data Recording
2+
3+
## Building the Example :wrench:
4+
5+
To build this example, first run CMake to generate the corresponding build
6+
files. We recommend you use a separate directory to store all the generated
7+
files (e.g., ./build).
8+
9+
```sh
10+
mkdir build
11+
cd build
12+
cmake ..
13+
```
14+
15+
Once you have run CMake, you will find a number of new files in your build
16+
directory (the list of generated files will depend on the specific CMake
17+
Generator). To build the example, run CMake as follows:
18+
19+
```sh
20+
cmake --build .
21+
```
22+
23+
**Note**: if you are using a multi-configuration generator, such as Visual
24+
Studio solutions, you can specify the configuration mode to build as follows:
25+
26+
```sh
27+
cmake --build . --config Release|Debug
28+
```
29+
30+
Alternatively, you can use directly the generated infrastructure (e.g.,
31+
Makefiles or Visual Studio Solutions) to build the example. If you generated
32+
Makefiles in the configuration process, run make to build the example. Likewise,
33+
if you generated a Visual Studio solution, open the solution and follow the
34+
regular build process.
35+
36+
## Running the Example
37+
38+
From the build directory, in one command prompt run the recorder:
39+
40+
```sh
41+
./recorder --record data.bin
42+
```
43+
44+
In a second command prompt run the publisher:
45+
46+
```sh
47+
./recorder --publish
48+
```
49+
50+
The recorder application will print a message each time a sample is recorded.
51+
52+
Now kill the recorder and run the replay application. A file called `data.bin`
53+
will have been created in the current directory.
54+
55+
Now we will run a subscriber and a reply application.
56+
57+
To subscribe to the data we will simply use **rtiddsspy**:
58+
59+
```sh
60+
<Connext installation>/bin/rtiddsspy -printSample
61+
```
62+
63+
Now run the application that replays the data recorded in `data.bin`:
64+
65+
```sh
66+
./recorder --replay data.bin
67+
```
68+
69+
The subscriber will print the data that is being replayed.
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* (c) Copyright, Real-Time Innovations, 2023. All rights reserved.
3+
* RTI grants Licensee a license to use, modify, compile, and create derivative
4+
* works of the software solely for use with RTI Connext DDS. Licensee may
5+
* redistribute copies of the software provided that all such copies are subject
6+
* to this license. The software is provided "as is", with no warranty of any
7+
* type, including any warranty for fitness for any purpose. RTI is under no
8+
* obligation to maintain or support the software. RTI shall not be liable for
9+
* any incidental or consequential damages arising out of the use or inability
10+
* to use the software.
11+
*/
12+
13+
#include <fstream>
14+
#include <iostream>
15+
#include <string>
16+
#include <rti/rti.hpp>
17+
18+
#include "util.hpp"
19+
20+
dds::core::xtypes::StructType create_type()
21+
{
22+
using namespace dds::core::xtypes;
23+
24+
StructType type(
25+
"RecordExample",
26+
{ Member("id", StringType(128)),
27+
Member("payload",
28+
SequenceType(primitive_type<int32_t>(), 1024)) });
29+
30+
return type;
31+
}
32+
33+
void record(const std::string &file_name, int domain_id)
34+
{
35+
using dds::core::xtypes::DynamicData;
36+
37+
// To disable the deserialization on the DataReader and get direct access
38+
// to the serialized CDR buffer, we need to register the DataReader type
39+
// with the DynamicDataTypeSerializationProperty::skip_deserialization set
40+
// to true, and then create the DataReader with the name used to register
41+
// the type.
42+
dds::domain::DomainParticipant participant(domain_id);
43+
const std::string type_name = "RecordExample";
44+
rti::core::xtypes::DynamicDataTypeSerializationProperty property;
45+
property.skip_deserialization(true);
46+
rti::domain::register_type(participant, type_name, create_type(), property);
47+
48+
dds::topic::Topic<DynamicData> topic(
49+
participant,
50+
"Example Record",
51+
type_name); // specify the registered type name
52+
53+
auto qos = dds::core::QosProvider::Default().datareader_qos(
54+
rti::core::builtin_profiles::qos_lib::generic_strict_reliable());
55+
dds::sub::DataReader<DynamicData> reader(topic, qos);
56+
if (!util::wait_for_writer(reader)) {
57+
return;
58+
}
59+
60+
// File setup
61+
std::ofstream out_file(file_name, std::ios::binary);
62+
if (!out_file) {
63+
std::cerr << "Failed to open file for recording: " << file_name
64+
<< std::endl;
65+
return;
66+
}
67+
68+
auto record_data = [&out_file, &reader]() {
69+
auto samples = reader.take();
70+
for (auto sample : samples) {
71+
if (!sample.info().valid()) {
72+
continue;
73+
}
74+
75+
// Now the only way to access the data is to call get_cdr_buffer,
76+
// any other field accessor will fail.
77+
auto buffer_info = sample.data().get_cdr_buffer();
78+
auto buffer = buffer_info.first;
79+
auto buffer_length = buffer_info.second;
80+
std::cout << "Recording data sample (" << buffer_length << " bytes)"
81+
<< std::endl;
82+
out_file.write(
83+
reinterpret_cast<const char *>(&buffer_length),
84+
sizeof(buffer_length));
85+
out_file.write(
86+
reinterpret_cast<const char *>(buffer),
87+
buffer_length);
88+
}
89+
};
90+
91+
// Set up a ReadCondition to trigger the record_data function when
92+
// data is available
93+
dds::core::cond::WaitSet waitset;
94+
dds::sub::cond::ReadCondition read_condition(
95+
reader,
96+
dds::sub::status::DataState::any(),
97+
record_data);
98+
waitset += read_condition;
99+
100+
while (!application::shutdown_requested) {
101+
waitset.dispatch(dds::core::Duration(1));
102+
}
103+
}
104+
105+
void replay(const std::string &file_name, int domain_id)
106+
{
107+
using dds::core::xtypes::DynamicData;
108+
109+
dds::domain::DomainParticipant participant(domain_id);
110+
111+
// For the replay application we don't need to register the type with any
112+
// particular property because DynamicData DataWriters are always prepared
113+
// to write serialized buffers directly
114+
dds::topic::Topic<DynamicData> topic(
115+
participant,
116+
"Example Record",
117+
create_type());
118+
119+
auto qos = dds::core::QosProvider::Default().datawriter_qos(
120+
rti::core::builtin_profiles::qos_lib::generic_strict_reliable());
121+
dds::pub::DataWriter<DynamicData> writer(topic, qos);
122+
if (!util::wait_for_reader(writer)) {
123+
return;
124+
}
125+
126+
std::ifstream in_file(file_name, std::ios::binary);
127+
if (!in_file) {
128+
std::cerr << "Failed to open file for replay: " << file_name
129+
<< std::endl;
130+
return;
131+
}
132+
133+
uint32_t length;
134+
std::vector<char> buffer;
135+
DynamicData sample(create_type());
136+
while (!in_file.eof()) {
137+
// read length and data
138+
if (!in_file.read(reinterpret_cast<char *>(&length), sizeof(length))) {
139+
break;
140+
}
141+
142+
std::cout << "Replaying data sample (" << length << " bytes)"
143+
<< std::endl;
144+
buffer.resize(length);
145+
if (!in_file.read(buffer.data(), length)) {
146+
break;
147+
}
148+
149+
// By calling the set_cdr_buffer method we override the contents
150+
// of the DynamicData object with the new serialized data. After
151+
// setting a cdr buffer we can't use any field getters or setters.
152+
sample.set_cdr_buffer(buffer.data(), length);
153+
writer.write(sample);
154+
}
155+
156+
in_file.close();
157+
writer.wait_for_acknowledgments(dds::core::Duration(10));
158+
}
159+
160+
int main(int argc, char *argv[])
161+
{
162+
using namespace application;
163+
164+
// Parse arguments and handle control-C
165+
auto arguments = parse_arguments(argc, argv);
166+
if (arguments.parse_result == ParseReturn::exit) {
167+
return EXIT_SUCCESS;
168+
} else if (arguments.parse_result == ParseReturn::failure) {
169+
return EXIT_FAILURE;
170+
}
171+
setup_signal_handlers();
172+
173+
// Sets Connext verbosity to help debugging
174+
rti::config::Logger::instance().verbosity(arguments.verbosity);
175+
176+
try {
177+
if (arguments.application_type == ApplicationType::record) {
178+
record(arguments.file_name, arguments.domain_id);
179+
} else if (arguments.application_type == ApplicationType::replay) {
180+
replay(arguments.file_name, arguments.domain_id);
181+
} else if (arguments.application_type == ApplicationType::publish) {
182+
util::publish_example_data(arguments.domain_id, create_type());
183+
}
184+
185+
} catch (const std::exception &ex) {
186+
std::cerr << "Exception in application: " << ex.what() << std::endl;
187+
return EXIT_FAILURE;
188+
}
189+
190+
// Releases the memory used by the participant factory. Optional at
191+
// application exit
192+
dds::domain::DomainParticipant::finalize_participant_factory();
193+
194+
return EXIT_SUCCESS;
195+
}

0 commit comments

Comments
 (0)