Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 62 additions & 12 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import threading
import time

import numpy as np
import pytest
from geometry_msgs.msg import Pose, Twist
from gi.repository import Gst

from vortex_utils.gst_utils import H264Decoder
from vortex_utils.python_utils import (
H264Decoder,
PoseData,
State,
TwistData,
Expand Down Expand Up @@ -214,33 +216,81 @@ def test_state_subtraction_twist():
assert (state1 - state2).twist == TwistData(0.9, 1.8, 2.7, 0, 0, 0)


def test_h264_decoder():
test_file = "tests/resources/test_video.h264"

@pytest.fixture
def decoder():
"""Fixture to create and clean up an H264Decoder instance."""
decoder = H264Decoder()

decoding_thread = threading.Thread(target=decoder.start, daemon=True)
decoding_thread.start()
yield decoder
decoder.stop()
decoding_thread.join()


def test_h264_decoder_initialization(decoder):
"""Test that the H264Decoder initializes correctly."""
assert decoder.appsrc is not None, "Appsrc element is missing."
assert decoder._appsink is not None, "Appsink element is missing."
assert decoder._pipeline is not None, "Pipeline was not initialized."
assert decoder._bus is not None, "GStreamer bus was not initialized."


def test_h264_decoder_decodes_frames(decoder):
"""Test if the decoder correctly processes an H.264 stream."""
test_file = "tests/resources/test_video.h264"

# Read and push H.264 data
with open(test_file, "rb") as f:
raw_data = f.read()

chunk_size = 64
for i in range(0, len(raw_data), chunk_size):
chunk = raw_data[i : i + chunk_size]
decoder.push_data(chunk)
decoder.push_data(raw_data[i : i + chunk_size])

decoder.appsrc.emit("end-of-stream")

decoding_thread.join(timeout=5.0)
# Wait for frames to be decoded
timeout = 5.0
start_time = time.time()
while len(decoder.decoded_frames) == 0 and (time.time() - start_time) < timeout:
time.sleep(0.1)

assert len(decoder.decoded_frames) > 0, "No frames were decoded."

assert len(decoder.decoded_frames) > 0, (
"No frames were decoded from the H.264 stream."
)

def test_h264_decoder_frame_properties(decoder):
"""Test if decoded frames have correct properties."""
test_file = "tests/resources/test_video.h264"

# Read and push H.264 data
with open(test_file, "rb") as f:
raw_data = f.read()

chunk_size = 64
for i in range(0, len(raw_data), chunk_size):
decoder.push_data(raw_data[i : i + chunk_size])

decoder.appsrc.emit("end-of-stream")

# Wait for frames to be decoded
timeout = 5.0
start_time = time.time()
while len(decoder.decoded_frames) == 0 and (time.time() - start_time) < timeout:
time.sleep(0.1)

assert len(decoder.decoded_frames) > 0, "No frames were decoded."

frame = decoder.decoded_frames[0]
assert isinstance(frame, np.ndarray), "Decoded frame is not a numpy array."
assert frame.ndim == 3, f"Expected 3D array (H, W, Channels), got {frame.shape}"
assert frame.ndim == 3, f"Expected 3D array (H, W, C), got shape {frame.shape}."


def test_h264_decoder_stops_cleanly(decoder):
"""Test if the decoder stops without errors."""
decoder.stop()
assert decoder._pipeline.get_state(0)[1] == Gst.State.NULL, (
"Decoder did not shut down properly."
)


def test_pose_from_ros():
Expand Down
2 changes: 1 addition & 1 deletion vortex_utils/README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# H264Decoder
# H264Decoder (gst_utils.py)
Install the dependencies by running the following script: [ci_install_dependencies.sh](/scripts/ci_install_dependencies.sh)
110 changes: 110 additions & 0 deletions vortex_utils/gst_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import gi
import numpy as np

gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
from gi.repository import GLib, Gst


class H264Decoder:
"""Decodes H.264 streams using GStreamer."""

_gst_initialized = False

def __init__(self):
"""Initializes the H.264 decoder and sets up the GStreamer pipeline."""
# Ensure GStreamer is initialized only once
if not H264Decoder._gst_initialized:
Gst.init(None)
H264Decoder._gst_initialized = True

pipeline_desc = (
"appsrc name=mysrc is-live=true ! "
"h264parse ! "
"avdec_h264 ! "
"videoconvert ! video/x-raw,format=BGR ! "
"appsink name=appsink"
)

self._pipeline = Gst.parse_launch(pipeline_desc)
self.appsrc = self._pipeline.get_by_name("mysrc")
self._appsink = self._pipeline.get_by_name("appsink")

self._appsink.set_property("emit-signals", True)
self._appsink.set_property("sync", False)
self._appsink.connect("new-sample", self._on_new_sample)

self._bus = self._pipeline.get_bus()
self._bus.add_signal_watch()
self._bus.connect("message", self._on_bus_message)

self._main_loop = None
self.decoded_frames = []
self.max_frames = 3 # Keep only the last 3 frames here

def start(self):
"""Starts the GStreamer pipeline and runs the main event loop."""
self._pipeline.set_state(Gst.State.PLAYING)
self._main_loop = GLib.MainLoop()
try:
self._main_loop.run()
except KeyboardInterrupt:
pass
finally:
self.stop()

def stop(self):
"""Stops the GStreamer pipeline and cleans up resources."""
if self._pipeline:
self._pipeline.set_state(Gst.State.NULL)
if self._main_loop is not None:
self._main_loop.quit()
self._main_loop = None

def push_data(self, data: bytes):
"""Pushes H.264 encoded data into the pipeline for decoding."""
if not self.appsrc:
raise RuntimeError(
"The pipeline's appsrc element was not found or initialized."
)
gst_buffer = Gst.Buffer.new_allocate(None, len(data), None)
gst_buffer.fill(0, data)
self.appsrc.emit("push-buffer", gst_buffer)

def _on_bus_message(self, bus, message):
"""Handles messages from the GStreamer bus."""
msg_type = message.type
if msg_type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"GStreamer ERROR: {err}, debug={debug}")
self.stop()
elif msg_type == Gst.MessageType.EOS:
print("End-Of-Stream reached.")
self.stop()

