|
| 1 | +""" |
| 2 | +An example ROS 2 node written in Python. |
| 3 | +
|
| 4 | +See the real node here: |
| 5 | +https://github.com/Sooner-Rover-Team/auto_ros2/tree/main/src/navigator/navigator_node |
| 6 | +""" |
| 7 | + |
| 8 | +from dataclasses import dataclass |
| 9 | +import time |
| 10 | +import rclpy |
| 11 | +from rclpy.node import Node |
| 12 | +from rclpy.qos import QoSPresetProfiles |
| 13 | +from rclpy.subscription import Subscription |
| 14 | +from sensor_msgs.msg import NavSatFix |
| 15 | +from typing_extensions import override |
| 16 | + |
| 17 | + |
| 18 | +@dataclass(kw_only=True) |
| 19 | +class NavigatorNode(Node): |
| 20 | + """ |
| 21 | + An example ROS 2 node. |
| 22 | +
|
| 23 | + Notice how there's the `@dataclass` annotation thingy up there? That lets |
| 24 | + us say _what_ data will be stored in the class. It cleans things up a lot! |
| 25 | +
|
| 26 | + After this comment ends, you'll see the class "data fields"... |
| 27 | + """ |
| 28 | + |
| 29 | + _gps_subscription: Subscription |
| 30 | + """ |
| 31 | + Here's our first field! |
| 32 | +
|
| 33 | + This `_gps_subscription` variable is accessible throughout the class, in |
| 34 | + all of its methods. |
| 35 | + """ |
| 36 | + |
| 37 | + def __init__(self): |
| 38 | + """ |
| 39 | + "Initializes" the node. |
| 40 | +
|
| 41 | + This function makes the node known to ROS 2 when it's created in |
| 42 | + Python code. It also sets up a bunch of variables, like our |
| 43 | + `_gps_subscription` field. |
| 44 | + """ |
| 45 | + |
| 46 | + # first, we have to initialize the `Node` class that this one extends. |
| 47 | + # |
| 48 | + # inside that `__init__` function, we're giving our node a name. ROS 2 |
| 49 | + # will refer to it as `navigator_node` from now on, unless you rename |
| 50 | + # it in a launch file. |
| 51 | + # |
| 52 | + # IMPORTANT: you are REQUIRED to call this function to correctly create |
| 53 | + # a ROS 2 node. |
| 54 | + super().__init__("navigator_node") |
| 55 | + |
| 56 | + # next, we can initialize the `_gps_subscription` variable. |
| 57 | + # |
| 58 | + # you can see that we have another method at the bottom of this class |
| 59 | + # called `_gps_callback`. that's a function we want to run if another |
| 60 | + # node sends us a `NavSatFix` (GPS message). |
| 61 | + # |
| 62 | + # so, let's set that up accordingly: |
| 63 | + self._gps_subscription = self.create_subscription( |
| 64 | + msg_type=NavSatFix, |
| 65 | + topic="/sensors/gps", |
| 66 | + callback=self._gps_callback, |
| 67 | + qos_profile=QoSPresetProfiles.SENSOR_DATA.value, |
| 68 | + ) |
| 69 | + |
| 70 | + # great! we've now set up a ROS 2 node correctly! |
| 71 | + # |
| 72 | + # the function now returns... |
| 73 | + |
| 74 | + # required boilerplate -- please ignore! |
| 75 | + @override |
| 76 | + def __hash__(self) -> int: |
| 77 | + return super().__hash__() |
| 78 | + |
| 79 | + def _gps_callback(self, msg: NavSatFix): |
| 80 | + """This function runs when we receive a GPS message!""" |
| 81 | + |
| 82 | + # extract data from message |
| 83 | + lat: float = msg.latitude |
| 84 | + lon: float = msg.longitude |
| 85 | + alt: float = msg.altitude |
| 86 | + |
| 87 | + # print out the data |
| 88 | + print(f"Got a GPS message! lat: {lat}, lon: {lon}, alt: {alt}") |
| 89 | + |
| 90 | + |
| 91 | +def main(args: list[str] | None = None): |
| 92 | + """ |
| 93 | + Starts the Navigator node. |
| 94 | + """ |
| 95 | + rclpy.init(args=args) |
| 96 | + navigator_node: NavigatorNode = NavigatorNode() |
| 97 | + |
| 98 | + # tell the user that the node's running, and let them know how to stop it! |
| 99 | + # |
| 100 | + # note that the weird \033 escapes down there show color in the terminal, |
| 101 | + # which makes it a little easier to read, in my view. ~bray |
| 102 | + print( |
| 103 | + "The Navigator node is now running! You can press Ctrl + C at any time to stop it." |
| 104 | + ) |
| 105 | + print() |
| 106 | + print("Please paste the following code into another ROS 2 terminal:") |
| 107 | + print( |
| 108 | + '\033[0;34mros2 \033[0;35mtopic pub \033[0;32m/sensors/gps sensor_msgs/NavSatFix \033[0;31m"{ latitude: 1.0, longitude: 2.0, altitude: 3.0 }"\033[0m' |
| 109 | + ) |
| 110 | + print() |
| 111 | + |
| 112 | + # spin the node until ROS 2 stops it (or you spam Ctrl^C) |
| 113 | + while rclpy.ok(): |
| 114 | + rclpy.spin_once(navigator_node, timeout_sec=0) |
| 115 | + time.sleep(1e-4) |
| 116 | + |
| 117 | + # destroy the Node explicitly |
| 118 | + # |
| 119 | + # this is optional - otherwise, the garbage collector does it automatically |
| 120 | + # when it runs. |
| 121 | + navigator_node.destroy_node() |
| 122 | + rclpy.shutdown() |
0 commit comments