Skip to content

Commit cb6b5b0

Browse files
committed
refactor(H264Decoder): split up tests
1 parent 737b4b4 commit cb6b5b0

File tree

1 file changed

+61
-11
lines changed

1 file changed

+61
-11
lines changed

tests/test_utils.py

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import threading
2+
import time
23

34
import numpy as np
45
import pytest
56
from geometry_msgs.msg import Pose, Twist
7+
from gi.repository import Gst
68

79
from vortex_utils.python_utils import (
810
H264Decoder,
@@ -214,33 +216,81 @@ def test_state_subtraction_twist():
214216
assert (state1 - state2).twist == TwistData(0.9, 1.8, 2.7, 0, 0, 0)
215217

216218

217-
def test_h264_decoder():
218-
test_file = "tests/resources/test_video.h264"
219-
219+
@pytest.fixture
220+
def decoder():
221+
"""Fixture to create and clean up an H264Decoder instance."""
220222
decoder = H264Decoder()
221-
222223
decoding_thread = threading.Thread(target=decoder.start, daemon=True)
223224
decoding_thread.start()
225+
yield decoder
226+
decoder.stop()
227+
decoding_thread.join()
228+
224229

230+
def test_h264_decoder_initialization(decoder):
231+
"""Test that the H264Decoder initializes correctly."""
232+
assert decoder.appsrc is not None, "Appsrc element is missing."
233+
assert decoder._appsink is not None, "Appsink element is missing."
234+
assert decoder._pipeline is not None, "Pipeline was not initialized."
235+
assert decoder._bus is not None, "GStreamer bus was not initialized."
236+
237+
238+
def test_h264_decoder_decodes_frames(decoder):
239+
"""Test if the decoder correctly processes an H.264 stream."""
240+
test_file = "tests/resources/test_video.h264"
241+
242+
# Read and push H.264 data
225243
with open(test_file, "rb") as f:
226244
raw_data = f.read()
227245

228246
chunk_size = 64
229247
for i in range(0, len(raw_data), chunk_size):
230-
chunk = raw_data[i : i + chunk_size]
231-
decoder.push_data(chunk)
248+
decoder.push_data(raw_data[i : i + chunk_size])
232249

233250
decoder.appsrc.emit("end-of-stream")
234251

235-
decoding_thread.join(timeout=5.0)
252+
# Wait for frames to be decoded
253+
timeout = 5.0
254+
start_time = time.time()
255+
while len(decoder.decoded_frames) == 0 and (time.time() - start_time) < timeout:
256+
time.sleep(0.1)
257+
258+
assert len(decoder.decoded_frames) > 0, "No frames were decoded."
236259

237-
assert len(decoder.decoded_frames) > 0, (
238-
"No frames were decoded from the H.264 stream."
239-
)
260+
261+
def test_h264_decoder_frame_properties(decoder):
262+
"""Test if decoded frames have correct properties."""
263+
test_file = "tests/resources/test_video.h264"
264+
265+
# Read and push H.264 data
266+
with open(test_file, "rb") as f:
267+
raw_data = f.read()
268+
269+
chunk_size = 64
270+
for i in range(0, len(raw_data), chunk_size):
271+
decoder.push_data(raw_data[i : i + chunk_size])
272+
273+
decoder.appsrc.emit("end-of-stream")
274+
275+
# Wait for frames to be decoded
276+
timeout = 5.0
277+
start_time = time.time()
278+
while len(decoder.decoded_frames) == 0 and (time.time() - start_time) < timeout:
279+
time.sleep(0.1)
280+
281+
assert len(decoder.decoded_frames) > 0, "No frames were decoded."
240282

241283
frame = decoder.decoded_frames[0]
242284
assert isinstance(frame, np.ndarray), "Decoded frame is not a numpy array."
243-
assert frame.ndim == 3, f"Expected 3D array (H, W, Channels), got {frame.shape}"
285+
assert frame.ndim == 3, f"Expected 3D array (H, W, C), got shape {frame.shape}."
286+
287+
288+
def test_h264_decoder_stops_cleanly(decoder):
289+
"""Test if the decoder stops without errors."""
290+
decoder.stop()
291+
assert decoder._pipeline.get_state(0)[1] == Gst.State.NULL, (
292+
"Decoder did not shut down properly."
293+
)
244294

245295

246296
def test_pose_from_ros():

0 commit comments

Comments
 (0)