def _on_new_sample(self, sink):
"""Processes a new decoded video frame from the appsink."""
sample = sink.emit("pull-sample")
if not sample:
return Gst.FlowReturn.ERROR

buf = sample.get_buffer()
caps_format = sample.get_caps().get_structure(0)
width = caps_format.get_value("width")
height = caps_format.get_value("height")

success, map_info = buf.map(Gst.MapFlags.READ)
if not success:
return Gst.FlowReturn.ERROR

frame_data = np.frombuffer(map_info.data, dtype=np.uint8)
channels = len(frame_data) // (width * height) # typically 3 (BGR) or 4 (BGRA)
frame_data_reshaped = frame_data.reshape((height, width, channels))

self.decoded_frames.append(frame_data_reshaped.copy())

if len(self.decoded_frames) > self.max_frames:
self.decoded_frames.pop(0)

buf.unmap(map_info)
return Gst.FlowReturn.OK
110 changes: 0 additions & 110 deletions vortex_utils/python_utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
from dataclasses import dataclass

import gi
import numpy as np
from scipy.spatial.transform import Rotation

gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
from gi.repository import GLib, Gst


def ssa(angle: float) -> float:
return (angle + np.pi) % (2 * np.pi) - np.pi
Expand Down Expand Up @@ -125,108 +120,3 @@ def __add__(self, other: "State") -> "State":

def __sub__(self, other: "State") -> "State":
return State(pose=self.pose - other.pose, twist=self.twist - other.twist)


class H264Decoder:
"""Decodes H.264 streams using GStreamer."""

_gst_initialized = False

def __init__(self):
"""Initializes the H.264 decoder and sets up the GStreamer pipeline."""
# Ensure GStreamer is initialized only once
if not H264Decoder._gst_initialized:
Gst.init(None)
H264Decoder._gst_initialized = True

pipeline_desc = (
"appsrc name=mysrc is-live=true ! "
"h264parse ! "
"avdec_h264 ! "
"videoconvert ! video/x-raw,format=BGR ! "
"appsink name=appsink"
)

self._pipeline = Gst.parse_launch(pipeline_desc)
self.appsrc = self._pipeline.get_by_name("mysrc")
self._appsink = self._pipeline.get_by_name("appsink")

self._appsink.set_property("emit-signals", True)
self._appsink.set_property("sync", False)
self._appsink.connect("new-sample", self._on_new_sample)

self._bus = self._pipeline.get_bus()
self._bus.add_signal_watch()
self._bus.connect("message", self._on_bus_message)

self._main_loop = None

self.decoded_frames = []
self.max_frames = 3 # Keep only the last 3 frames here

def start(self):
"""Starts the GStreamer pipeline and runs the main event loop."""
self._pipeline.set_state(Gst.State.PLAYING)
self._main_loop = GLib.MainLoop()
try:
self._main_loop.run()
except KeyboardInterrupt:
pass
finally:
self.stop()

def stop(self):
"""Stops the GStreamer pipeline and cleans up resources."""
if self._pipeline:
self._pipeline.set_state(Gst.State.NULL)
if self._main_loop is not None:
self._main_loop.quit()
self._main_loop = None

def push_data(self, data: bytes):
"""Pushes H.264 encoded data into the pipeline for decoding."""
if not self.appsrc:
raise RuntimeError(
"The pipeline's appsrc element was not found or initialized."
)
gst_buffer = Gst.Buffer.new_allocate(None, len(data), None)
gst_buffer.fill(0, data)
self.appsrc.emit("push-buffer", gst_buffer)

def _on_bus_message(self, bus, message):
"""Handles messages from the GStreamer bus."""
msg_type = message.type
if msg_type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"GStreamer ERROR: {err}, debug={debug}")
self.stop()
elif msg_type == Gst.MessageType.EOS:
print("End-Of-Stream reached.")
self.stop()

def _on_new_sample(self, sink):
"""Processes a new decoded video frame from the appsink."""
sample = sink.emit("pull-sample")
if not sample:
return Gst.FlowReturn.ERROR

buf = sample.get_buffer()
caps_format = sample.get_caps().get_structure(0)
width = caps_format.get_value("width")
height = caps_format.get_value("height")

success, map_info = buf.map(Gst.MapFlags.READ)
if not success:
return Gst.FlowReturn.ERROR

frame_data = np.frombuffer(map_info.data, dtype=np.uint8)
channels = len(frame_data) // (width * height) # typically 3 (BGR) or 4 (BGRA)
frame_data_reshaped = frame_data.reshape((height, width, channels))

self.decoded_frames.append(frame_data_reshaped.copy())

if len(self.decoded_frames) > self.max_frames:
self.decoded_frames.pop(0)

buf.unmap(map_info)
return Gst.FlowReturn.OK