Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions src/reachy_mini/media/camera_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
import numpy as np
import numpy.typing as npt

from reachy_mini.media.camera_constants import CameraResolution
from reachy_mini.media.camera_constants import (
CameraResolution,
CameraSpecs,
MujocoCameraSpecs,
)


class CameraBase(ABC):
Expand All @@ -20,22 +24,43 @@ class CameraBase(ABC):
def __init__(
self,
log_level: str = "INFO",
resolution: CameraResolution = CameraResolution.R1280x720,
) -> None:
"""Initialize the camera."""
self.logger = logging.getLogger(__name__)
self.logger.setLevel(log_level)
self._resolution = resolution
self._resolution: Optional[CameraResolution] = None
self.camera_specs: Optional[CameraSpecs] = None

@property
def resolution(self) -> tuple[int, int]:
"""Get the current camera resolution as a tuple (width, height)."""
if self._resolution is None:
raise RuntimeError("Camera resolution is not set.")
return (self._resolution.value[0], self._resolution.value[1])

@property
def framerate(self) -> int:
"""Get the current camera frames per second."""
return self._resolution.value[2]
if self._resolution is None:
raise RuntimeError("Camera resolution is not set.")
return int(self._resolution.value[2])

def set_resolution(self, resolution: CameraResolution) -> None:
"""Set the camera resolution."""
if self.camera_specs is None:
raise RuntimeError(
"Camera specs not set. Open the camera before setting the resolution."
)

if isinstance(self.camera_specs, MujocoCameraSpecs):
raise RuntimeError(
"Cannot change resolution of Mujoco simulated camera for now."
)

if resolution not in self.camera_specs.available_resolutions:
raise ValueError(
f"Resolution not supported by the camera. Available resolutions are : {self.camera_specs.available_resolutions}"
)

@abstractmethod
def open(self) -> None:
Expand Down
96 changes: 94 additions & 2 deletions src/reachy_mini/media/camera_constants.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
"""Camera constants for Reachy Mini."""

from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional

import numpy as np


class CameraResolution(Enum):
"""Base class for camera resolutions."""

pass


class ArduCamResolution(CameraResolution):
"""Camera resolutions. Arducam_12MP."""

R2304x1296 = (2304, 1296, 30)
Expand All @@ -13,13 +23,95 @@ class CameraResolution(Enum):
R1280x720 = (1280, 720, 30)


class RPICameraResolution(Enum):
class RPICameraResolution(CameraResolution):
"""Camera resolutions. Raspberry Pi Camera.

Camera supports higher resolutions but the h264 encoder won't follow.
"""

R1920x1080 = (1920, 1080, 30)
R1920x1080 = (1920, 1080, 60)
R1600x1200 = (1600, 1200, 30)
R1536x864 = (1536, 864, 40)
R1280x720 = (1280, 720, 60)


class MujocoCameraResolution(CameraResolution):
"""Camera resolutions for Mujoco simulated camera."""

R1280x720 = (1280, 720, 60)


@dataclass
class CameraSpecs:
"""Base camera specifications."""

available_resolutions: List[CameraResolution] = field(default_factory=list)
default_resolution: Optional[CameraResolution] = None
vid = 0
pid = 0
# TODO TEMPORARY
K = np.array(
[[550.3564, 0.0, 638.0112], [0.0, 549.1653, 364.589], [0.0, 0.0, 1.0]]
) # FOR 1280x720
D = np.array([-0.0694, 0.1565, -0.0004, 0.0003, -0.0983])


@dataclass
class ArducamSpecs(CameraSpecs):
"""Arducam camera specifications."""

available_resolutions = [
ArduCamResolution.R2304x1296,
ArduCamResolution.R4608x2592,
ArduCamResolution.R1920x1080,
ArduCamResolution.R1600x1200,
ArduCamResolution.R1280x720,
]
default_resolution = ArduCamResolution.R1280x720
vid = 0x0C45
pid = 0x636D
# TODO handle calibration depending on resolution ? How ?
K = np.array(
[[550.3564, 0.0, 638.0112], [0.0, 549.1653, 364.589], [0.0, 0.0, 1.0]]
) # FOR 1280x720
D = np.array([-0.0694, 0.1565, -0.0004, 0.0003, -0.0983])


