Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/SWC-plusmaze_to_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import shutil
from pathlib import Path

from poseinterface.io import annotations_to_coco
from poseinterface.io import annotations_to_poseinterface

# %%
# Background
Expand Down Expand Up @@ -102,7 +102,7 @@
# Here we use the :func:`annotations_to_poseinterface` function from `poseinterface.io`
# which wraps around `sleap_io` functionality to perform the conversion.

annotations_to_coco(
annotations_to_poseinterface(
input_path=source_annotations_path,
output_json_path=target_annotations_path,
sub_id=subject_id,
Expand Down
131 changes: 131 additions & 0 deletions poseinterface/clips.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Functions to extract clips from poseinterface videos."""

import argparse
import json
import logging
import sys
from pathlib import Path

import sleap_io as sio


def extract_clip(
    video_path: str | Path,
    start_frame: int,
    duration: int,
) -> tuple[Path, Path]:
    """Extract a clip and its labels from a full-length video.

    The clip and its labels file are written to a ``Clips`` directory
    created alongside the input video.

    We assume:
    - the input video filename is in the format
      `sub-<subjectID>_ses-<sessionID>_cam-<camID>.mp4`,
    - a `sub-<subjectID>_ses-<sessionID>_cam-<camID>_cliplabels.json`
      file with tracks for the full video exists alongside the input video,
      where the `id` in `images` corresponds to the global video frame
      0-based indices (note that the local frame index and the global frame
      index are the same if the data refers to the whole video).

    Parameters
    ----------
    video_path : str | Path
        Path to the full-length video.
    start_frame : int
        0-based index of the first frame of the clip.
    duration : int
        Number of frames in the clip. It is clamped (with a warning) if the
        clip would run past the end of the video.

    Returns
    -------
    tuple[Path, Path]
        Path to the saved clip and path to the clip labels JSON file.

    Raises
    ------
    ValueError
        If ``duration`` is not positive, or if ``start_frame`` does not
        index a frame of the video.

    """
    video_path = Path(video_path)

    # Open the video
    video = sio.load_video(video_path)
    logging.info(
        f"filename: {video_path.name}, fps: {video.fps}, shape: {video.shape}"
    )
    n_frames = video.shape[0]

    # Reject requests that cannot produce a non-empty clip
    if duration <= 0:
        raise ValueError(
            f"duration must be a positive number of frames, got {duration}."
        )
    if not 0 <= start_frame < n_frames:
        raise ValueError(
            f"start_frame must be in the range [0, {n_frames - 1}], "
            f"got {start_frame}."
        )

    # Clamp duration if it exceeds the video length
    if start_frame + duration > n_frames:
        duration = n_frames - start_frame
        logging.warning(
            "Clip exceeds video length. "
            f"Clamping duration to {duration} frames."
        )

    # Create "Clips" directory if it doesn't exist
    clips_dir = video_path.parent / "Clips"
    clips_dir.mkdir(parents=True, exist_ok=True)

    # Slice clip and save as mp4. The filename stem comes from the input
    # path (not the loaded video object), so that it matches the name of
    # the clip labels file written below.
    clip = video[start_frame : start_frame + duration]
    clip_path = (
        clips_dir / f"{video_path.stem}_start-{start_frame}_dur-{duration}.mp4"
    )
    sio.save_video(clip, clip_path, fps=video.fps)

    # Generate cliplabels.json from the full video labels
    clip_json = _extract_cliplabels(
        video_path, clips_dir, start_frame, duration
    )

    return clip_path, clip_json


def _extract_cliplabels(
    video_path: Path,
    clips_dir: Path,
    start_frame: int,
    duration: int,
) -> Path:
    """Extract clip labels from the video cliplabels.json file.

    Reads ``<video stem>_cliplabels.json`` from the directory of
    ``video_path``, keeps the images whose ``id`` and the annotations whose
    ``image_id`` fall in ``[start_frame, start_frame + duration)``, copies
    the ``categories`` unchanged, and writes the result to ``clips_dir``.

    Parameters
    ----------
    video_path : Path
        Path to the full-length video the labels refer to.
    clips_dir : Path
        Directory in which the clip labels file is written.
    start_frame : int
        First frame index (inclusive) of the clip.
    duration : int
        Number of frames in the clip.

    Returns
    -------
    Path
        Path to the written clip labels JSON file.

    """
    # Read file with labels for the whole video. The encoding is explicit
    # so that reading does not depend on the platform's default encoding.
    video_json = video_path.parent / f"{video_path.stem}_cliplabels.json"
    with open(video_json, encoding="utf-8") as f:
        video_labels = json.load(f)

    # Keep only data from the images in the clip
    end_frame = start_frame + duration  # exclusive upper bound
    clip_labels = {
        "images": [
            img
            for img in video_labels["images"]
            if start_frame <= img["id"] < end_frame
        ],
        "annotations": [
            annot
            for annot in video_labels["annotations"]
            if start_frame <= annot["image_id"] < end_frame
        ],
        "categories": video_labels["categories"],
    }

    # Save json with filtered data to clips directory
    clip_json = (
        clips_dir / f"{video_path.stem}_"
        f"start-{start_frame}_dur-{duration}_cliplabels.json"
    )
    with open(clip_json, "w", encoding="utf-8") as f:
        json.dump(clip_labels, f)

    return clip_json


def main(args: argparse.Namespace):
    """Run the clip extraction described by the parsed arguments.

    Parameters
    ----------
    args : argparse.Namespace
        Namespace with the attributes ``video_path``, ``start_frame`` and
        ``duration``, as produced by :func:`parse_args`.

    """
    extract_clip(
        video_path=args.video_path,
        start_frame=args.start_frame,
        duration=args.duration,
    )


def parse_args(args) -> argparse.Namespace:
    """Build the command-line parser and parse ``args``.

    Parameters
    ----------
    args : list of str
        Command-line arguments, without the program name.

    Returns
    -------
    argparse.Namespace
        Namespace with ``video_path`` (str), ``start_frame`` (int) and
        ``duration`` (int). All three options are required.

    """
    # (flag, type, help) for every required option, in display order
    option_specs = (
        ("--video_path", str, "Path to video file to clip."),
        (
            "--start_frame",
            int,
            "Start frame of the clip as a 0-based index.",
        ),
        ("--duration", int, "Total length of the output clip in frames"),
    )

    cli = argparse.ArgumentParser(description="Extract clips from video")
    for flag, value_type, help_text in option_specs:
        cli.add_argument(flag, type=value_type, required=True, help=help_text)
    return cli.parse_args(args)


def wrapper():
    """Parse the arguments in ``sys.argv`` and run :func:`main` on them."""
    cli_args = sys.argv[1:]  # drop the program name
    main(parse_args(cli_args))


if __name__ == "__main__":
    wrapper()
122 changes: 121 additions & 1 deletion poseinterface/io.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
"""Functions to convert annotations and videos to PoseInterface format."""

import copy
import json
import logging
import re
import shutil
from pathlib import Path
from typing import Literal

import sleap_io as sio
from sleap_io.io import coco
from sleap_io.io.cli import _get_video_encoding_info, _is_ffmpeg_available
from sleap_io.io.dlc import is_dlc_file

_EMPTY_LABELS_ERROR_MSG = {
Expand All @@ -24,8 +29,21 @@

# Matches "frame-<digits>" and captures the digits
POSEINTERFACE_FRAME_REGEXP = r"frame-(\d+)"

# We support sleap's MediaVideo files
EXPECTED_SUFFIX = ".mp4"

# Codec name and pixel format a video must have to be used without reencoding
EXPECTED_ENCODING = dict(pixelformat="yuv420p", codec="h264")

# Keyword arguments used when reencoding: the expected pixel format, with
# the codec *name* replaced by the *encoder* to use, plus quality settings
REENCODING_PARAMS = dict(
    EXPECTED_ENCODING,
    codec="libx264",
    crf=25,
    preset="superfast",
)

def annotations_to_coco(

def annotations_to_poseinterface(
input_path: Path,
output_json_path: Path,
*,
Expand Down Expand Up @@ -246,3 +264,105 @@ def _pad_integers_to_same_width(input: list[int]) -> list[str]:
width = len(str(max(input)))
padded_numbers = [str(number).zfill(width) for number in input]
return padded_numbers


def video_to_poseinterface(
    input_video: Path | str,
    output_video_dir: Path | str,
    *,
    sub_id: str,
    ses_id: str,
    cam_id: str,
) -> Path:
    """Copy or reencode a video under its poseinterface filename.

    The output file is named ``sub-<sub_id>_ses-<ses_id>_cam-<cam_id>.mp4``
    and placed in ``output_video_dir``, which is created if missing. The
    input is copied as-is when it does not need reencoding, and reencoded
    otherwise.

    Parameters
    ----------
    input_video : Path | str
        Path to the source video.
    output_video_dir : Path | str
        Directory in which the output video is written.
    sub_id : str
        Subject identifier used in the output filename.
    ses_id : str
        Session identifier used in the output filename.
    cam_id : str
        Camera identifier used in the output filename.

    Returns
    -------
    Path
        Path to the output video.

    Raises
    ------
    RuntimeError
        If ffmpeg is not available.

    """
    # Fail early if ffmpeg is missing
    _check_ffmpeg()

    # Build the destination path and make sure its directory exists
    destination_dir = Path(output_video_dir)
    destination_name = f"sub-{sub_id}_ses-{ses_id}_cam-{cam_id}.mp4"
    output_video = destination_dir / destination_name
    destination_dir.mkdir(parents=True, exist_ok=True)

    # Reencode when required, otherwise a plain copy under the new name
    if _needs_reencoding(input_video):
        _reencode_video(input_video, output_video)
    else:
        shutil.copy(input_video, output_video)

    return output_video


def _check_ffmpeg() -> None:
    """Select ffmpeg as the default video plugin and check it is available.

    Raises
    ------
    RuntimeError
        If ffmpeg is reported as not available.

    """
    sio.set_default_video_plugin("ffmpeg")
    if _is_ffmpeg_available():
        return
    raise RuntimeError("ffmpeg is required but not found")


def _needs_reencoding(input_video_path: str | Path) -> bool:
    """Check if reencoding is required.

    A video needs reencoding if its suffix is not ``EXPECTED_SUFFIX``
    (case-insensitive), or if its codec and pixel format differ from
    ``EXPECTED_ENCODING``.

    Parameters
    ----------
    input_video_path : str | Path
        Path to the video to inspect.

    Returns
    -------
    bool
        True if the video needs to be reencoded, False otherwise.

    """
    input_video_path = Path(input_video_path)
    logging.info(f"Input video: {input_video_path}")

    # Check if suffix is mp4; no need to inspect the encoding otherwise
    if input_video_path.suffix.lower() != EXPECTED_SUFFIX:
        return True

    # Check codec and pixelformat
    encoding = _get_codec_pixelformat(input_video_path)
    if encoding == EXPECTED_ENCODING:
        return False

    # Only report the mismatch: this helper does not reencode anything
    # itself, and the file has no public `reencode_video()` to point to.
    logging.warning(
        f"Video encoding {encoding} does not match "
        f"expected {EXPECTED_ENCODING}. The video needs to be reencoded."
    )
    return True


def _get_codec_pixelformat(input_video_path: str | Path) -> dict[str, str]:
    """Get the codec and pixel format of a video as a dictionary.

    It wraps sleap-io's `_get_video_encoding_info`, which
    uses `ffmpeg -i` to extract metadata without requiring
    `ffprobe` to be in PATH.

    Parameters
    ----------
    input_video_path : str | Path
        Path to the video to inspect.

    Returns
    -------
    dict[str, str]
        Dictionary with the keys ``"codec"`` and ``"pixelformat"``.

    Notes
    -----
    `_get_video_encoding_info` returns a `VideoEncodingInfo`
    object with the following attributes:
    - codec: Video codec name (e.g., "h264", "hevc").
    - codec_profile: Codec profile (e.g., "Main", "High").
    - pixel_format: Pixel format (e.g., "yuv420p").
    - bitrate_kbps: Bitrate in kilobits per second.
    - fps: Frames per second.
    - gop_size: Group of pictures size (keyframe interval).
    - container: Container format (e.g., "mov", "avi").

    Only `codec` and `pixel_format` are used here.

    """
    encoding_info = _get_video_encoding_info(input_video_path)
    codec_name = encoding_info.codec
    pixel_format = encoding_info.pixel_format
    return dict(codec=codec_name, pixelformat=pixel_format)


def _reencode_video(
    input_video_path: str | Path,
    output_video_path: str | Path,
) -> Path:
    """Reencode video to default format.

    The video is loaded with sleap-io and saved to ``output_video_path``
    with its original frame rate and the settings in ``REENCODING_PARAMS``.

    Parameters
    ----------
    input_video_path : str | Path
        Path to the video to reencode.
    output_video_path : str | Path
        Path of the reencoded video to write.

    Returns
    -------
    Path
        Path to the reencoded video.

    """
    output_video_path = Path(output_video_path)

    # Read and save reencoded video
    video = sio.load_video(Path(input_video_path))
    sio.save_video(
        video,
        filename=output_video_path,
        fps=video.fps,
        **REENCODING_PARAMS,
    )

    # Return the path we asked for rather than the result of `save_video`,
    # so that the declared `Path` return type always holds.
    logging.info(f"Re-encoded video saved to {output_video_path}")
    return output_video_path
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ docs = [
"sphinx-sitemap",
]

[project.scripts]
# `wrapper` is defined in poseinterface/clips.py
extract-clip = "poseinterface.clips:wrapper"

[build-system]
requires = [
"setuptools>=64",
Expand Down Expand Up @@ -133,3 +136,4 @@ commands =
[tool.codespell]
skip = '.git'
check-hidden = true
ignore-words-list = 'reencode'
4 changes: 2 additions & 2 deletions tests/test_integration/test_io.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from poseinterface.io import annotations_to_coco
from poseinterface.io import annotations_to_poseinterface


@pytest.mark.parametrize(
Expand All @@ -18,6 +18,6 @@ def test_annotations_to_coco(input_path, tmp_path, test_ids, request):
input_path = request.getfixturevalue(input_path)
output_json_path = tmp_path / "output.json"

annotations_to_coco(input_path, output_json_path, **test_ids)
annotations_to_poseinterface(input_path, output_json_path, **test_ids)

assert output_json_path.exists()
8 changes: 4 additions & 4 deletions tests/test_unit/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
_generate_poseinterface_filenames,
_pad_integers_to_same_width,
_update_image_ids,
annotations_to_coco,
annotations_to_poseinterface,
)


Expand All @@ -35,7 +35,7 @@ def test_annotations_to_coco(
# Run function to test
input_csv = tmp_path / "input.csv"
output_path = tmp_path / "output.json"
result = annotations_to_coco(
result = annotations_to_poseinterface(
input_csv,
output_path,
**test_ids,
Expand Down Expand Up @@ -83,7 +83,7 @@ def test_annotations_to_coco_invalid(
with pytest.raises(
ValueError, match=_EMPTY_LABELS_ERROR_MSG[error_message]
):
annotations_to_coco(
annotations_to_poseinterface(
input_file,
tmp_path / "output.json",
**test_ids,
Expand Down Expand Up @@ -111,7 +111,7 @@ def test_annotations_to_coco_not_single_video(
ValueError,
match=(r"The annotations refer to multiple videos.*Please check .*"),
):
annotations_to_coco(
annotations_to_poseinterface(
tmp_path / "input.csv",
tmp_path / "output.json",
**test_ids,
Expand Down
Loading