Skip to content

Commit b6c762b

Browse files
committed
fix(vortex_utils): move H264Decoder into a separate file
1 parent 82df717 commit b6c762b

File tree

4 files changed

+173
-123
lines changed

4 files changed

+173
-123
lines changed

tests/test_utils.py

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import threading
2+
import time
23

34
import numpy as np
45
import pytest
56
from geometry_msgs.msg import Pose, Twist
7+
from gi.repository import Gst
68

9+
from vortex_utils.gst_utils import H264Decoder
710
from vortex_utils.python_utils import (
8-
H264Decoder,
911
PoseData,
1012
State,
1113
TwistData,
@@ -214,33 +216,81 @@ def test_state_subtraction_twist():
214216
assert (state1 - state2).twist == TwistData(0.9, 1.8, 2.7, 0, 0, 0)
215217

216218

217-
def test_h264_decoder():
218-
test_file = "tests/resources/test_video.h264"
219-
219+
@pytest.fixture
220+
def decoder():
221+
"""Fixture to create and clean up an H264Decoder instance."""
220222
decoder = H264Decoder()
221-
222223
decoding_thread = threading.Thread(target=decoder.start, daemon=True)
223224
decoding_thread.start()
225+
yield decoder
226+
decoder.stop()
227+
decoding_thread.join()
228+
224229

230+
def test_h264_decoder_initialization(decoder):
231+
"""Test that the H264Decoder initializes correctly."""
232+
assert decoder.appsrc is not None, "Appsrc element is missing."
233+
assert decoder._appsink is not None, "Appsink element is missing."
234+
assert decoder._pipeline is not None, "Pipeline was not initialized."
235+
assert decoder._bus is not None, "GStreamer bus was not initialized."
236+
237+
238+
def test_h264_decoder_decodes_frames(decoder):
239+
"""Test if the decoder correctly processes an H.264 stream."""
240+
test_file = "tests/resources/test_video.h264"
241+
242+
# Read and push H.264 data
225243
with open(test_file, "rb") as f:
226244
raw_data = f.read()
227245

228246
chunk_size = 64
229247
for i in range(0, len(raw_data), chunk_size):
230-
chunk = raw_data[i : i + chunk_size]
231-
decoder.push_data(chunk)
248+
decoder.push_data(raw_data[i : i + chunk_size])
232249

233250
decoder.appsrc.emit("end-of-stream")
234251

235-
decoding_thread.join(timeout=5.0)
252+
# Wait for frames to be decoded
253+
timeout = 5.0
254+
start_time = time.time()
255+
while len(decoder.decoded_frames) == 0 and (time.time() - start_time) < timeout:
256+
time.sleep(0.1)
257+
258+
assert len(decoder.decoded_frames) > 0, "No frames were decoded."
236259

237-
assert len(decoder.decoded_frames) > 0, (
238-
"No frames were decoded from the H.264 stream."
239-
)
260+
261+
def test_h264_decoder_frame_properties(decoder):
262+
"""Test if decoded frames have correct properties."""
263+
test_file = "tests/resources/test_video.h264"
264+
265+
# Read and push H.264 data
266+
with open(test_file, "rb") as f:
267+
raw_data = f.read()
268+
269+
chunk_size = 64
270+
for i in range(0, len(raw_data), chunk_size):
271+
decoder.push_data(raw_data[i : i + chunk_size])
272+
273+
decoder.appsrc.emit("end-of-stream")
274+
275+
# Wait for frames to be decoded
276+
timeout = 5.0
277+
start_time = time.time()
278+
while len(decoder.decoded_frames) == 0 and (time.time() - start_time) < timeout:
279+
time.sleep(0.1)
280+
281+
assert len(decoder.decoded_frames) > 0, "No frames were decoded."
240282

241283
frame = decoder.decoded_frames[0]
242284
assert isinstance(frame, np.ndarray), "Decoded frame is not a numpy array."
243-
assert frame.ndim == 3, f"Expected 3D array (H, W, Channels), got {frame.shape}"
285+
assert frame.ndim == 3, f"Expected 3D array (H, W, C), got shape {frame.shape}."
286+
287+
288+
def test_h264_decoder_stops_cleanly(decoder):
289+
"""Test if the decoder stops without errors."""
290+
decoder.stop()
291+
assert decoder._pipeline.get_state(0)[1] == Gst.State.NULL, (
292+
"Decoder did not shut down properly."
293+
)
244294

245295

246296
def test_pose_from_ros():

vortex_utils/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
# H264Decoder
1+
# H264Decoder (gst_utils.py)
22
Install the dependencies by running the following script: [ci_install_dependencies.sh](/scripts/ci_install_dependencies.sh)

vortex_utils/gst_utils.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import gi
2+
import numpy as np
3+
4+
gi.require_version('Gst', '1.0')
5+
gi.require_version('GstApp', '1.0')
6+
from gi.repository import GLib, Gst
7+
8+
9+
class H264Decoder:
10+
"""Decodes H.264 streams using GStreamer."""
11+
12+
_gst_initialized = False
13+
14+
def __init__(self):
15+
"""Initializes the H.264 decoder and sets up the GStreamer pipeline."""
16+
# Ensure GStreamer is initialized only once
17+
if not H264Decoder._gst_initialized:
18+
Gst.init(None)
19+
H264Decoder._gst_initialized = True
20+
21+
pipeline_desc = (
22+
"appsrc name=mysrc is-live=true ! "
23+
"h264parse ! "
24+
"avdec_h264 ! "
25+
"videoconvert ! video/x-raw,format=BGR ! "
26+
"appsink name=appsink"
27+
)
28+
29+
self._pipeline = Gst.parse_launch(pipeline_desc)
30+
self.appsrc = self._pipeline.get_by_name("mysrc")
31+
self._appsink = self._pipeline.get_by_name("appsink")
32+
33+
self._appsink.set_property("emit-signals", True)
34+
self._appsink.set_property("sync", False)
35+
self._appsink.connect("new-sample", self._on_new_sample)
36+
37+
self._bus = self._pipeline.get_bus()
38+
self._bus.add_signal_watch()
39+
self._bus.connect("message", self._on_bus_message)
40+
41+
self._main_loop = None
42+
self.decoded_frames = []
43+
self.max_frames = 3 # Keep only the last 3 frames here
44+
45+
def start(self):
46+
"""Starts the GStreamer pipeline and runs the main event loop."""
47+
self._pipeline.set_state(Gst.State.PLAYING)
48+
self._main_loop = GLib.MainLoop()
49+
try:
50+
self._main_loop.run()
51+
except KeyboardInterrupt:
52+
pass
53+
finally:
54+
self.stop()
55+
56+
def stop(self):
57+
"""Stops the GStreamer pipeline and cleans up resources."""
58+
if self._pipeline:
59+
self._pipeline.set_state(Gst.State.NULL)
60+
if self._main_loop is not None:
61+
self._main_loop.quit()
62+
self._main_loop = None
63+
64+
def push_data(self, data: bytes):
65+
"""Pushes H.264 encoded data into the pipeline for decoding."""
66+
if not self.appsrc:
67+
raise RuntimeError(
68+
"The pipeline's appsrc element was not found or initialized."
69+
)
70+
gst_buffer = Gst.Buffer.new_allocate(None, len(data), None)
71+
gst_buffer.fill(0, data)
72+
self.appsrc.emit("push-buffer", gst_buffer)
73+
74+
def _on_bus_message(self, bus, message):
75+
"""Handles messages from the GStreamer bus."""
76+
msg_type = message.type
77+
if msg_type == Gst.MessageType.ERROR:
78+
err, debug = message.parse_error()
79+
print(f"GStreamer ERROR: {err}, debug={debug}")
80+
self.stop()
81+
elif msg_type == Gst.MessageType.EOS:
82+
print("End-Of-Stream reached.")
83+
self.stop()
84+
85+
def _on_new_sample(self, sink):
86+
"""Processes a new decoded video frame from the appsink."""
87+
sample = sink.emit("pull-sample")
88+
if not sample:
89+
return Gst.FlowReturn.ERROR
90+
91+
buf = sample.get_buffer()
92+
caps_format = sample.get_caps().get_structure(0)
93+
width = caps_format.get_value("width")
94+
height = caps_format.get_value("height")
95+
96+
success, map_info = buf.map(Gst.MapFlags.READ)
97+
if not success:
98+
return Gst.FlowReturn.ERROR
99+
100+
frame_data = np.frombuffer(map_info.data, dtype=np.uint8)
101+
channels = len(frame_data) // (width * height) # typically 3 (BGR) or 4 (BGRA)
102+
frame_data_reshaped = frame_data.reshape((height, width, channels))
103+
104+
self.decoded_frames.append(frame_data_reshaped.copy())
105+
106+
if len(self.decoded_frames) > self.max_frames:
107+
self.decoded_frames.pop(0)
108+
109+
buf.unmap(map_info)
110+
return Gst.FlowReturn.OK

vortex_utils/python_utils.py

Lines changed: 0 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
from dataclasses import dataclass
22

3-
import gi
43
import numpy as np
54
from scipy.spatial.transform import Rotation
65

7-
gi.require_version('Gst', '1.0')
8-
gi.require_version('GstApp', '1.0')
9-
from gi.repository import GLib, Gst
10-
116

127
def ssa(angle: float) -> float:
138
return (angle + np.pi) % (2 * np.pi) - np.pi
@@ -125,108 +120,3 @@ def __add__(self, other: "State") -> "State":
125120

126121
def __sub__(self, other: "State") -> "State":
127122
return State(pose=self.pose - other.pose, twist=self.twist - other.twist)
128-
129-
130-
class H264Decoder:
131-
"""Decodes H.264 streams using GStreamer."""
132-
133-
_gst_initialized = False
134-
135-
def __init__(self):
136-
"""Initializes the H.264 decoder and sets up the GStreamer pipeline."""
137-
# Ensure GStreamer is initialized only once
138-
if not H264Decoder._gst_initialized:
139-
Gst.init(None)
140-
H264Decoder._gst_initialized = True
141-
142-
pipeline_desc = (
143-
"appsrc name=mysrc is-live=true ! "
144-
"h264parse ! "
145-
"avdec_h264 ! "
146-
"videoconvert ! video/x-raw,format=BGR ! "
147-
"appsink name=appsink"
148-
)
149-
150-
self._pipeline = Gst.parse_launch(pipeline_desc)
151-
self.appsrc = self._pipeline.get_by_name("mysrc")
152-
self._appsink = self._pipeline.get_by_name("appsink")
153-
154-
self._appsink.set_property("emit-signals", True)
155-
self._appsink.set_property("sync", False)
156-
self._appsink.connect("new-sample", self._on_new_sample)
157-
158-
self._bus = self._pipeline.get_bus()
159-
self._bus.add_signal_watch()
160-
self._bus.connect("message", self._on_bus_message)
161-
162-
self._main_loop = None
163-
164-
self.decoded_frames = []
165-
self.max_frames = 3 # Keep only the last 3 frames here
166-
167-
def start(self):
168-
"""Starts the GStreamer pipeline and runs the main event loop."""
169-
self._pipeline.set_state(Gst.State.PLAYING)
170-
self._main_loop = GLib.MainLoop()
171-
try:
172-
self._main_loop.run()
173-
except KeyboardInterrupt:
174-
pass
175-
finally:
176-
self.stop()
177-
178-
def stop(self):
179-
"""Stops the GStreamer pipeline and cleans up resources."""
180-
if self._pipeline:
181-
self._pipeline.set_state(Gst.State.NULL)
182-
if self._main_loop is not None:
183-
self._main_loop.quit()
184-
self._main_loop = None
185-
186-
def push_data(self, data: bytes):
187-
"""Pushes H.264 encoded data into the pipeline for decoding."""
188-
if not self.appsrc:
189-
raise RuntimeError(
190-
"The pipeline's appsrc element was not found or initialized."
191-
)
192-
gst_buffer = Gst.Buffer.new_allocate(None, len(data), None)
193-
gst_buffer.fill(0, data)
194-
self.appsrc.emit("push-buffer", gst_buffer)
195-
196-
def _on_bus_message(self, bus, message):
197-
"""Handles messages from the GStreamer bus."""
198-
msg_type = message.type
199-
if msg_type == Gst.MessageType.ERROR:
200-
err, debug = message.parse_error()
201-
print(f"GStreamer ERROR: {err}, debug={debug}")
202-
self.stop()
203-
elif msg_type == Gst.MessageType.EOS:
204-
print("End-Of-Stream reached.")
205-
self.stop()
206-
207-
def _on_new_sample(self, sink):
208-
"""Processes a new decoded video frame from the appsink."""
209-
sample = sink.emit("pull-sample")
210-
if not sample:
211-
return Gst.FlowReturn.ERROR
212-
213-
buf = sample.get_buffer()
214-
caps_format = sample.get_caps().get_structure(0)
215-
width = caps_format.get_value("width")
216-
height = caps_format.get_value("height")
217-
218-
success, map_info = buf.map(Gst.MapFlags.READ)
219-
if not success:
220-
return Gst.FlowReturn.ERROR
221-
222-
frame_data = np.frombuffer(map_info.data, dtype=np.uint8)
223-
channels = len(frame_data) // (width * height) # typically 3 (BGR) or 4 (BGRA)
224-
frame_data_reshaped = frame_data.reshape((height, width, channels))
225-
226-
self.decoded_frames.append(frame_data_reshaped.copy())
227-
228-
if len(self.decoded_frames) > self.max_frames:
229-
self.decoded_frames.pop(0)
230-
231-
buf.unmap(map_info)
232-
return Gst.FlowReturn.OK

0 commit comments

Comments
 (0)