Skip to content

Commit f4a6d01

Browse files
authored
Refactor/mqtt endpoint (#416)
* update proto Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add new proto file Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add topics Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add motor control topic Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add disconnection timeout Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add subscribe Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * send UDP packet Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add heartbeat for motor Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add disconnection Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * stop motor when the motor control topic was stopped Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * start refactoring Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * use scheduler in heartbeat Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * modify send_heartbeat code Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * split code Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * refactor code Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add on_message Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add debug line Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * run scheduler at the beginig of loop Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add stop_all_motors function Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * use stop flag Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * stop motor when disconnected Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * modify timeout Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add ground station heartbeat Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * remove old script Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * rename test script to ground station Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add TODO description Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add check last message function Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add payload as argument Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add apk install step Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> * add python3-dev to apk step Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com> --------- Signed-off-by: Masaya Kataoka <ms.kataoka@gmail.com>
1 parent 5cc3da8 commit f4a6d01

File tree

13 files changed

+429
-85
lines changed

13 files changed

+429
-85
lines changed

.github/workflows/docker/mqtt_endpoint/Dockerfile

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
FROM ghcr.io/astral-sh/uv:python3.10-alpine
22

3+
RUN apk update && apk add --no-cache \
4+
build-base \
5+
pkgconfig \
6+
sdl2-dev \
7+
freetype-dev \
8+
python3-dev
9+
310
WORKDIR /app
411
COPY mqtt_endpoint .
512
RUN uv sync
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
protoc -I=$PWD/proto --python_out=. $PWD/proto/hardware_communication_msgs__HeartBeat.proto
2+
protoc -I=$PWD/proto --python_out=. $PWD/proto/hardware_communication_msgs__MotorControl.proto
3+
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Todo Get Joystick input by pygame
2+
import time
3+
import socket
4+
import paho.mqtt.client as mqtt
5+
from mqtt_endpoint.hardware_communication_msgs__MotorControl_pb2 import (
6+
hardware_communication_msgs__MotorControl,
7+
)
8+
9+
10+
def on_connect(client, userdata, flags, rc):
11+
if rc == 0:
12+
print("Connected to MQTT broker")
13+
else:
14+
print(f"Connection failed with code {rc}")
15+
16+
17+
def main():
18+
left_motor_control_topic = "miniv/left_motor"
19+
left_motor_command = hardware_communication_msgs__MotorControl()
20+
left_motor_command.motor_enable = True
21+
# TODO fill left motor speed from joystick input
22+
left_motor_command.motor_speed = 0.3
23+
right_motor_control_topic = "miniv/right_motor"
24+
right_motor_command = hardware_communication_msgs__MotorControl()
25+
right_motor_command.motor_enable = True
26+
# TODO fill right motor speed from joystick input
27+
right_motor_command.motor_speed = 0.3
28+
29+
groundstation_heartbeat_topic = "ground_station/heartbeat"
30+
31+
broker = "54.212.20.15"
32+
port = 1883
33+
client = mqtt.Client()
34+
35+
keep_alive_timeout = 1
36+
37+
try:
38+
client.connect(broker, port, keep_alive_timeout)
39+
client.on_connect = on_connect
40+
client.loop_start()
41+
time.sleep(1)
42+
while True:
43+
if not client.is_connected():
44+
break
45+
client.publish(
46+
left_motor_control_topic, left_motor_command.SerializeToString()
47+
)
48+
client.publish(
49+
right_motor_control_topic, right_motor_command.SerializeToString()
50+
)
51+
# TODO Fill heartbeat topic from joystick button input.
52+
# if the user press Emergency stop button, fill "EMERGENCY" instead of "AUTO"
53+
client.publish(groundstation_heartbeat_topic, "AUTO")
54+
time.sleep(0.05)
55+
except KeyboardInterrupt:
56+
print("Exiting...")
57+
except Exception as e:
58+
print(e)
59+
finally:
60+
left_motor_command.motor_speed = 0
61+
client.publish(left_motor_control_topic, left_motor_command.SerializeToString())
62+
right_motor_command.motor_speed = 0
63+
client.publish(
64+
right_motor_control_topic, right_motor_command.SerializeToString()
65+
)
66+
client.loop_stop()
67+
client.disconnect()
68+
69+
70+
if __name__ == "__main__":
71+
main()
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import datetime
2+
import sched
3+
4+
5+
class GroundStationHeartBeat:
6+
def __init__(self, topic: str, timeout: float, scheduler: sched.scheduler):
7+
self.last_timestamp = None
8+
self.topic = topic
9+
self.timeout = timeout
10+
self.is_timeout = False
11+
self.is_emergency = False
12+
self.scheduler = scheduler
13+
self.last_message = ""
14+
self.autonomous_message = "AUTO"
15+
self.emergency_message = "EMERGENCY"
16+
self.check_timeout(self.scheduler)
17+
self.check_last_message(self.scheduler)
18+
19+
def check_last_message(self, scheduler):
20+
if self.last_message == self.emergency_message:
21+
self.is_emergency = True
22+
if self.last_message == self.autonomous_message:
23+
self.is_emergency = False
24+
scheduler.enter(0.1, 2, self.check_timeout, (scheduler,))
25+
26+
def receive(self, message):
27+
self.last_message = message
28+
self.last_timestamp = datetime.datetime.now()
29+
30+
def check_timeout(self, scheduler):
31+
if self.last_timestamp:
32+
now = datetime.datetime.now()
33+
if (now - self.last_timestamp).total_seconds() >= self.timeout:
34+
print("Heartbeat from ground station timed out!!")
35+
self.is_timeout = True
36+
scheduler.enter(0.1, 2, self.check_timeout, (scheduler,))

.github/workflows/docker/mqtt_endpoint/mqtt_endpoint/mqtt_endpoint/hardware_communication_msgs__MotorControl_pb2.py

Lines changed: 101 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.github/workflows/docker/mqtt_endpoint/mqtt_endpoint/mqtt_endpoint/main.py

Lines changed: 0 additions & 68 deletions
This file was deleted.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import time
2+
import socket
3+
import sched
4+
from mqtt_endpoint.hardware_communication_msgs__HeartBeat_pb2 import (
5+
hardware_communication_msgs__HeartBeat,
6+
)
7+
from mqtt_endpoint.hardware_communication_msgs__MotorControl_pb2 import (
8+
hardware_communication_msgs__MotorControl,
9+
)
10+
11+
12+
class MotorCommand:
13+
def __init__(
14+
self,
15+
udp_socket,
16+
ip_address: str,
17+
command_port: int,
18+
heartbeat_port: int,
19+
command_topic: str,
20+
scheduler: sched.scheduler,
21+
):
22+
self.command: hardware_communication_msgs__MotorControl = (
23+
hardware_communication_msgs__MotorControl()
24+
)
25+
self.stop = False
26+
self.command.motor_enable = True
27+
self.udp_socket = udp_socket
28+
self.ip_address = ip_address
29+
self.command_port = command_port
30+
self.heartbeat_port = heartbeat_port
31+
self.command_topic = command_topic
32+
self.scheduler = scheduler
33+
self.send_heartbeat(self.scheduler)
34+
35+
def send_command(self):
36+
self.udp_socket.sendto(
37+
hardware_communication_msgs__MotorControl.SerializeToString(self.command),
38+
(self.ip_address, self.command_port),
39+
)
40+
41+
def send_heartbeat(self, scheduler):
42+
if not self.stop:
43+
self.udp_socket.sendto(
44+
hardware_communication_msgs__HeartBeat.SerializeToString(self.command),
45+
(self.ip_address, self.heartbeat_port),
46+
)
47+
else:
48+
print("Try stopping motor")
49+
self.command.motor_speed = 0
50+
self.send()
51+
scheduler.enter(0.1, 1, self.send_heartbeat, (scheduler,))
52+
53+
def send_command_from_serialized_string(self, serialized_string: str):
54+
if not self.stop:
55+
self.command.ParseFromString(serialized_string)
56+
self.send_command()
57+
else:
58+
self.command.motor_speed = 0
59+
self.send_command()

0 commit comments

Comments
 (0)