Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/code-coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ on:
jobs:
call_reusable_workflow:
uses: vortexntnu/vortex-ci/.github/workflows/reusable-code-coverage.yml@main
with:
before_install_target_dependencies: 'scripts/ci_install_dependencies.sh'
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
1 change: 1 addition & 0 deletions .github/workflows/industrial-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ jobs:
uses: vortexntnu/vortex-ci/.github/workflows/reusable-industrial-ci.yml@main
with:
ros_repo: '["main", "testing"]'
before_install_target_dependencies: 'scripts/ci_install_dependencies.sh'
33 changes: 33 additions & 0 deletions scripts/ci_install_dependencies.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/bin/bash

# Script to install dependencies for H264Decoder.
# Installs the GStreamer runtime/plugins plus the system and pip packages
# needed to build PyGObject, which are required for running the tests.
#
# Uses `apt-get` instead of `apt`: the `apt` front-end is intended for
# interactive use and does not guarantee a stable CLI for scripts.

set -e # Exit on error

### GStreamer Installation ###
echo "Installing GStreamer and related plugins..."
sudo apt-get update
sudo apt-get install -y gstreamer1.0-tools gstreamer1.0-plugins-base \
    gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \
    gstreamer1.0-plugins-ugly gstreamer1.0-libav python3-gi \
    python3-gst-1.0

echo "GStreamer installation completed."

echo "If you experience display-related issues with the GUI, try running:"
echo "export QT_QPA_PLATFORM=xcb"

### PyGObject Installation ###
# Development headers and build tools needed to compile pycairo/pygobject
# from source when pip installs them below.
echo "Installing PyGObject dependencies..."
sudo apt-get install -y libglib2.0-dev libcairo2-dev libgirepository1.0-dev \
    gir1.2-gtk-3.0 python3-dev ninja-build

echo "Ensuring latest Meson version is installed..."
pip install --upgrade meson

echo "Installing PyGObject via pip..."
pip install pycairo --no-cache-dir
pip install pygobject --no-cache-dir

echo "Installation of all dependencies completed successfully."
Binary file added tests/resources/test_video.h264
Binary file not shown.
32 changes: 32 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import threading

import numpy as np
import pytest
from geometry_msgs.msg import Pose, Twist

