Skip to content

Commit 5d576f8

Browse files
authored
Merge pull request #1280 from roflcoopter/feature/ffmpeg-reload
feat(ffmpeg): support config reload
2 parents 88e9f13 + 8e78116 commit 5d576f8

File tree

2 files changed

+88
-60
lines changed

2 files changed

+88
-60
lines changed

viseron/components/ffmpeg/camera.py

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,9 @@ def __init__(
329329
) -> None:
330330
# Add password to SensitiveInformationFilter.
331331
# It is done in AbstractCamera but since we are calling Stream before
332-
# super().__init__ we need to do it here as well
332+
# super().__init__ we need to do it here as well.
333+
# For this reason we dont have to clear the SensitiveInformationFilter
334+
# on unload since AbstractCamera will do it in its unload method
333335
if config[CONFIG_PASSWORD]:
334336
SensitiveInformationFilter.add_sensitive_string(config[CONFIG_PASSWORD])
335337
SensitiveInformationFilter.add_sensitive_string(
@@ -350,14 +352,20 @@ def __init__(
350352
] = mp.Queue(maxsize=2)
351353
self._capture_frames = mp.Event()
352354
self._thread_stuck = False
353-
self.resolution = None
355+
self.resolution = self.stream.width, self.stream.height
354356
self.decode_error = mp.Event()
355357

356358
if cv2.ocl.haveOpenCL():
357359
cv2.ocl.setUseOpenCL(True)
358360
self._recorder = Recorder(vis, config, self)
359361

360-
self.initialize_camera()
362+
self._check_segment_process_thread: RestartableThread | None = None
363+
364+
self._logger.debug(
365+
f"Resolution: {self.resolution[0]}x{self.resolution[1]} "
366+
f"@ {self.stream.fps} FPS"
367+
)
368+
self._logger.debug(f"Camera {self.name} initialized")
361369

362370
def _create_frame_reader(self) -> tuple[RestartableProcess, RestartableThread]:
363371
"""Return a frame reader thread."""
@@ -391,7 +399,7 @@ def _start_recording_only(self) -> None:
391399
self._logger.debug("Starting recording only mode")
392400

393401
def check_segment_process() -> None:
394-
while self.is_on:
402+
while self._capture_frames.is_set():
395403
time.sleep(1)
396404
if (
397405
self.stream.segment_process
@@ -402,26 +410,14 @@ def check_segment_process() -> None:
402410
self.connected = False
403411
self.connected = False
404412

405-
RestartableThread(
413+
self._check_segment_process_thread = RestartableThread(
406414
name="viseron.camera." + self.identifier + ".segment_check",
407415
target=check_segment_process,
408416
daemon=True,
409417
register=True,
410-
).start()
411-
412-
self.stream.record_only()
413-
414-
def initialize_camera(self) -> None:
415-
"""Start processing of camera frames."""
416-
self._logger.debug(f"Initializing camera {self.name}")
417-
418-
self.resolution = self.stream.width, self.stream.height
419-
self._logger.debug(
420-
f"Resolution: {self.resolution[0]}x{self.resolution[1]} "
421-
f"@ {self.stream.fps} FPS"
422418
)
423-
424-
self._logger.debug(f"Camera {self.name} initialized")
419+
self._check_segment_process_thread.start()
420+
self.stream.record_only()
425421

426422
def read_frames(
427423
self,
@@ -545,12 +541,12 @@ def calculate_output_fps(self, scanners: list[FrameIntervalCalculator]) -> None:
545541

546542
def _start_camera(self) -> None:
547543
"""Start capturing frames from camera."""
544+
self._capture_frames.set()
548545
if self._config[CONFIG_RECORD_ONLY]:
549546
self._start_recording_only()
550547
return
551548

552549
self._logger.debug("Starting capture thread")
553-
self._capture_frames.set()
554550
if not self._frame_reader or not self._frame_reader.is_alive():
555551
self._logger.debug("Creating new frame reader")
556552
self._frame_reader, self._frame_relay = self._create_frame_reader()
@@ -559,13 +555,14 @@ def _start_camera(self) -> None:
559555

560556
def _stop_camera(self) -> None:
561557
"""Release the connection to the camera."""
562-
self._logger.debug("Stopping capture thread")
563558
self._capture_frames.clear()
564559
if self._frame_relay:
560+
self._logger.debug("Stopping capture thread")
565561
self._frame_relay.stop()
566562
self._frame_relay.join(timeout=5)
567563

568564
if self._frame_reader:
565+
self._logger.debug("Stopping frame reader process")
569566
self._frame_reader.stop()
570567
self._frame_reader.join(timeout=5)
571568
if self._frame_reader.is_alive():
@@ -575,6 +572,13 @@ def _stop_camera(self) -> None:
575572
self._frame_reader = None
576573
self.stream.close_pipe()
577574

575+
if self._config[CONFIG_RECORD_ONLY] and self._check_segment_process_thread:
576+
self._logger.debug("Stopping record-only process")
577+
self._check_segment_process_thread.stop()
578+
self._check_segment_process_thread.join(timeout=5)
579+
self._check_segment_process_thread = None
580+
self.stream.close_pipe()
581+
578582
def start_recorder(
579583
self,
580584
shared_frame: SharedFrame,
@@ -598,12 +602,12 @@ def output_fps(self, fps: int) -> None:
598602
self.stream.output_fps = fps
599603

600604
@property
601-
def resolution(self):
605+
def resolution(self) -> tuple[int, int]:
602606
"""Return stream resolution."""
603607
return self._resolution
604608

605609
@resolution.setter
606-
def resolution(self, resolution) -> None:
610+
def resolution(self, resolution: tuple[int, int]) -> None:
607611
"""Set stream resolution."""
608612
self._resolution = resolution
609613

viseron/components/ffmpeg/stream.py

Lines changed: 61 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Class to interact with an FFmpeg stream."""
2+
23
from __future__ import annotations
34

45
import json
@@ -107,14 +108,14 @@ def __init__(
107108

108109
self._camera: Camera = camera
109110

110-
self._pipe: sp.Popen | None = None
111+
self._pipe: RestartablePopen | None = None
111112
self.segment_process: RestartablePopen | None = None
112113
self._log_pipe: LogPipe | None = None
113114
self._ffprobe = FFprobe(config, camera_identifier, attempt)
114115

115116
self._mainstream = self.get_stream_information(config)
116117
self._substream = None
117-
if config.get(CONFIG_SUBSTREAM, None):
118+
if config.get(CONFIG_SUBSTREAM):
118119
self._substream = self.get_stream_information(config[CONFIG_SUBSTREAM])
119120

120121
self._output_fps = self.fps
@@ -131,7 +132,7 @@ def __init__(
131132
self.create_symlink(self.segments_alias)
132133

133134
@property
134-
def output_args(self):
135+
def output_args(self) -> list:
135136
"""Return FFmpeg output args."""
136137
return [
137138
"-f",
@@ -152,7 +153,7 @@ def segments_alias(self) -> str:
152153
return f"ffmpeg_{self._camera_identifier}_seg"
153154

154155
@staticmethod
155-
def create_symlink(alias) -> None:
156+
def create_symlink(alias: str) -> None:
156157
"""Create a symlink to FFmpeg executable.
157158
158159
This is done to know which FFmpeg command belongs to which camera.
@@ -194,12 +195,12 @@ def fps(self) -> int:
194195
return self._mainstream.fps
195196

196197
@property
197-
def output_fps(self):
198+
def output_fps(self) -> int:
198199
"""Return stream output FPS."""
199200
return self._output_fps
200201

201202
@output_fps.setter
202-
def output_fps(self, fps) -> None:
203+
def output_fps(self, fps: int) -> None:
203204
self._output_fps = fps
204205

205206
def get_stream_url(self, stream_config: dict[str, Any]) -> str:
@@ -212,8 +213,7 @@ def get_stream_url(self, stream_config: dict[str, Any]) -> str:
212213

213214
protocol = (
214215
stream_config[CONFIG_PROTOCOL]
215-
if stream_config[CONFIG_PROTOCOL]
216-
else STREAM_FORMAT_MAP[stream_config[CONFIG_STREAM_FORMAT]]["protocol"]
216+
or STREAM_FORMAT_MAP[stream_config[CONFIG_STREAM_FORMAT]]["protocol"]
217217
)
218218

219219
return (
@@ -242,11 +242,21 @@ def get_stream_information(
242242
stream_url, stream_config
243243
)
244244

245-
width = stream_config[CONFIG_WIDTH] if stream_config[CONFIG_WIDTH] else width
245+
width = (
246+
stream_config[CONFIG_WIDTH] # noqa: FURB110
247+
if stream_config[CONFIG_WIDTH]
248+
else width
249+
)
246250
height = (
247-
stream_config[CONFIG_HEIGHT] if stream_config[CONFIG_HEIGHT] else height
251+
stream_config[CONFIG_HEIGHT] # noqa: FURB110
252+
if stream_config[CONFIG_HEIGHT]
253+
else height
254+
)
255+
fps = (
256+
stream_config[CONFIG_FPS] # noqa: FURB110
257+
if stream_config[CONFIG_FPS]
258+
else fps
248259
)
249-
fps = stream_config[CONFIG_FPS] if stream_config[CONFIG_FPS] else fps
250260
codec = (
251261
stream_config[CONFIG_CODEC]
252262
if stream_config[CONFIG_CODEC] != DEFAULT_CODEC
@@ -276,36 +286,38 @@ def get_stream_information(
276286
)
277287

278288
@staticmethod
279-
def get_decoder_codec(stream_config: dict[str, Any], stream_codec: str):
289+
def get_decoder_codec(
290+
stream_config: dict[str, Any], stream_codec: str
291+
) -> list[str]:
280292
"""Return decoder codec set in config or from predefined codec map."""
281293
if stream_config[CONFIG_CODEC] and stream_config[CONFIG_CODEC] != DEFAULT_CODEC:
282294
return ["-c:v", stream_config[CONFIG_CODEC]]
283295

284296
codec = None
285297
codec_map = None
286-
if stream_codec:
287-
if stream_config[CONFIG_STREAM_FORMAT] in ["rtsp", "rtmp"]:
288-
if os.getenv(ENV_RASPBERRYPI3) == "true":
289-
codec_map = HWACCEL_RPI3_DECODER_CODEC_MAP
290-
elif os.getenv(ENV_RASPBERRYPI4) == "true":
291-
codec_map = HWACCEL_RPI4_DECODER_CODEC_MAP
292-
elif os.getenv(ENV_JETSON_NANO) == "true":
293-
codec_map = HWACCEL_JETSON_NANO_DECODER_CODEC_MAP
294-
elif os.getenv(ENV_CUDA_SUPPORTED) == "true":
295-
codec_map = HWACCEL_CUDA_DECODER_CODEC_MAP
296-
if codec_map:
297-
codec = codec_map.get(stream_codec, None)
298+
if stream_codec and stream_config[CONFIG_STREAM_FORMAT] in ["rtsp", "rtmp"]:
299+
if os.getenv(ENV_RASPBERRYPI3) == "true":
300+
codec_map = HWACCEL_RPI3_DECODER_CODEC_MAP
301+
elif os.getenv(ENV_RASPBERRYPI4) == "true":
302+
codec_map = HWACCEL_RPI4_DECODER_CODEC_MAP
303+
elif os.getenv(ENV_JETSON_NANO) == "true":
304+
codec_map = HWACCEL_JETSON_NANO_DECODER_CODEC_MAP
305+
elif os.getenv(ENV_CUDA_SUPPORTED) == "true":
306+
codec_map = HWACCEL_CUDA_DECODER_CODEC_MAP
307+
if codec_map:
308+
codec = codec_map.get(stream_codec, None)
309+
298310
if codec:
299311
return ["-c:v", codec]
300312
return []
301313

302-
def get_encoder_codec(self):
314+
def get_encoder_codec(self) -> list[str]:
303315
"""Return encoder codec set in config."""
304316
return ["-c:v", self._config[CONFIG_RECORDER][CONFIG_RECORDER_CODEC]]
305317

306318
def stream_command(
307319
self, stream_config: dict[str, Any], stream_codec: str, stream_url: str
308-
):
320+
) -> list[str]:
309321
"""Return FFmpeg input stream."""
310322
if stream_config[CONFIG_INPUT_ARGS]:
311323
input_args = stream_config[CONFIG_INPUT_ARGS]
@@ -378,7 +390,7 @@ def recorder_audio_filter_args(self) -> list[str] | list:
378390
]
379391
return []
380392

381-
def segment_args(self):
393+
def segment_args(self) -> list[str]:
382394
"""Generate FFmpeg segment args."""
383395
return (
384396
[
@@ -413,7 +425,7 @@ def segment_args(self):
413425
]
414426
)
415427

416-
def filter_args(self):
428+
def filter_args(self) -> list[str]:
417429
"""Return filter arguments."""
418430
filters = self._config[CONFIG_VIDEO_FILTERS].copy()
419431
if self.output_fps < self.fps:
@@ -426,7 +438,7 @@ def filter_args(self):
426438
]
427439
return []
428440

429-
def build_segment_command(self):
441+
def build_segment_command(self) -> list[str]:
430442
"""Return command for writing segments only from main stream.
431443
432444
Only used when a substream is configured.
@@ -446,7 +458,7 @@ def build_segment_command(self):
446458
+ self.segment_args()
447459
)
448460

449-
def build_command(self):
461+
def build_command(self) -> list[str]:
450462
"""Return full FFmpeg command."""
451463
if self._substream:
452464
if self._config[CONFIG_SUBSTREAM][CONFIG_RAW_COMMAND]:
@@ -478,7 +490,7 @@ def build_command(self):
478490
+ self.output_args
479491
)
480492

481-
def pipe(self):
493+
def pipe(self) -> RestartablePopen:
482494
"""Return subprocess pipe for FFmpeg."""
483495
try:
484496
if self._log_pipe:
@@ -519,8 +531,19 @@ def start_pipe(self) -> None:
519531

520532
def close_pipe(self) -> None:
521533
"""Close FFmpeg pipe."""
534+
self._logger.debug("Closing pipe")
522535
if self.segment_process:
523-
self.segment_process.terminate()
536+
self._logger.debug("Terminating segment process")
537+
try:
538+
self.segment_process.terminate()
539+
try:
540+
self.segment_process.communicate(timeout=5)
541+
except sp.TimeoutExpired:
542+
self._logger.debug("FFmpeg did not terminate, killing instead.")
543+
self.segment_process.kill()
544+
self.segment_process.communicate()
545+
except (AttributeError, OSError) as error:
546+
self._logger.error("Failed to close segment process: %s", error)
524547

525548
if self._pipe:
526549
try:
@@ -541,21 +564,22 @@ def close_pipe(self) -> None:
541564
except OSError as error:
542565
self._logger.error("Failed to close log pipe: %s", error)
543566

544-
def poll(self):
567+
def poll(self) -> int | None:
545568
"""Poll pipe."""
546569
if self._pipe:
547570
return self._pipe.poll()
571+
return None
548572

549573
def read(self) -> bytes | None:
550574
"""Return a single frame from FFmpeg pipe."""
551575
try:
552576
if self._pipe and self._pipe.stdout:
553577
return self._pipe.stdout.read(self.frame_bytes_size)
554-
except Exception as err: # pylint: disable=broad-except
555-
self._logger.error(f"Error reading frame from pipe: {err}")
578+
except Exception: # pylint: disable=broad-except
579+
self._logger.exception("Error reading frame from pipe")
556580
return None
557581

558-
def record_only(self):
582+
def record_only(self) -> None:
559583
"""Record only the stream."""
560584
self._logger.debug(
561585
f"Recording only stream: {' '.join(self.build_segment_command())}"
@@ -689,7 +713,7 @@ def run_ffprobe(
689713
stdout,
690714
) from error
691715

692-
if output.get("error", None):
716+
if output.get("error"):
693717
raise FFprobeError(
694718
output,
695719
)

0 commit comments

Comments
 (0)