Skip to content

Commit 2ba17f1

Browse files
Merge pull request #274 from pollen-robotics/180_2-stream-audiovideo-from-rpi
180 2 stream audiovideo from rpi
2 parents 5ab4663 + fcdcf12 commit 2ba17f1

22 files changed

+941
-77
lines changed

.github/workflows/pytest.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@ jobs:
2323
pip install .[dev]
2424
- name: Run tests
2525
run: |
26-
pytest -vv -m 'not audio and not video and not audio_gstreamer and not video_gstreamer and not wireless' --tb=short
26+
pytest -vv -m 'not audio and not video and not audio_gstreamer and not video_gstreamer and not wireless and not wireless_gstreamer' --tb=short
2727
env:
2828
MUJOCO_GL: disable

docs/RPI.md

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# Raspberry Pi Installation
2+
3+
## RPI Lite OS
4+
5+
Follow the [official documentation](https://www.raspberrypi.com/documentation/computers/getting-started.html#installing-the-operating-system) to install the Raspberry Pi OS Lite (64 bits).
6+
7+
It is recommended to setup a wifi password and a ssh connection.
8+
9+
## Gstreamer
10+
11+
```bash
12+
sudo apt-get install libgstreamer-plugins-bad1.0-dev libgstreamer-plugins-base1.0-dev libgstreamer1.0-dev libglib2.0-dev libssl-dev git libgirepository1.0-dev libcairo2-dev libportaudio2 gstreamer1.0-libcamera librpicam-app1 libssl-dev libnice10 gstreamer1.0-plugins-good gstreamer1.0-alsa gstreamer1.0-plugins-bad gstreamer1.0-nice
13+
```
14+
15+
## Install Rust
16+
17+
```bash
18+
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
19+
```
20+
## Webrtc plugin
21+
22+
```bash
23+
git clone https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
24+
25+
cd gst-plugins-rs
26+
27+
git checkout 0.14.1
28+
29+
cargo install cargo-c
30+
31+
sudo mkdir /opt/gst-plugins-rs
32+
33+
sudo chown reachy /opt/gst-plugins-rs
34+
35+
cargo cinstall -p gst-plugin-webrtc --prefix=/opt/gst-plugins-rs --release
36+
37+
echo 'export GST_PLUGIN_PATH=/opt/gst-plugins-rs/lib/aarch64-linux-gnu/' >> ~/.bashrc
38+
```
39+
40+
## Install Daemon
41+
42+
Install with gstreamer extra dependencies
43+
44+
pip install -e .[gstreamer]
45+
46+
## Usage
47+
48+
### Daemon
49+
50+
The webrtc streaming will start automatically with the wireless option:
51+
52+
```bash
53+
reachy-mini-daemon --wireless-version
54+
```
55+
56+
### Client
57+
58+
This should open view of the camera, and play back the sound.
59+
60+
```bash
61+
python examples/debug/gstreamer_client.py --signaling-host <Reachy Mini ip>
62+
```
63+
64+
It is assumed that gstreamer is installed in your machine. For Linux users you may want to follow the above procedure. For MacOS, please install via [brew](https://gstreamer.freedesktop.org/download/#macos). *ToDo* For Windows please make a conda environement.
65+
66+
67+
## Unit tests
68+
69+
### Manually create the webrtcsrc server
70+
71+
```bash
72+
gst-launch-1.0 webrtcsink run-signalling-server=true meta="meta,name=reachymini" name=ws libcamerasrc ! capsfilter caps=video/x-raw,width=1280,height=720,framerate=60/1,format=YUY2,colorimetry=bt709,interlace-mode=progressive ! queue ! v4l2h264enc extra-controls="controls,repeat_sequence_header=1" ! 'video/x-h264,level=(string)4' ! ws. alsasrc device=hw:4 ! queue ! audioconvert ! audioresample ! opusenc ! audio/x-opus, rate=48000, channels=2 ! ws.
73+
```
74+
75+
### Send sound to Reachy Mini
76+
77+
Send an audio RTP stream to the port 5000
78+
79+
```bash
80+
gst-launch-1.0 audiotestsrc ! audioconvert ! audioresample ! opusenc ! audio/x-opus, rate=48000, channels=2 ! rtpopuspay pt=96 ! udpsink host=10.0.1.38 port=5000
81+
```
82+
83+
84+

examples/debug/gstreamer_client.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
"""Simple gstreamer webrtc consumer example."""
2+
3+
import argparse
4+
5+
import gi
6+
from gst_signalling.utils import find_producer_peer_id_by_name
7+
8+
gi.require_version("Gst", "1.0")
9+
from gi.repository import GLib, Gst # noqa: E402
10+
11+
12+
class GstConsumer:
13+
"""Gstreamer webrtc consumer class."""
14+
15+
def __init__(
16+
self,
17+
signalling_host: str,
18+
signalling_port: int,
19+
peer_name: str,
20+
) -> None:
21+
"""Initialize the consumer with signalling server details and peer name."""
22+
Gst.init(None)
23+
24+
self.pipeline = Gst.Pipeline.new("webRTC-consumer")
25+
self.source = Gst.ElementFactory.make("webrtcsrc")
26+
27+
if not self.pipeline:
28+
print("Pipeline could be created.")
29+
exit(-1)
30+
31+
if not self.source:
32+
print(
33+
"webrtcsrc component could not be created. Please make sure that the plugin is installed \
34+
(see https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/tree/main/net/webrtc)"
35+
)
36+
exit(-1)
37+
38+
self.pipeline.add(self.source)
39+
40+
peer_id = find_producer_peer_id_by_name(
41+
signalling_host, signalling_port, peer_name
42+
)
43+
print(f"found peer id: {peer_id}")
44+
45+
self.source.connect("pad-added", self.webrtcsrc_pad_added_cb)
46+
signaller = self.source.get_property("signaller")
47+
signaller.set_property("producer-peer-id", peer_id)
48+
signaller.set_property("uri", f"ws://{signalling_host}:{signalling_port}")
49+
50+
def dump_latency(self) -> None:
51+
"""Dump the current pipeline latency."""
52+
query = Gst.Query.new_latency()
53+
self.pipeline.query(query)
54+
print(f"Pipeline latency {query.parse_latency()}")
55+
56+
def _configure_webrtcbin(self, webrtcsrc: Gst.Element) -> None:
57+
if isinstance(webrtcsrc, Gst.Bin):
58+
webrtcbin_name = "webrtcbin0"
59+
webrtcbin = webrtcsrc.get_by_name(webrtcbin_name)
60+
assert webrtcbin is not None
61+
# jitterbuffer has a default 200 ms buffer.
62+
webrtcbin.set_property("latency", 50)
63+
64+
def webrtcsrc_pad_added_cb(self, webrtcsrc: Gst.Element, pad: Gst.Pad) -> None:
65+
"""Add webrtcsrc elements when a new pad is added."""
66+
self._configure_webrtcbin(webrtcsrc)
67+
if pad.get_name().startswith("video"): # type: ignore[union-attr]
68+
# webrtcsrc automatically decodes and convert the video
69+
sink = Gst.ElementFactory.make("fpsdisplaysink")
70+
assert sink is not None
71+
self.pipeline.add(sink)
72+
pad.link(sink.get_static_pad("sink")) # type: ignore[arg-type]
73+
sink.sync_state_with_parent()
74+
75+
elif pad.get_name().startswith("audio"): # type: ignore[union-attr]
76+
# webrtcsrc automatically decodes and convert the audio
77+
sink = Gst.ElementFactory.make("autoaudiosink")
78+
assert sink is not None
79+
self.pipeline.add(sink)
80+
pad.link(sink.get_static_pad("sink")) # type: ignore[arg-type]
81+
sink.sync_state_with_parent()
82+
83+
GLib.timeout_add_seconds(5, self.dump_latency)
84+
85+
def __del__(self) -> None:
86+
"""Destructor to clean up GStreamer resources."""
87+
Gst.deinit()
88+
89+
def get_bus(self) -> Gst.Bus:
90+
"""Get the GStreamer bus for the pipeline."""
91+
return self.pipeline.get_bus()
92+
93+
def play(self) -> None:
94+
"""Start the GStreamer pipeline."""
95+
ret = self.pipeline.set_state(Gst.State.PLAYING)
96+
if ret == Gst.StateChangeReturn.FAILURE:
97+
print("Error starting playback.")
98+
exit(-1)
99+
print("playing ... (ctrl+c to quit)")
100+
101+
def stop(self) -> None:
102+
"""Stop the GStreamer pipeline."""
103+
print("stopping")
104+
self.pipeline.send_event(Gst.Event.new_eos())
105+
self.pipeline.set_state(Gst.State.NULL)
106+
107+
108+
def process_msg(bus: Gst.Bus, pipeline: Gst.Pipeline) -> bool:
109+
"""Process messages from the GStreamer bus."""
110+
msg = bus.timed_pop_filtered(10 * Gst.MSECOND, Gst.MessageType.ANY)
111+
if msg:
112+
if msg.type == Gst.MessageType.ERROR:
113+
err, debug = msg.parse_error()
114+
print(f"Error: {err}, {debug}")
115+
return False
116+
elif msg.type == Gst.MessageType.EOS:
117+
print("End-Of-Stream reached.")
118+
return False
119+
elif msg.type == Gst.MessageType.LATENCY:
120+
if pipeline:
121+
try:
122+
pipeline.recalculate_latency()
123+
except Exception as e:
124+
print("failed to recalculate warning, exception: %s" % str(e))
125+
# else:
126+
# print(f"Message: {msg.type}")
127+
return True
128+
129+
130+
def main() -> None:
131+
"""Run the main function."""
132+
parser = argparse.ArgumentParser(description="webrtc gstreamer simple consumer")
133+
parser.add_argument(
134+
"--signaling-host",
135+
default="127.0.0.1",
136+
help="Gstreamer signaling host - Reachy Mini ip",
137+
)
138+
parser.add_argument(
139+
"--signaling-port", default=8443, help="Gstreamer signaling port"
140+
)
141+
142+
args = parser.parse_args()
143+
144+
consumer = GstConsumer(
145+
args.signaling_host,
146+
args.signaling_port,
147+
"reachymini",
148+
)
149+
consumer.play()
150+
151+
# Wait until error or EOS
152+
bus = consumer.get_bus()
153+
try:
154+
while True:
155+
if not process_msg(bus, consumer.pipeline):
156+
break
157+
158+
except KeyboardInterrupt:
159+
print("User exit")
160+
finally:
161+
consumer.stop()
162+
163+
164+
if __name__ == "__main__":
165+
main()

examples/debug/sound_record.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def main(backend: str) -> None:
1919
level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s"
2020
)
2121

22-
with ReachyMini(log_level="DEBUG", media_backend=backend) as mini:
22+
with ReachyMini(log_level="INFO", media_backend=backend) as mini:
2323
print(f"Recording for {DURATION} seconds...")
2424
audio_samples = []
2525
t0 = time.time()

pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "reachy_mini"
7-
version = "1.0.0"
7+
version = "1.0.0rc5"
88
authors = [{ name = "Pollen Robotics", email = "[email protected]" }]
99
description = ""
1010
readme = "README.md"
@@ -51,7 +51,7 @@ examples = ["pynput"]
5151
mujoco = ["mujoco==3.3.0"]
5252
nn_kinematics = ["onnxruntime==1.22.1"]
5353
placo_kinematics = ["placo==0.9.14"]
54-
gstreamer = ["PyGObject>=3.42.2,<=3.46.0"]
54+
gstreamer = ["PyGObject>=3.42.2,<=3.46.0", "gst-signalling==1.1.1"]
5555
rerun = [
5656
"rerun-sdk>=0.23.4",
5757
# See https://github.com/rerun-io/rerun-loader-python-example-urdf/issues/12
@@ -96,6 +96,7 @@ markers = [
9696
"video: mark test as requiring video hardware",
9797
"video_gstreamer: mark test as requiring GStreamer for video",
9898
"wireless: mark test as requiring wireless Reachy Mini",
99+
"wireless_gstreamer: mark test as requiring GStreamer for wireless Reachy Mini",
99100
]
100101

101102
[tool.mypy]

src/reachy_mini/daemon/backend/robot/backend.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ def voltage_ok(
537537
if "Input Voltage Error" in err:
538538
if voltage_ok(id):
539539
err.remove("Input Voltage Error")
540-
540+
541541
# To avoid logging empty errors like "Motor 1: []"
542542
if len(err) > 0:
543543
errors[name] = err

src/reachy_mini/daemon/daemon.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
It also provides a command-line interface for easy interaction.
66
"""
77

8+
import asyncio
89
import json
910
import logging
1011
import time
@@ -50,6 +51,14 @@ def __init__(self, log_level: str = "INFO", wireless_version: bool = False) -> N
5051
)
5152
self._thread_event_publish_status = Event()
5253

54+
self._webrtc: Optional[Any] = (
55+
None # type GstWebRTC imported for wireless version only
56+
)
57+
if wireless_version:
58+
from reachy_mini.media.webrtc_daemon import GstWebRTC
59+
60+
self._webrtc = GstWebRTC(log_level)
61+
5362
async def start(
5463
self,
5564
sim: bool = False,
@@ -158,6 +167,12 @@ def backend_wrapped_run() -> None:
158167
self._status.state = DaemonState.STOPPING
159168
return self._status.state
160169

170+
if self._webrtc:
171+
await asyncio.sleep(
172+
0.2
173+
) # Give some time for the backend to release the audio device
174+
self._webrtc.start()
175+
161176
self.logger.info("Daemon started successfully.")
162177
self._status.state = DaemonState.RUNNING
163178
return self._status.state
@@ -191,6 +206,9 @@ async def stop(self, goto_sleep_on_stop: bool = True) -> "DaemonState":
191206
self._thread_event_publish_status.set()
192207
self.server.stop()
193208

209+
if self._webrtc:
210+
self._webrtc.stop()
211+
194212
if goto_sleep_on_stop:
195213
try:
196214
self.logger.info("Putting Reachy Mini to sleep...")

src/reachy_mini/daemon/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def get_ip_address(ifname: str = "wlan0") -> str | None:
9191
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
9292
try:
9393
import fcntl
94+
9495
return socket.inet_ntoa(
9596
fcntl.ioctl(
9697
s.fileno(),

src/reachy_mini/media/audio_base.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import logging
88
import struct
99
from abc import ABC, abstractmethod
10-
from enum import Enum
1110
from typing import List, Optional
1211

1312
import numpy as np
@@ -16,13 +15,6 @@
1615
from libusb_package import get_libusb1_backend
1716

1817

19-
class AudioBackend(Enum):
20-
"""Audio backends."""
21-
22-
SOUNDDEVICE = "sounddevice"
23-
GSTREAMER = "gstreamer"
24-
25-
2618
class AudioBase(ABC):
2719
"""Abstract class for opening and managing audio devices."""
2820

@@ -35,11 +27,10 @@ class AudioBase(ABC):
3527
"DOA_VALUE_RADIANS": (20, 19, 8 + 1, "ro", "radians"),
3628
}
3729

38-
def __init__(self, backend: AudioBackend, log_level: str = "INFO") -> None:
30+
def __init__(self, log_level: str = "INFO") -> None:
3931
"""Initialize the audio device."""
4032
self.logger = logging.getLogger(__name__)
4133
self.logger.setLevel(log_level)
42-
self.backend = backend
4334
self._respeaker = self._init_respeaker_usb()
4435
# name, resid, cmdid, length, type
4536

0 commit comments

Comments
 (0)