diff --git a/.github/workflows/code-coverage.yml b/.github/workflows/code-coverage.yml index 889e97c..f685149 100644 --- a/.github/workflows/code-coverage.yml +++ b/.github/workflows/code-coverage.yml @@ -10,5 +10,7 @@ on: jobs: call_reusable_workflow: uses: vortexntnu/vortex-ci/.github/workflows/reusable-code-coverage.yml@main + with: + before_install_target_dependencies: 'scripts/ci_install_dependencies.sh' secrets: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/industrial-ci.yml b/.github/workflows/industrial-ci.yml index 4b7d008..c9adce3 100644 --- a/.github/workflows/industrial-ci.yml +++ b/.github/workflows/industrial-ci.yml @@ -11,3 +11,4 @@ jobs: uses: vortexntnu/vortex-ci/.github/workflows/reusable-industrial-ci.yml@main with: ros_repo: '["main", "testing"]' + before_install_target_dependencies: 'scripts/ci_install_dependencies.sh' diff --git a/scripts/ci_install_dependencies.sh b/scripts/ci_install_dependencies.sh new file mode 100755 index 0000000..d18c96f --- /dev/null +++ b/scripts/ci_install_dependencies.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +# Script to install dependencies for H264Decoder +# This script installs GStreamer and PyGObject dependencies required for running the tests + +set -e # Exit on error + +### GStreamer Installation ### +echo "Installing GStreamer and related plugins..." +sudo apt update +sudo apt install -y gstreamer1.0-tools gstreamer1.0-plugins-base \ + gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \ + gstreamer1.0-plugins-ugly gstreamer1.0-libav python3-gi \ + python3-gst-1.0 + +echo "GStreamer installation completed." + +echo "If you experience display-related issues with the GUI, try running:" +echo "export QT_QPA_PLATFORM=xcb" + +### PyGObject Installation ### +echo "Installing PyGObject dependencies..." +sudo apt install -y libglib2.0-dev libcairo2-dev libgirepository1.0-dev \ + gir1.2-gtk-3.0 python3-dev ninja-build + +echo "Ensuring latest Meson version is installed..." +pip install --upgrade meson + +echo "Installing PyGObject via pip..." +pip install pycairo --no-cache-dir +pip install pygobject --no-cache-dir + +echo "Installation of all dependencies completed successfully." diff --git a/tests/resources/test_video.h264 b/tests/resources/test_video.h264 new file mode 100644 index 0000000..a56ce6a Binary files /dev/null and b/tests/resources/test_video.h264 differ diff --git a/tests/test_utils.py b/tests/test_utils.py index 38b3d1b..8137dd6 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,8 +1,11 @@ +import threading + import numpy as np import pytest from geometry_msgs.msg import Pose, Twist from vortex_utils.python_utils import ( + H264Decoder, PoseData, State, TwistData, @@ -211,6 +214,35 @@ def test_state_subtraction_twist(): assert (state1 - state2).twist == TwistData(0.9, 1.8, 2.7, 0, 0, 0) +def test_h264_decoder(): + test_file = "tests/resources/test_video.h264" + + decoder = H264Decoder() + + decoding_thread = threading.Thread(target=decoder.start, daemon=True) + decoding_thread.start() + + with open(test_file, "rb") as f: + raw_data = f.read() + + chunk_size = 64 + for i in range(0, len(raw_data), chunk_size): + chunk = raw_data[i : i + chunk_size] + decoder.push_data(chunk) + + decoder.appsrc.emit("end-of-stream") + + decoding_thread.join(timeout=5.0) + + assert len(decoder.decoded_frames) > 0, ( + "No frames were decoded from the H.264 stream." + ) + + frame = decoder.decoded_frames[0] + assert isinstance(frame, np.ndarray), "Decoded frame is not a numpy array." + assert frame.ndim == 3, f"Expected 3D array (H, W, Channels), got {frame.shape}" + + def test_pose_from_ros(): pose_msg = Pose() pose_msg.position.x = 1.0 diff --git a/vortex_utils/README.md b/vortex_utils/README.md new file mode 100644 index 0000000..4b1cd50 --- /dev/null +++ b/vortex_utils/README.md @@ -0,0 +1,2 @@ +# H264Decoder +Install the dependencies by running the following script: [ci_install_dependencies.sh](/scripts/ci_install_dependencies.sh) diff --git a/vortex_utils/python_utils.py b/vortex_utils/python_utils.py index 7dbcbeb..53f61ff 100644 --- a/vortex_utils/python_utils.py +++ b/vortex_utils/python_utils.py @@ -1,8 +1,13 @@ from dataclasses import dataclass +import gi import numpy as np from scipy.spatial.transform import Rotation +gi.require_version('Gst', '1.0') +gi.require_version('GstApp', '1.0') +from gi.repository import GLib, Gst + def ssa(angle: float) -> float: return (angle + np.pi) % (2 * np.pi) - np.pi @@ -120,3 +125,108 @@ def __add__(self, other: "State") -> "State": def __sub__(self, other: "State") -> "State": return State(pose=self.pose - other.pose, twist=self.twist - other.twist) + + +class H264Decoder: + """Decodes H.264 streams using GStreamer.""" + + _gst_initialized = False + + def __init__(self): + """Initializes the H.264 decoder and sets up the GStreamer pipeline.""" + # Ensure GStreamer is initialized only once + if not H264Decoder._gst_initialized: + Gst.init(None) + H264Decoder._gst_initialized = True + + pipeline_desc = ( + "appsrc name=mysrc is-live=true ! " + "h264parse ! " + "avdec_h264 ! " + "videoconvert ! video/x-raw,format=BGR ! " + "appsink name=appsink" + ) + + self._pipeline = Gst.parse_launch(pipeline_desc) + self.appsrc = self._pipeline.get_by_name("mysrc") + self._appsink = self._pipeline.get_by_name("appsink") + + self._appsink.set_property("emit-signals", True) + self._appsink.set_property("sync", False) + self._appsink.connect("new-sample", self._on_new_sample) + + self._bus = self._pipeline.get_bus() + self._bus.add_signal_watch() + self._bus.connect("message", self._on_bus_message) + + self._main_loop = None + + self.decoded_frames = [] + self.max_frames = 3 # Keep only the last 3 frames here + + def start(self): + """Starts the GStreamer pipeline and runs the main event loop.""" + self._pipeline.set_state(Gst.State.PLAYING) + self._main_loop = GLib.MainLoop() + try: + self._main_loop.run() + except KeyboardInterrupt: + pass + finally: + self.stop() + + def stop(self): + """Stops the GStreamer pipeline and cleans up resources.""" + if self._pipeline: + self._pipeline.set_state(Gst.State.NULL) + if self._main_loop is not None: + self._main_loop.quit() + self._main_loop = None + + def push_data(self, data: bytes): + """Pushes H.264 encoded data into the pipeline for decoding.""" + if not self.appsrc: + raise RuntimeError( + "The pipeline's appsrc element was not found or initialized." + ) + gst_buffer = Gst.Buffer.new_allocate(None, len(data), None) + gst_buffer.fill(0, data) + self.appsrc.emit("push-buffer", gst_buffer) + + def _on_bus_message(self, bus, message): + """Handles messages from the GStreamer bus.""" + msg_type = message.type + if msg_type == Gst.MessageType.ERROR: + err, debug = message.parse_error() + print(f"GStreamer ERROR: {err}, debug={debug}") + self.stop() + elif msg_type == Gst.MessageType.EOS: + print("End-Of-Stream reached.") + self.stop() + + def _on_new_sample(self, sink): + """Processes a new decoded video frame from the appsink.""" + sample = sink.emit("pull-sample") + if not sample: + return Gst.FlowReturn.ERROR + + buf = sample.get_buffer() + caps_format = sample.get_caps().get_structure(0) + width = caps_format.get_value("width") + height = caps_format.get_value("height") + + success, map_info = buf.map(Gst.MapFlags.READ) + if not success: + return Gst.FlowReturn.ERROR + + frame_data = np.frombuffer(map_info.data, dtype=np.uint8) + channels = len(frame_data) // (width * height) # typically 3 (BGR) or 4 (BGRA) + frame_data_reshaped = frame_data.reshape((height, width, channels)) + + self.decoded_frames.append(frame_data_reshaped.copy()) + + if len(self.decoded_frames) > self.max_frames: + self.decoded_frames.pop(0) + + buf.unmap(map_info) + return Gst.FlowReturn.OK