-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathros2_event_callback.py
More file actions
161 lines (140 loc) · 4.71 KB
/
ros2_event_callback.py
File metadata and controls
161 lines (140 loc) · 4.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""
Example showing how to use asyncio_for_robotics subscriptions with ROS 2
subscription event callbacks (e.g. matched publishers, lost messages).
afor's generic `BaseSub` is used to accumulate a datastream not natively
supported by `afor`. The external ROS object calls `BaseSub.input_data` which
is threadsafe
Run:
python3 -m asyncio_for_robotics.example.ros2_event_callback
In another terminal:
ros2 topic pub /example std_msgs/msg/String "data: hey"
Expected output:
Match Event:
New publisher
{'current_count': 1,
'current_count_change': 1,
'total_count': 1,
'total_count_change': 1}
I heard: hey
...
I heard: hey
Match Event:
Undeclared publisher
{'current_count': 0,
'current_count_change': -1,
'total_count': 1,
'total_count_change': 0}
"""
import asyncio
import inspect
from contextlib import suppress
from pprint import pprint
from typing import Optional
import rclpy
from colorama import Fore
from rclpy.event_handler import (
IncompatibleTypeInfo,
QoSLivelinessChangedInfo,
QoSMessageLostInfo,
QoSRequestedDeadlineMissedInfo,
QoSRequestedIncompatibleQoSInfo,
QoSSubscriptionMatchedInfo,
SubscriptionEventCallbacks,
)
from rclpy.qos import QoSProfile
from rclpy.subscription import Subscription
from std_msgs.msg import String
from asyncio_for_robotics.core.sub import BaseSub, _MsgType
from asyncio_for_robotics.ros2 import (
QOS_DEFAULT,
BaseSession,
Sub,
TopicInfo,
auto_session,
)
TOPIC = TopicInfo(
"example",
String,
)
class AdvancedSub(Sub[_MsgType]):
def __init__(
self,
msg_type: type[_MsgType],
topic: str,
qos_profile: QoSProfile = QOS_DEFAULT,
session: Optional[BaseSession] = None,
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
) -> None:
"""
Extension of `Sub` that allows passing ROS 2
`SubscriptionEventCallbacks` when creating the underlying rclpy
subscription.
"""
self.event_callbacks = event_callbacks
super().__init__(msg_type, topic, qos_profile, session)
def _resolve_sub(self, topic_info: TopicInfo) -> Subscription:
"""
Create the underlying rclpy Subscription with event callbacks attached.
"""
with self.session.lock() as node:
return node.create_subscription(
**topic_info.as_kwarg(),
callback=self.callback_for_sub,
event_callbacks=self.event_callbacks,
)
async def verbose_match_event(match_sub: BaseSub[QoSSubscriptionMatchedInfo]):
"""Listens to match/unmatch events and print their full contents."""
async for event in match_sub.listen_reliable():
event: QoSSubscriptionMatchedInfo
print("")
print(f"{Fore.RED}Match Event triggered:{Fore.RESET}")
if event.current_count_change > 0:
print(f"{Fore.GREEN}Match publisher{Fore.RESET}")
if event.current_count_change < 0:
print(f"{Fore.YELLOW}Unmatch publisher{Fore.RESET}")
pprint(
{
name: value
for name, value in inspect.getmembers(event)
if not name.startswith("_")
}
)
print("")
async def what_do_you_hear(sub: Sub[String]):
"""Simply print the payloads of a topic."""
async for msg in sub.listen_reliable():
print(f"I heard: {msg.data}")
async def event_example():
"""
- Creates `BaseSub` instances to receive ROS events in afor.
- Registers them as ROS 2 subscription event callbacks.
- Runs a simple String subscription.
"""
# afor's generic `BaseSub` is used to accumulate a generic datastream. The
# event callback calls `BaseSub.input_data` which is threadsafe
lost_sub: BaseSub[QoSMessageLostInfo] = BaseSub()
match_sub: BaseSub[QoSSubscriptionMatchedInfo] = BaseSub()
event_cbk = SubscriptionEventCallbacks(
message_lost=lost_sub.input_data, # type: ignore
matched=match_sub.input_data, # type: ignore
# more event types can be added
)
sub = AdvancedSub(
**TOPIC.as_kwarg(),
event_callbacks=event_cbk,
)
hear_task = asyncio.create_task(what_do_you_hear(sub))
match_task = asyncio.create_task(verbose_match_event(match_sub))
print(
f"""Start/stop publishing using
`ros2 topic pub /{sub.topic_info.topic} std_msgs/msg/String "data: hey"`"""
)
await asyncio.wait([hear_task, match_task])
if __name__ == "__main__":
rclpy.init()
try:
with suppress(KeyboardInterrupt, asyncio.CancelledError):
asyncio.run(event_example())
finally:
auto_session().close()
rclpy.shutdown()