@dataclass
class ReachyMiniCamSpecs(CameraSpecs):
"""Reachy Mini camera specifications."""

available_resolutions = [
RPICameraResolution.R1920x1080,
RPICameraResolution.R1600x1200,
RPICameraResolution.R1536x864,
RPICameraResolution.R1280x720,
]
default_resolution = RPICameraResolution.R1920x1080
vid = 0x38FB
pid = 0x1002


@dataclass
class OlderRPiCamSpecs(CameraSpecs):
"""Older Raspberry Pi camera specifications."""

available_resolutions = [
RPICameraResolution.R1920x1080,
RPICameraResolution.R1600x1200,
RPICameraResolution.R1536x864,
RPICameraResolution.R1280x720,
]
default_resolution = RPICameraResolution.R1920x1080
vid = 0x1BCF
pid = 0x28C4


@dataclass
class MujocoCameraSpecs(CameraSpecs):
"""Mujoco simulated camera specifications."""

available_resolutions = [
MujocoCameraResolution.R1280x720,
]
default_resolution = MujocoCameraResolution.R1280x720
77 changes: 56 additions & 21 deletions src/reachy_mini/media/camera_gstreamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
"""

from threading import Thread
from typing import Optional
from typing import Optional, cast

import numpy as np
import numpy.typing as npt

from reachy_mini.media.camera_constants import CameraResolution
from reachy_mini.media.camera_constants import (
ArducamSpecs,
CameraResolution,
CameraSpecs,
ReachyMiniCamSpecs,
)

try:
import gi
Expand All @@ -35,27 +40,32 @@ class GStreamerCamera(CameraBase):
def __init__(
self,
log_level: str = "INFO",
resolution: CameraResolution = CameraResolution.R1280x720,
) -> None:
"""Initialize the GStreamer camera."""
super().__init__(log_level=log_level, resolution=resolution)
super().__init__(log_level=log_level)
Gst.init(None)
self._loop = GLib.MainLoop()
self._thread_bus_calls: Optional[Thread] = None

self.pipeline = Gst.Pipeline.new("camera_recorder")

# TODO How do we hande video device not found ?
cam_path = self.get_video_device()
if self.camera_specs is None:
raise RuntimeError("Camera specs not set")
self._resolution = self.camera_specs.default_resolution

if self._resolution is None:
raise RuntimeError("Failed to get default camera resolution.")

# note for some applications the jpeg image could be directly used
self._appsink_video: GstApp = Gst.ElementFactory.make("appsink")
caps_video = Gst.Caps.from_string(
f"video/x-raw,format=BGR, width={self.resolution[0]},height={self.resolution[1]},framerate={self.framerate}/1"
)
self._appsink_video.set_property("caps", caps_video)
self.set_resolution(self._resolution)
self._appsink_video.set_property("drop", True) # avoid overflow
self._appsink_video.set_property("max-buffers", 1) # keep last image only
self.pipeline.add(self._appsink_video)

cam_path = self.get_arducam_video_device()
# cam_path = self.get_video_device()
if cam_path == "":
self.logger.warning("Recording pipeline set without camera.")
self.pipeline.remove(self._appsink_video)
Expand Down Expand Up @@ -96,6 +106,22 @@ def _handle_bus_calls(self) -> None:
bus.remove_watch()
self.logger.debug("bus message loop stopped")

def set_resolution(self, resolution: CameraResolution) -> None:
"""Set the camera resolution."""
super().set_resolution(resolution)

# Check if pipeline is not playing before changing resolution
if self.pipeline.get_state(0).state == Gst.State.PLAYING:
raise RuntimeError(
"Cannot change resolution while the camera is streaming. Please close the camera first."
)

self._resolution = resolution
caps_video = Gst.Caps.from_string(
f"video/x-raw,format=BGR, width={self._resolution.value[0]},height={self._resolution.value[1]},framerate={self.framerate}/1"
)
self._appsink_video.set_property("caps", caps_video)

def open(self) -> None:
"""Open the camera using GStreamer."""
self.pipeline.set_state(Gst.State.PLAYING)
Expand Down Expand Up @@ -136,25 +162,34 @@ def close(self) -> None:
self._loop.quit()
self.pipeline.set_state(Gst.State.NULL)

def get_arducam_video_device(self) -> str:
"""Use Gst.DeviceMonitor to find the unix camera path /dev/videoX of the Arducam_12MP webcam.
def get_video_device(self) -> str:
"""Use Gst.DeviceMonitor to find the unix camera path /dev/videoX.

Returns the device path (e.g., '/dev/video2'), or '' if not found.
"""
monitor = Gst.DeviceMonitor()
monitor.add_filter("Video/Source")
monitor.start()

