Skip to content

Commit 969f434

Browse files
authored
Python async pub example (#537)
* Initial commit, adding async pub example * Renaming file due to Python async keyword * Adding first version of example * Removing this file since it will be generated by rtiddsgen * Adding programatically setting QoS in publisher/subscriber, not fully working yet * Updating comments * Added QoS options and changed to printf * Added asyncio and QoS options * Consistency in QoS calls, removing unneeded calls and comments * Adding type file to skip code generation step * Removing some comments and unneeded calls. Properly calling the async publish function as well * Adding max wait time for async publishing * Replaced header * Addressed the commented notes * Adding readme
1 parent c6e957c commit 969f434

File tree

6 files changed

+388
-0
lines changed

6 files changed

+388
-0
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Example Code: Asynchronous Publishing
2+
3+
If you haven't used the RTI Connext Python API before, first check the
4+
[Getting Started Guide](https://community.rti.com/static/documentation/connext-dds/7.0.0/doc/manuals/connext_dds_professional/getting_started_guide/index.html).
5+
6+
## Running the Example
7+
8+
In two separate command prompt windows for the publisher and subscriber run the
9+
following commands from the example directory (this is necessary to ensure the
10+
application loads the QoS defined in *USER_QOS_PROFILES.xml*):
11+
12+
```sh
13+
python async_publisher.py
14+
python async_subscriber.py
15+
```
16+
17+
For the full list of arguments:
18+
19+
```sh
20+
python async_publisher.py -h
21+
or
22+
python async_subscriber.py -h
23+
```
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
(c) 2005-2015 Copyright, Real-Time Innovations, Inc. All rights reserved.
4+
RTI grants Licensee a license to use, modify, compile, and create derivative
5+
works of the Software. Licensee has the right to distribute object form only
6+
for use with RTI products. The Software is provided "as is", with no warranty
7+
of any type, including any warranty for fitness for any purpose. RTI is under
8+
no obligation to maintain or support the Software. RTI shall not be liable for
9+
any incidental or consequential damages arising out of the use or inability to
10+
use the software.
11+
-->
12+
13+
<dds xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
14+
xsi:noNamespaceSchemaLocation="http://community.rti.com/schema/6.1.1/rti_dds_qos_profiles.xsd">
15+
16+
<!-- QoS Library containing the QoS profile used in the generated example.
17+
18+
A QoS library is a named set of QoS profiles.
19+
-->
20+
<qos_library name="async_Library">
21+
22+
<!-- QoS profile used to configure reliable communication between the DataWriter
23+
and DataReader created in the example code.
24+
25+
A QoS profile groups a set of related QoS.
26+
-->
27+
<qos_profile name="async_Profile" is_default_qos="true">
28+
<!-- QoS used to configure the data writer created in the example code -->
29+
<datawriter_qos>
30+
31+
<!-- QoS for asynchronous publishing -->
32+
<publish_mode>
33+
<kind>ASYNCHRONOUS_PUBLISH_MODE_QOS</kind>
34+
<flow_controller_name>DDS_FIXED_RATE_FLOW_CONTROLLER_NAME</flow_controller_name>
35+
</publish_mode>
36+
37+
<reliability>
38+
<kind>RELIABLE_RELIABILITY_QOS</kind>
39+
<max_blocking_time>
40+
<sec>60</sec>
41+
</max_blocking_time>
42+
</reliability>
43+
44+
<history>
45+
<kind>KEEP_LAST_HISTORY_QOS</kind>
46+
<depth>12</depth>
47+
</history>
48+
49+
<protocol>
50+
<rtps_reliable_writer>
51+
<min_send_window_size>50</min_send_window_size>
52+
<max_send_window_size>50</max_send_window_size>
53+
</rtps_reliable_writer>
54+
</protocol>
55+
56+
</datawriter_qos>
57+
58+
<!-- QoS used to configure the data reader created in the example code -->
59+
<datareader_qos>
60+
61+
<reliability>
62+
<kind>RELIABLE_RELIABILITY_QOS</kind>
63+
</reliability>
64+
65+
<history>
66+
<kind>KEEP_ALL_HISTORY_QOS</kind>
67+
</history>
68+
69+
</datareader_qos>
70+
71+
<domain_participant_qos>
72+
<!--
73+
The participant name, if it is set, will be displayed in the
74+
RTI Analyzer tool, making it easier for you to tell one
75+
application from another when you're debugging.
76+
-->
77+
<participant_name>
78+
<name>Asynchronous Publisher Python Example</name>
79+
</participant_name>
80+
81+
</domain_participant_qos>
82+
</qos_profile>
83+
84+
</qos_library>
85+
</dds>
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
#
2+
# (c) 2022 Copyright, Real-Time Innovations, Inc. All rights reserved.
3+
#
4+
# RTI grants Licensee a license to use, modify, compile, and create derivative
5+
# works of the Software solely for use with RTI products. The Software is
6+
# provided "as is", with no warranty of any type, including any warranty for
7+
# fitness for any purpose. RTI is under no obligation to maintain or support
8+
# the Software. RTI shall not be liable for any incidental or consequential
9+
# damages arising out of the use or inability to use the software.
10+
#
11+
import argparse
12+
import time
13+
14+
import rti.connextdds as dds
15+
16+
from async_type import async_type
17+
18+
19+
def run_publisher_application(
20+
domain_id: int, sample_count: int, use_xml_qos: bool = False
21+
):
22+
# Start communicating in a domain, usually one participant per application
23+
participant = dds.DomainParticipant(domain_id)
24+
25+
# Create a Topic
26+
topic = dds.Topic(participant, "Example async", async_type)
27+
28+
if use_xml_qos:
29+
# Retrieve the default DataWriter QoS, from USER_QOS_PROFILES.xml
30+
writer_qos = dds.QosProvider.default.datawriter_qos
31+
else:
32+
# Configure the DataWriterQos in code, to the same effect as the XML file.
33+
writer_qos = participant.default_datawriter_qos
34+
writer_qos.reliability = dds.Reliability.reliable(
35+
dds.Duration.from_seconds(60)
36+
)
37+
writer_qos.history.kind = dds.HistoryKind.KEEP_LAST
38+
writer_qos.history.depth = 12
39+
writer_qos.data_writer_protocol.rtps_reliable_writer.min_send_window_size = (
40+
50
41+
)
42+
writer_qos.data_writer_protocol.rtps_reliable_writer.max_send_window_size = (
43+
50
44+
)
45+
46+
writer_qos.publish_mode = dds.PublishMode.asynchronous(
47+
"DDS_FIXED_RATE_FLOW_CONTROLLER_NAME"
48+
)
49+
50+
# Create a DataWriter with the QoS in our profile or configured programatically
51+
writer = dds.DataWriter(participant.implicit_publisher, topic, writer_qos)
52+
53+
# Create data sample for writing
54+
sample = async_type(x=42)
55+
56+
for count in range(sample_count):
57+
try:
58+
# Modify the data to be sent here
59+
sample.x = count
60+
print(f"Writing async_type, count {count}")
61+
writer.write(sample)
62+
time.sleep(1)
63+
except KeyboardInterrupt:
64+
break
65+
# You can wait until all written samples have been actually published
66+
# (note that this doesn't ensure that they have actually been received
67+
# if the sample required retransmission)
68+
writer.wait_for_asynchronous_publishing(dds.Duration(10))
69+
print("preparing to shut down...")
70+
71+
72+
def main():
73+
parser = argparse.ArgumentParser(
74+
description="RTI Connext DDS Example: Using Asynchronous Publishing"
75+
)
76+
parser.add_argument(
77+
"-d",
78+
"--domain",
79+
type=int,
80+
default=0,
81+
help="DDS Domain ID | Range: 0-232 | Default: 0",
82+
)
83+
parser.add_argument(
84+
"-c",
85+
"--count",
86+
type=int,
87+
default=50,
88+
help="Number of samples to send | Default 50",
89+
)
90+
parser.add_argument(
91+
"-v",
92+
"--verbosity",
93+
type=int,
94+
default=1,
95+
help="How much debugging output to show | Range: 0-3 | Default: 1",
96+
)
97+
parser.add_argument(
98+
"-q",
99+
"--qos",
100+
action="store_true",
101+
default=False,
102+
help="Use QoS profile from USER_QOS_PROFILES.xml | Default: False",
103+
)
104+
105+
args = parser.parse_args()
106+
assert 0 <= args.domain < 233
107+
assert args.count >= 0
108+
assert 0 <= args.verbosity < 4
109+
110+
verbosity_levels = {
111+
0: dds.Verbosity.SILENT,
112+
1: dds.Verbosity.EXCEPTION,
113+
2: dds.Verbosity.WARNING,
114+
3: dds.Verbosity.STATUS_ALL,
115+
}
116+
117+
# Sets Connext verbosity to help debugging
118+
verbosity = verbosity_levels.get(args.verbosity, dds.Verbosity.EXCEPTION)
119+
120+
dds.Logger.instance.verbosity = verbosity
121+
122+
try:
123+
run_publisher_application(args.domain, args.count, args.qos)
124+
except Exception as e:
125+
print(f"Exception in run_publisher_application(): {e}")
126+
return
127+
128+
129+
if __name__ == "__main__":
130+
main()
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
#
2+
# (c) 2022 Copyright, Real-Time Innovations, Inc. All rights reserved.
3+
#
4+
# RTI grants Licensee a license to use, modify, compile, and create derivative
5+
# works of the Software solely for use with RTI products. The Software is
6+
# provided "as is", with no warranty of any type, including any warranty for
7+
# fitness for any purpose. RTI is under no obligation to maintain or support
8+
# the Software. RTI shall not be liable for any incidental or consequential
9+
# damages arising out of the use or inability to use the software.
10+
#
11+
import argparse
12+
import rti.asyncio
13+
14+
import rti.connextdds as dds
15+
16+
from async_type import async_type
17+
18+
19+
async def process_data(reader: dds.DataReader):
20+
# Print data as it arrives, suspending the coroutine until data is
21+
# available.
22+
async for data in reader.take_data_async():
23+
print(f"Received: {data}")
24+
25+
26+
def run_subscriber_application(
27+
domain_id: int, sample_count: int, use_xml_qos: bool = False
28+
):
29+
# Start communicating in a domain, usually one participant per application
30+
participant = dds.DomainParticipant(domain_id)
31+
32+
# Create a Topic
33+
topic = dds.Topic(participant, "Example async", async_type)
34+
35+
if use_xml_qos:
36+
# Retrieve the default DataWriter QoS, from USER_QOS_PROFILES.xml
37+
reader_qos = dds.QosProvider.default.datareader_qos
38+
else:
39+
# Configure the DataReaderQos in code, to the same effect as the XML file.
40+
reader_qos = participant.default_datareader_qos
41+
reader_qos.reliability.kind = dds.ReliabilityKind.RELIABLE
42+
reader_qos.history.kind = dds.HistoryKind.KEEP_ALL
43+
44+
# Create a DataReader with the QoS in our profile or configured programatically
45+
reader = dds.DataReader(participant.implicit_subscriber, topic, reader_qos)
46+
47+
try:
48+
print("async subscriber sleeping")
49+
rti.asyncio.run(process_data(reader))
50+
except KeyboardInterrupt:
51+
pass
52+
53+
print("preparing to shut down...")
54+
55+
56+
def main():
57+
parser = argparse.ArgumentParser(
58+
description="RTI Connext DDS Example: Using Asynchronous Publishing"
59+
)
60+
parser.add_argument(
61+
"-d",
62+
"--domain",
63+
type=int,
64+
default=0,
65+
help="DDS Domain ID | Range: 0-232 | Default: 0",
66+
)
67+
parser.add_argument(
68+
"-c",
69+
"--count",
70+
type=int,
71+
default=50,
72+
help="Number of samples to send | Default 50",
73+
)
74+
parser.add_argument(
75+
"-v",
76+
"--verbosity",
77+
type=int,
78+
default=1,
79+
help="How much debugging output to show | Range: 0-3 | Default: 1",
80+
)
81+
parser.add_argument(
82+
"-q",
83+
"--qos",
84+
action="store_true",
85+
default=False,
86+
help="Use QoS profile from USER_QOS_PROFILES.xml | Default: False",
87+
)
88+
89+
args = parser.parse_args()
90+
assert 0 <= args.domain < 233
91+
assert args.count >= 0
92+
assert 0 <= args.verbosity < 4
93+
94+
verbosity_levels = {
95+
0: dds.Verbosity.SILENT,
96+
1: dds.Verbosity.EXCEPTION,
97+
2: dds.Verbosity.WARNING,
98+
3: dds.Verbosity.STATUS_ALL,
99+
}
100+
101+
# Sets Connext verbosity to help debugging
102+
verbosity = verbosity_levels.get(args.verbosity, dds.Verbosity.EXCEPTION)
103+
104+
dds.Logger.instance.verbosity = verbosity
105+
106+
try:
107+
run_subscriber_application(args.domain, args.count, args.qos)
108+
except Exception as e:
109+
print(f"Exception in run_subscriber_application(): {e}")
110+
return
111+
112+
113+
if __name__ == "__main__":
114+
main()
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* (c) 2013-2019 Copyright, Real-Time Innovations, Inc. All rights reserved.
3+
*
4+
* RTI grants Licensee a license to use, modify, compile, and create derivative
5+
* works of the Software. Licensee has the right to distribute object form
6+
* only for use with RTI products. The Software is provided "as is", with no
7+
* warranty of any type, including any warranty for fitness for any purpose.
8+
* RTI is under no obligation to maintain or support the Software. RTI shall
9+
* not be liable for any incidental or consequential damages arising out of the
10+
* use or inability to use the software.
11+
*/
12+
13+
struct async_type {
14+
int32 x;
15+
};
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#
2+
# (c) 2022 Copyright, Real-Time Innovations, Inc. All rights reserved.
3+
#
4+
# RTI grants Licensee a license to use, modify, compile, and create derivative
5+
# works of the Software solely for use with RTI products. The Software is
6+
# provided "as is", with no warranty of any type, including any warranty for
7+
# fitness for any purpose. RTI is under no obligation to maintain or support
8+
# the Software. RTI shall not be liable for any incidental or consequential
9+
# damages arising out of the use or inability to use the software.
10+
#
11+
12+
import sys
13+
import rti.idl as idl
14+
from enum import IntEnum, auto
15+
from typing import Sequence, Optional
16+
from dataclasses import field
17+
18+
19+
@idl.struct
20+
class async_type:
21+
x: idl.int32 = 0

0 commit comments

Comments
 (0)