Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 34 additions & 13 deletions viseron/components/darknet/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Darknet object detection."""

from __future__ import annotations

import configparser
Expand All @@ -8,13 +9,11 @@
import pwd
from abc import ABC, abstractmethod
from queue import Empty, Queue
from typing import Any
from typing import TYPE_CHECKING, Any

import cv2
import numpy as np
import voluptuous as vol

from viseron import Viseron
from viseron.const import ENV_CUDA_SUPPORTED, ENV_OPENCL_SUPPORTED
from viseron.domains import OptionalDomain, RequireDomain, setup_domain
from viseron.domains.motion_detector.const import DOMAIN as MOTION_DETECTOR_DOMAIN
Expand Down Expand Up @@ -64,6 +63,11 @@
DNN_TARGETS,
)

if TYPE_CHECKING:
import numpy as np

from viseron import Viseron

LOGGER = logging.getLogger(__name__)

CONFIG_SCHEMA = vol.Schema(
Expand Down Expand Up @@ -140,7 +144,7 @@ def setup_domains(vis: Viseron, config: dict[str, Any]) -> None:
"""Set up darknet domains."""
config = config[COMPONENT]

for camera_identifier in config[CONFIG_OBJECT_DETECTOR][CONFIG_CAMERAS].keys():
for camera_identifier in config[CONFIG_OBJECT_DETECTOR][CONFIG_CAMERAS]:
setup_domain(
vis,
COMPONENT,
Expand All @@ -162,6 +166,13 @@ def setup_domains(vis: Viseron, config: dict[str, Any]) -> None:
)


def unload(vis: Viseron) -> None:
"""Unload the darknet component."""
if COMPONENT in vis.data:
vis.data[COMPONENT].stop()
del vis.data[COMPONENT]


class LoadDarknetError(ViseronError):
"""Raised when failing to load Darknet network."""

Expand Down Expand Up @@ -210,22 +221,26 @@ def model_height(self) -> int:
return self._model_height

@property
def model_res(self):
def model_res(self) -> tuple[int, int]:
"""Return trained model resolution."""
return self.model_width, self.model_height

@abstractmethod
def preprocess(self, frame):
def preprocess(self, frame: np.ndarray) -> np.ndarray | bytes:
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The BaseDarknet.preprocess abstract method's frame parameter now has a np.ndarray type annotation, and DarknetDNN.preprocess was also updated in this PR. However, DarknetNative.preprocess at line 447 still has an untyped frame parameter (def preprocess(self, frame) -> bytes:). This PR introduced the type annotation to the base class and one subclass, but missed updating DarknetNative.preprocess to be consistent.

Suggested change
def preprocess(self, frame: np.ndarray) -> np.ndarray | bytes:
def preprocess(self, frame: Any) -> np.ndarray | bytes:

Copilot uses AI. Check for mistakes.
"""Pre process frame before detection."""

@abstractmethod
def detect(self, frame, camera_identifier, result_queue, min_confidence):
"""Perform detection."""

@abstractmethod
def post_process(self, detections, camera_resolution):
def post_process(self, detections, camera_resolution) -> list[DetectedObject]:
"""Post process detections."""

@abstractmethod
def stop(self) -> None:
"""Stop Darknet."""


class DarknetDNNError(ViseronError):
"""Raised when failing to load Darknet in subprocess."""
Expand Down Expand Up @@ -278,7 +293,7 @@ def spawn_subprocess(self) -> RestartablePopen:
stderr=self._log_pipe,
)

def preprocess(self, frame) -> np.ndarray:
def preprocess(self, frame: np.ndarray) -> np.ndarray:
"""Pre process frame before detection."""
return cv2.resize(
frame,
Expand Down Expand Up @@ -315,12 +330,10 @@ def work_output(self, item) -> None:
return
pop_if_full(self._result_queues[item["camera_identifier"]], item)

def post_process(self, detections, camera_resolution):
def post_process(self, detections, camera_resolution) -> list[DetectedObject]:
"""Post process detections."""
_detections = []
for (label, confidence, box) in zip(
detections[0], detections[1], detections[2]
):
for label, confidence, box in zip(detections[0], detections[1], detections[2]):
_detections.append(
DetectedObject.from_absolute(
self.labels[int(label)],
Expand Down Expand Up @@ -354,6 +367,10 @@ def dnn_preferable_target(self) -> int:
return DNN_TARGETS[DNN_OPENCL]
return DNN_TARGETS[DNN_CPU]

def stop(self) -> None:
"""Stop Darknet."""
SubProcessWorker.stop(self)


class DarknetNative(BaseDarknet, ChildProcessWorker):
"""Darknet object detector interface using native Darknet.
Expand Down Expand Up @@ -454,7 +471,7 @@ def detect(
return None
return item["result"]

def post_process(self, detections, camera_resolution):
def post_process(self, detections, camera_resolution) -> list[DetectedObject]:
"""Post process detections."""
_detections = []
for label, confidence, box in detections:
Expand Down Expand Up @@ -493,3 +510,7 @@ def darknet_data_path(self) -> str:
"""Return path to Darknet data file."""
homedir = os.path.expanduser(f"~{pwd.getpwuid(os.geteuid())[0]}")
return f"{homedir}/darknet_data.data"

def stop(self) -> None:
"""Stop Darknet."""
ChildProcessWorker.stop(self)
Loading