cam_names = ["Reachy", "Arducam_12MP"]

devices = monitor.get_devices()
for device in devices:
name = device.get_display_name()
device_props = device.get_properties()
if name and "Arducam_12MP" in name:
if device_props and device_props.has_field("api.v4l2.path"):
device_path = device_props.get_string("api.v4l2.path")
self.logger.debug(f"Found Arducam_12MP at {device_path}")
monitor.stop()
return str(device_path)
for cam_name in cam_names:
for device in devices:
name = device.get_display_name()
device_props = device.get_properties()

if cam_name in name:
if device_props and device_props.has_field("api.v4l2.path"):
device_path = device_props.get_string("api.v4l2.path")
self.camera_specs = (
cast(CameraSpecs, ArducamSpecs)
if cam_name == "Arducam_12MP"
else cast(CameraSpecs, ReachyMiniCamSpecs)
)
self.logger.debug(f"Found {cam_name} camera at {device_path}")
monitor.stop()
return str(device_path)
monitor.stop()
self.logger.warning("Arducam_12MP webcam not found.")
self.logger.warning("No camera found.")
return ""
35 changes: 27 additions & 8 deletions src/reachy_mini/media/camera_opencv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
This module provides an implementation of the CameraBase class using OpenCV.
"""

from typing import Optional
from typing import Optional, cast

import cv2
import numpy as np
import numpy.typing as npt

from reachy_mini.media.camera_constants import CameraResolution
from reachy_mini.media.camera_constants import (
CameraResolution,
CameraSpecs,
MujocoCameraSpecs,
)
from reachy_mini.media.camera_utils import find_camera

from .camera_base import CameraBase
Expand All @@ -21,22 +25,37 @@ class OpenCVCamera(CameraBase):
def __init__(
self,
log_level: str = "INFO",
resolution: CameraResolution = CameraResolution.R1280x720,
) -> None:
"""Initialize the OpenCV camera."""
super().__init__(log_level=log_level, resolution=resolution)
super().__init__(log_level=log_level)
self.cap: Optional[cv2.VideoCapture] = None

def set_resolution(self, resolution: CameraResolution) -> None:
"""Set the camera resolution."""
super().set_resolution(resolution)

self._resolution = resolution
if self.cap is not None:
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self._resolution.value[0])
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self._resolution.value[1])

def open(self, udp_camera: Optional[str] = None) -> None:
"""Open the camera using OpenCV VideoCapture."""
if udp_camera:
self.cap = cv2.VideoCapture(udp_camera)
self.camera_specs = cast(CameraSpecs, MujocoCameraSpecs)
self._resolution = self.camera_specs.default_resolution
else:
self.cap = find_camera()
if self.cap is None:
self.cap, self.camera_specs = find_camera()
if self.cap is None or self.camera_specs is None:
raise RuntimeError("Camera not found")
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])

self._resolution = self.camera_specs.default_resolution
if self._resolution is None:
raise RuntimeError("Failed to get default camera resolution.")

self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self._resolution.value[0])
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self._resolution.value[1])

if not self.cap.isOpened():
raise RuntimeError("Failed to open camera")
Expand Down
Loading
Loading