from vortex_utils.python_utils import (
H264Decoder,
PoseData,
State,
TwistData,
Expand Down Expand Up @@ -211,6 +214,35 @@ def test_state_subtraction_twist():
assert (state1 - state2).twist == TwistData(0.9, 1.8, 2.7, 0, 0, 0)


def test_h264_decoder():
    """Feed a recorded H.264 file through H264Decoder and check a frame comes out.

    The decoder's main loop runs in a background thread while this thread
    pushes the file in small chunks and then signals end-of-stream.
    """
    # Function-scope import so this test does not depend on edits to the
    # module's import block.
    from pathlib import Path

    # Resolve the fixture relative to this test module rather than the
    # current working directory, so the test passes regardless of where
    # pytest is invoked from.
    test_file = Path(__file__).resolve().parent / "resources" / "test_video.h264"

    decoder = H264Decoder()

    decoding_thread = threading.Thread(target=decoder.start, daemon=True)
    decoding_thread.start()

    raw_data = test_file.read_bytes()

    # Push the stream in small pieces to mimic data arriving incrementally.
    chunk_size = 64
    for i in range(0, len(raw_data), chunk_size):
        chunk = raw_data[i : i + chunk_size]
        decoder.push_data(chunk)

    decoder.appsrc.emit("end-of-stream")

    # Bounded wait so a stuck pipeline cannot hang the test run.
    decoding_thread.join(timeout=5.0)

    assert len(decoder.decoded_frames) > 0, (
        "No frames were decoded from the H.264 stream."
    )

    frame = decoder.decoded_frames[0]
    assert isinstance(frame, np.ndarray), "Decoded frame is not a numpy array."
    assert frame.ndim == 3, f"Expected 3D array (H, W, Channels), got {frame.shape}"


def test_pose_from_ros():
pose_msg = Pose()
pose_msg.position.x = 1.0
Expand Down
2 changes: 2 additions & 0 deletions vortex_utils/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# H264Decoder
Install the GStreamer and PyGObject dependencies that `H264Decoder` requires by running the following script: [ci_install_dependencies.sh](/scripts/ci_install_dependencies.sh)
110 changes: 110 additions & 0 deletions vortex_utils/python_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from dataclasses import dataclass

import gi
import numpy as np
from scipy.spatial.transform import Rotation

gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
from gi.repository import GLib, Gst


def ssa(angle: float) -> float:
    """Wrap ``angle`` (radians) into the half-open interval [-pi, pi).

    The name presumably stands for "smallest signed angle".
    """
    full_turn = 2 * np.pi
    # Shift by pi so the modulo lands in [0, 2*pi), then shift back.
    shifted = angle + np.pi
    return shifted % full_turn - np.pi
Expand Down Expand Up @@ -120,3 +125,108 @@ def __add__(self, other: "State") -> "State":

def __sub__(self, other: "State") -> "State":
    """Return a new State holding the pose and twist differences ``self - other``."""
    pose_delta = self.pose - other.pose
    twist_delta = self.twist - other.twist
    return State(pose=pose_delta, twist=twist_delta)


class H264Decoder:
    """Decodes H.264 streams using GStreamer.

    Encoded bytes are fed in with :meth:`push_data`. Each decoded frame is
    converted to a ``numpy`` array (the pipeline's caps filter requests BGR)
    and appended to ``decoded_frames``, which keeps only the most recent
    ``max_frames`` entries.
    """

    _gst_initialized = False

    def __init__(self):
        """Initializes the H.264 decoder and sets up the GStreamer pipeline."""
        # Ensure GStreamer is initialized only once
        if not H264Decoder._gst_initialized:
            Gst.init(None)
            H264Decoder._gst_initialized = True

        pipeline_desc = (
            "appsrc name=mysrc is-live=true ! "
            "h264parse ! "
            "avdec_h264 ! "
            "videoconvert ! video/x-raw,format=BGR ! "
            "appsink name=appsink"
        )

        self._pipeline = Gst.parse_launch(pipeline_desc)
        self.appsrc = self._pipeline.get_by_name("mysrc")
        self._appsink = self._pipeline.get_by_name("appsink")

        # Have the sink emit "new-sample" so frames arrive via callback, and
        # disable clock sync so frames are delivered as soon as decoded.
        self._appsink.set_property("emit-signals", True)
        self._appsink.set_property("sync", False)
        self._appsink.connect("new-sample", self._on_new_sample)

        self._bus = self._pipeline.get_bus()
        self._bus.add_signal_watch()
        self._bus.connect("message", self._on_bus_message)

        self._main_loop = None

        self.decoded_frames = []
        self.max_frames = 3  # Keep only the last 3 frames here

    def start(self):
        """Starts the GStreamer pipeline and runs the main event loop.

        Blocks until the loop is quit (see :meth:`stop`); the pipeline is
        always torn down on the way out.
        """
        self._pipeline.set_state(Gst.State.PLAYING)
        self._main_loop = GLib.MainLoop()
        try:
            self._main_loop.run()
        except KeyboardInterrupt:
            pass
        finally:
            self.stop()

    def stop(self):
        """Stops the GStreamer pipeline and cleans up resources.

        Safe to call more than once: the main loop is quit only the first
        time, after which the reference is cleared.
        """
        if self._pipeline:
            self._pipeline.set_state(Gst.State.NULL)
        if self._main_loop is not None:
            self._main_loop.quit()
            self._main_loop = None

    def push_data(self, data: bytes):
        """Pushes H.264 encoded data into the pipeline for decoding.

        Raises:
            RuntimeError: If the pipeline has no usable ``appsrc`` element.
        """
        if not self.appsrc:
            raise RuntimeError(
                "The pipeline's appsrc element was not found or initialized."
            )
        gst_buffer = Gst.Buffer.new_allocate(None, len(data), None)
        gst_buffer.fill(0, data)
        self.appsrc.emit("push-buffer", gst_buffer)

    def _on_bus_message(self, bus, message):
        """Handles messages from the GStreamer bus.

        Both an error and end-of-stream stop the pipeline and main loop.
        """
        msg_type = message.type
        if msg_type == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"GStreamer ERROR: {err}, debug={debug}")
            self.stop()
        elif msg_type == Gst.MessageType.EOS:
            print("End-Of-Stream reached.")
            self.stop()

    @staticmethod
    def _frame_from_bytes(data, width: int, height: int) -> np.ndarray:
        """Builds an owned ``(height, width, channels)`` uint8 array from raw bytes.

        Rows may carry trailing padding (GStreamer aligns packed RGB/BGR rows
        to a 4-byte stride), so the row stride is derived from the buffer
        size and the padding is sliced off instead of assuming tightly
        packed rows. For tightly packed input the result is the same as a
        plain reshape.

        The channel count is inferred as ``stride // width`` (typically 3 for
        BGR or 4 for BGRA); this assumes per-row padding is smaller than
        ``width``.
        """
        flat = np.frombuffer(data, dtype=np.uint8)
        row_stride = flat.size // height
        channels = row_stride // width
        rows = flat[: height * row_stride].reshape((height, row_stride))
        # Copy so the frame stays valid after the source buffer is released.
        packed = rows[:, : width * channels].copy()
        return packed.reshape((height, width, channels))

    def _on_new_sample(self, sink):
        """Processes a new decoded video frame from the appsink.

        Returns ``Gst.FlowReturn.OK`` after storing the frame, or
        ``Gst.FlowReturn.ERROR`` if no sample could be pulled or the buffer
        could not be mapped for reading.
        """
        sample = sink.emit("pull-sample")
        if not sample:
            return Gst.FlowReturn.ERROR

        buf = sample.get_buffer()
        caps_format = sample.get_caps().get_structure(0)
        width = caps_format.get_value("width")
        height = caps_format.get_value("height")

        success, map_info = buf.map(Gst.MapFlags.READ)
        if not success:
            return Gst.FlowReturn.ERROR

        # Always release the mapping, even if frame construction raises.
        try:
            frame = self._frame_from_bytes(map_info.data, width, height)
        finally:
            buf.unmap(map_info)

        self.decoded_frames.append(frame)

        # Drop the oldest frame once the history limit is exceeded.
        if len(self.decoded_frames) > self.max_frames:
            self.decoded_frames.pop(0)

        return Gst.FlowReturn.OK