|
| 1 | +#!/usr/bin/env python3' |
| 2 | +# coding = utf-8 |
| 3 | +######################################################################################## |
| 4 | +## |
| 5 | +## Maintainer: stefan.kull@gmail.com |
| 6 | +## |
| 7 | +## Input: INA219 Current/Voltage-sensor (a.k.a "Battery State") |
| 8 | +## Output: ROS2 node that publish a BatteryState.msg topic |
| 9 | +## |
| 10 | +## Prerequisite: |
| 11 | +## Software |
| 12 | +## $ |
| 13 | +## $ sudo pip3 install adafruit-circuitpython-ina219 |
| 14 | +## |
| 15 | +## Hardware: Power circuit via INA219 break aout board (Power at VIn+, Drain/Source at VIn- ) |
| 16 | +## Host: Raspberry Pi 4(Ubuntu) via I2C |
| 17 | +## |
| 18 | +## Launch sequence: |
| 19 | +## 1) $ ros2 run pet_ros2_currentsensor_ina219_pkg pet_current_sensor_ina219_node.py |
| 20 | +## 2) $ ros2 topic echo /xyz |
| 21 | +## $ ros2 topic echo /zyx |
| 22 | +## |
| 23 | + |
| 24 | +# Import the ROS2-stuff |
| 25 | +import rclpy |
| 26 | +from rclpy.node import Node |
| 27 | +from sensor_msgs.msg import BatteryState |
| 28 | + |
| 29 | +# Import the Ubuntu/Linux-hardware stuff |
| 30 | +import time |
| 31 | +import board |
| 32 | +from adafruit_ina219 import ADCResolution, BusVoltageRange, INA219 |
| 33 | + |
| 34 | +# Import the common Ubuntu/Linux stuff |
| 35 | +import sys |
| 36 | +from time import sleep |
| 37 | +from math import modf |
| 38 | + |
| 39 | +#Set Button pin(GPIO no.) and ROS2-topic-name |
| 40 | +BATTERY_STATE_TOPIC = 'battery_state' |
| 41 | + |
| 42 | + |
| 43 | +class BatteryStatePublisher(Node): |
| 44 | + ''' |
| 45 | + ROS2 current & voltage sensor publisher node |
| 46 | + Create a BatteryStatePublisher class, which is a subclass of the Node class. |
| 47 | + The class publishes the battery state of an object at a specific time interval. |
| 48 | + ''' |
| 49 | + def __init__(self): |
| 50 | + # Initiate the Node class's constructor and give it a name |
| 51 | + super().__init__("pet_current_sensor_node") |
| 52 | + |
| 53 | + i2c_bus = board.I2C() |
| 54 | + self.ina219 = INA219(i2c_bus) |
| 55 | + |
| 56 | + # optional : change configuration to use 32 samples averaging for both bus voltage and shunt voltage |
| 57 | + self.ina219.bus_adc_resolution = ADCResolution.ADCRES_12BIT_32S |
| 58 | + self.ina219.shunt_adc_resolution = ADCResolution.ADCRES_12BIT_32S |
| 59 | + |
| 60 | + # optional : change voltage range to 16V |
| 61 | + self.ina219.bus_voltage_range = BusVoltageRange.RANGE_16V |
| 62 | + |
| 63 | + # display some of the advanced field (just to test) |
| 64 | + self.get_logger().info("INA219 Current/Voltage sensor. Config register:") |
| 65 | + self.get_logger().info(" - bus_voltage_range: 0x%1X" % self.ina219.bus_voltage_range) |
| 66 | + self.get_logger().info(" - gain: 0x%1X" % self.ina219.gain) |
| 67 | + self.get_logger().info(" - bus_adc_resolution: 0x%1X" % self.ina219.bus_adc_resolution) |
| 68 | + self.get_logger().info(" - shunt_adc_resolution: 0x%1X" % self.ina219.shunt_adc_resolution) |
| 69 | + self.get_logger().info(" - mode: 0x%1X" % self.ina219.mode) |
| 70 | + self.get_logger().info("") |
| 71 | + |
| 72 | + # Create Message <https://github.com/ros2/common_interfaces/blob/master/sensor_msgs/msg/BatteryState.msg> |
| 73 | + current_time = modf(time.time()) |
| 74 | + self.msg_battery = BatteryState() |
| 75 | + self.msg_battery.header.stamp.sec = int(current_time[1]) |
| 76 | + self.msg_battery.header.stamp.nanosec = int(current_time[0] * 1000000000) & 0xffffffff |
| 77 | + self.msg_battery.header.frame_id = "18650 3S x1P main battery" |
| 78 | + |
| 79 | + self.msg_battery.voltage = float('NaN') # Voltage in Volts (Mandatory) |
| 80 | + self.msg_battery.current = float('NaN') # Negative when discharging (A) (If unmeasured NaN) |
| 81 | + self.msg_battery.temperature = float('NaN') # Temperature in Degrees Celsius (If unmeasured NaN) |
| 82 | + self.msg_battery.charge = float('NaN') # Current charge in Ah (If unmeasured NaN) |
| 83 | + self.msg_battery.capacity = float('NaN') # Capacity in Ah (last full capacity) (If unmeasured NaN) |
| 84 | + self.msg_battery.design_capacity = float('NaN') # Capacity in Ah (design capacity) (If unmeasured NaN) |
| 85 | + self.msg_battery.percentage = float('NaN') # Charge percentage on 0 to 1 range (If unmeasured NaN |
| 86 | + |
| 87 | + self.msg_battery.power_supply_status = 0 # The charging status as reported. [uint8 POWER_SUPPLY_STATUS_UNKNOWN = 0] |
| 88 | + self.msg_battery.power_supply_health = 0 # The battery health metric. [uint8 POWER_SUPPLY_HEALTH_UNKNOWN = 0] |
| 89 | + self.msg_battery.power_supply_technology = 2 # The battery chemistry. [uint8 POWER_SUPPLY_TECHNOLOGY_LION = 2] |
| 90 | + self.msg_battery.present = True # True if the battery is present |
| 91 | + |
| 92 | + self.msg_battery.location = 'Think "Inside the box"' # The location into which the battery is inserted. (slot number or plug) |
| 93 | + self.msg_battery.serial_number = '0000000' # The best approximation of the battery serial number |
| 94 | + |
| 95 | + # Create publisher(s) |
| 96 | + self.publisher_battery_state = self.create_publisher(BatteryState, '/battery_status', 10) |
| 97 | + |
| 98 | + # Setup time interval in seconds... for the callback |
| 99 | + timer_period = 5.0 |
| 100 | + self.timer = self.create_timer(timer_period, self.get_battery_state_callback) |
| 101 | + |
| 102 | + # print("----------------------------------------") |
| 103 | + # print(self.msg_battery) |
| 104 | + # print("----------------------------------------") |
| 105 | + |
| 106 | + |
| 107 | + |
| 108 | + def get_battery_state_callback(self): |
| 109 | + """ |
| 110 | + Callback function. |
| 111 | + This function gets called at the specific time interval. |
| 112 | + """ |
| 113 | + # Update the message header |
| 114 | + current_time = modf(time.time()) |
| 115 | + self.msg_battery.header.stamp.sec = int(current_time[1]) |
| 116 | + self.msg_battery.header.stamp.nanosec = int(current_time[0] * 1000000000) & 0xffffffff |
| 117 | + |
| 118 | + self.msg_battery.voltage = self.ina219.bus_voltage # voltage on V- (load side) |
| 119 | + self.msg_battery.current = self.ina219.current /1000.0 # current in mA->A |
| 120 | + # print(self.msg_battery.voltage) |
| 121 | + |
| 122 | + # Publish BatteryState message |
| 123 | + self.publisher_battery_state.publish(self.msg_battery) |
| 124 | + |
| 125 | + |
| 126 | +def main(args=None): |
| 127 | + rclpy.init(args=args) |
| 128 | + |
| 129 | + # Create the node |
| 130 | + battery_state_pub = BatteryStatePublisher() |
| 131 | + |
| 132 | + try: |
| 133 | + # Spin the node so the callback function is called. |
| 134 | + # Publish any pending messages to the topics. |
| 135 | + rclpy.spin(battery_state_pub) |
| 136 | + |
| 137 | + except KeyboardInterrupt: |
| 138 | + print("**** * 💀 Ctrl-C detected...") |
| 139 | + |
| 140 | + finally: |
| 141 | + print("**** 🪦 battery_state_pub ending... ") |
| 142 | + print( str(sys.exc_info()[1]) ) # Need ´import sys´ |
| 143 | + |
| 144 | + # Time to clean up stuff... - Destroy the node explicitly |
| 145 | + # (optional - otherwise it will be done automatically |
| 146 | + # when the garbage collector destroys the node object) |
| 147 | + battery_state_pub.destroy_node() |
| 148 | + |
| 149 | + # Time to clean up stuff... Shutdown the ROS client library for Python |
| 150 | + rclpy.shutdown() |
| 151 | + |
| 152 | +if __name__ == "__main__": |
| 153 | + main() |
0 commit comments