Skip to content

Commit 485e561

Browse files
committed
[project] Use import typing as ty everywhere
1 parent 7dee4b9 commit 485e561

17 files changed

+160
-162
lines changed

scenedetect/backends/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@
8383
# TODO: Future VideoStream implementations under consideration:
8484
# - Nvidia VPF: https://developer.nvidia.com/blog/vpf-hardware-accelerated-video-processing-framework-in-python/
8585

86-
from typing import Dict, Type
86+
import typing as ty
8787

8888
# OpenCV must be available at minimum.
8989
from scenedetect.backends.opencv import VideoCaptureAdapter, VideoStreamCv2
@@ -100,7 +100,7 @@
100100

101101
# TODO: Lazy-loading backends would improve startup performance. However, this requires removing
102102
# some of the re-exported types above from the public API.
103-
AVAILABLE_BACKENDS: Dict[str, Type] = {
103+
AVAILABLE_BACKENDS: ty.Dict[str, ty.Type] = {
104104
backend.BACKEND_NAME: backend
105105
for backend in filter(
106106
None,
@@ -114,5 +114,5 @@
114114
"""All available backends that :func:`scenedetect.open_video` can consider for the `backend`
115115
parameter. These backends must support construction with the following signature:
116116
117-
BackendType(path: str, framerate: Optional[float])
117+
BackendType(path: str, framerate: ty.Optional[float])
118118
"""

scenedetect/backends/moviepy.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
image sequences or AviSynth scripts are supported as inputs.
1717
"""
1818

19+
import typing as ty
1920
from logging import getLogger
20-
from typing import AnyStr, Optional, Tuple, Union
2121

2222
import cv2
2323
import numpy as np
@@ -34,7 +34,9 @@
3434
class VideoStreamMoviePy(VideoStream):
3535
"""MoviePy `FFMPEG_VideoReader` backend."""
3636

37-
def __init__(self, path: AnyStr, framerate: Optional[float] = None, print_infos: bool = False):
37+
def __init__(
38+
self, path: ty.AnyStr, framerate: ty.Optional[float] = None, print_infos: bool = False
39+
):
3840
"""Open a video or device.
3941
4042
Arguments:
@@ -64,8 +66,8 @@ def __init__(self, path: AnyStr, framerate: Optional[float] = None, print_infos:
6466
# This will always be one behind self._reader.lastread when we finally call read()
6567
# as MoviePy caches the first frame when opening the video. Thus self._last_frame
6668
# will always be the current frame, and self._reader.lastread will be the next.
67-
self._last_frame: Union[bool, np.ndarray] = False
68-
self._last_frame_rgb: Optional[np.ndarray] = None
69+
self._last_frame: ty.Union[bool, np.ndarray] = False
70+
self._last_frame_rgb: ty.Optional[np.ndarray] = None
6971
# Older versions don't track the video position when calling read_frame so we need
7072
# to keep track of the current frame number.
7173
self._frame_number = 0
@@ -86,7 +88,7 @@ def frame_rate(self) -> float:
8688
return self._reader.fps
8789

8890
@property
89-
def path(self) -> Union[bytes, str]:
91+
def path(self) -> ty.Union[bytes, str]:
9092
"""Video path."""
9193
return self._path
9294

@@ -101,12 +103,12 @@ def is_seekable(self) -> bool:
101103
return True
102104

103105
@property
104-
def frame_size(self) -> Tuple[int, int]:
106+
def frame_size(self) -> ty.Tuple[int, int]:
105107
"""Size of each video frame in pixels as a tuple of (width, height)."""
106108
return tuple(self._reader.infos["video_size"])
107109

108110
@property
109-
def duration(self) -> Optional[FrameTimecode]:
111+
def duration(self) -> ty.Optional[FrameTimecode]:
110112
"""Duration of the stream as a FrameTimecode, or None if non terminating."""
111113
assert isinstance(self._reader.infos["duration"], float)
112114
return self.base_timecode + self._reader.infos["duration"]
@@ -155,7 +157,7 @@ def frame_number(self) -> int:
155157
This method will always return 0 if no frames have been `read`."""
156158
return self._frame_number
157159

158-
def seek(self, target: Union[FrameTimecode, float, int]):
160+
def seek(self, target: ty.Union[FrameTimecode, float, int]):
159161
"""Seek to the given timecode. If given as a frame number, represents the current seek
160162
pointer (e.g. if seeking to 0, the next frame decoded will be the first frame of the video).
161163
@@ -207,7 +209,7 @@ def reset(self, print_infos=False):
207209
self._eof = False
208210
self._reader = FFMPEG_VideoReader(self._path, print_infos=print_infos)
209211

210-
def read(self, decode: bool = True, advance: bool = True) -> Union[np.ndarray, bool]:
212+
def read(self, decode: bool = True, advance: bool = True) -> ty.Union[np.ndarray, bool]:
211213
"""Read and decode the next frame as a np.ndarray. Returns False when video ends.
212214
213215
Arguments:

scenedetect/backends/opencv.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import math
2121
import os.path
22+
import typing as ty
2223
from logging import getLogger
23-
from typing import AnyStr, Optional, Tuple, Union
2424

2525
import cv2
2626
import numpy as np
@@ -58,10 +58,10 @@ class VideoStreamCv2(VideoStream):
5858

5959
def __init__(
6060
self,
61-
path: AnyStr = None,
62-
framerate: Optional[float] = None,
61+
path: ty.AnyStr = None,
62+
framerate: ty.Optional[float] = None,
6363
max_decode_attempts: int = 5,
64-
path_or_device: Union[bytes, str, int] = None,
64+
path_or_device: ty.Union[bytes, str, int] = None,
6565
):
6666
"""Open a video file, image sequence, or network stream.
6767
@@ -98,10 +98,10 @@ def __init__(
9898
self._is_device = isinstance(self._path_or_device, int)
9999

100100
# Initialized in _open_capture:
101-
self._cap: Optional[cv2.VideoCapture] = (
101+
self._cap: ty.Optional[cv2.VideoCapture] = (
102102
None # Reference to underlying cv2.VideoCapture object.
103103
)
104-
self._frame_rate: Optional[float] = None
104+
self._frame_rate: ty.Optional[float] = None
105105

106106
# VideoCapture state
107107
self._has_grabbed = False
@@ -140,7 +140,7 @@ def frame_rate(self) -> float:
140140
return self._frame_rate
141141

142142
@property
143-
def path(self) -> Union[bytes, str]:
143+
def path(self) -> ty.Union[bytes, str]:
144144
"""Video or device path."""
145145
if self._is_device:
146146
assert isinstance(self._path_or_device, (int))
@@ -168,15 +168,15 @@ def is_seekable(self) -> bool:
168168
return not self._is_device
169169

170170
@property
171-
def frame_size(self) -> Tuple[int, int]:
171+
def frame_size(self) -> ty.Tuple[int, int]:
172172
"""Size of each video frame in pixels as a tuple of (width, height)."""
173173
return (
174174
math.trunc(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
175175
math.trunc(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
176176
)
177177

178178
@property
179-
def duration(self) -> Optional[FrameTimecode]:
179+
def duration(self) -> ty.Optional[FrameTimecode]:
180180
"""Duration of the stream as a FrameTimecode, or None if non terminating."""
181181
if self._is_device:
182182
return None
@@ -218,7 +218,7 @@ def frame_number(self) -> int:
218218
This method will always return 0 if no frames have been `read`."""
219219
return math.trunc(self._cap.get(cv2.CAP_PROP_POS_FRAMES))
220220

221-
def seek(self, target: Union[FrameTimecode, float, int]):
221+
def seek(self, target: ty.Union[FrameTimecode, float, int]):
222222
"""Seek to the given timecode. If given as a frame number, represents the current seek
223223
pointer (e.g. if seeking to 0, the next frame decoded will be the first frame of the video).
224224
@@ -264,7 +264,7 @@ def reset(self):
264264
self._cap.release()
265265
self._open_capture(self._frame_rate)
266266

267-
def read(self, decode: bool = True, advance: bool = True) -> Union[np.ndarray, bool]:
267+
def read(self, decode: bool = True, advance: bool = True) -> ty.Union[np.ndarray, bool]:
268268
"""Read and decode the next frame as a np.ndarray. Returns False when video ends,
269269
or the maximum number of decode attempts has passed.
270270
@@ -308,7 +308,7 @@ def read(self, decode: bool = True, advance: bool = True) -> Union[np.ndarray, b
308308
# Private Methods
309309
#
310310

311-
def _open_capture(self, framerate: Optional[float] = None):
311+
def _open_capture(self, framerate: ty.Optional[float] = None):
312312
"""Opens capture referenced by this object and resets internal state."""
313313
if self._is_device and self._path_or_device < 0:
314314
raise ValueError("Invalid/negative device ID specified.")
@@ -364,7 +364,7 @@ class VideoCaptureAdapter(VideoStream):
364364
def __init__(
365365
self,
366366
cap: cv2.VideoCapture,
367-
framerate: Optional[float] = None,
367+
framerate: ty.Optional[float] = None,
368368
max_read_attempts: int = 5,
369369
):
370370
"""Create from an existing OpenCV VideoCapture object. Used for webcams, live streams,
@@ -448,15 +448,15 @@ def is_seekable(self) -> bool:
448448
return False
449449

450450
@property
451-
def frame_size(self) -> Tuple[int, int]:
451+
def frame_size(self) -> ty.Tuple[int, int]:
452452
"""Reported size of each video frame in pixels as a tuple of (width, height)."""
453453
return (
454454
math.trunc(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
455455
math.trunc(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
456456
)
457457

458458
@property
459-
def duration(self) -> Optional[FrameTimecode]:
459+
def duration(self) -> ty.Optional[FrameTimecode]:
460460
"""Duration of the stream as a FrameTimecode, or None if non terminating."""
461461
frame_count = math.trunc(self._cap.get(cv2.CAP_PROP_FRAME_COUNT))
462462
if frame_count > 0:
@@ -500,15 +500,15 @@ def frame_number(self) -> int:
500500
This method will always return 0 if no frames have been `read`."""
501501
return self._num_frames
502502

503-
def seek(self, target: Union[FrameTimecode, float, int]):
503+
def seek(self, target: ty.Union[FrameTimecode, float, int]):
504504
"""The underlying VideoCapture is assumed to not support seeking."""
505505
raise NotImplementedError("Seeking is not supported.")
506506

507507
def reset(self):
508508
"""Not supported."""
509509
raise NotImplementedError("Reset is not supported.")
510510

511-
def read(self, decode: bool = True, advance: bool = True) -> Union[np.ndarray, bool]:
511+
def read(self, decode: bool = True, advance: bool = True) -> ty.Union[np.ndarray, bool]:
512512
"""Read and decode the next frame as a np.ndarray. Returns False when video ends,
513513
or the maximum number of decode attempts has passed.
514514

scenedetect/backends/pyav.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
#
1212
""":class:`VideoStreamAv` provides an adapter for the PyAV av.InputContainer object."""
1313

14+
import typing as ty
1415
from logging import getLogger
15-
from typing import AnyStr, BinaryIO, Optional, Tuple, Union
1616

1717
import av
1818
import numpy as np
@@ -35,10 +35,10 @@ class VideoStreamAv(VideoStream):
3535
# calculates the end time.
3636
def __init__(
3737
self,
38-
path_or_io: Union[AnyStr, BinaryIO],
39-
framerate: Optional[float] = None,
40-
name: Optional[str] = None,
41-
threading_mode: Optional[str] = None,
38+
path_or_io: ty.Union[ty.AnyStr, ty.BinaryIO],
39+
framerate: ty.Optional[float] = None,
40+
name: ty.Optional[str] = None,
41+
threading_mode: ty.Optional[str] = None,
4242
suppress_output: bool = False,
4343
):
4444
"""Open a video by path.
@@ -147,12 +147,12 @@ def __del__(self):
147147
"""Unique name used to identify this backend."""
148148

149149
@property
150-
def path(self) -> Union[bytes, str]:
150+
def path(self) -> ty.Union[bytes, str]:
151151
"""Video path."""
152152
return self._path
153153

154154
@property
155-
def name(self) -> Union[bytes, str]:
155+
def name(self) -> ty.Union[bytes, str]:
156156
"""Name of the video, without extension."""
157157
return self._name
158158

@@ -162,7 +162,7 @@ def is_seekable(self) -> bool:
162162
return self._io.seekable()
163163

164164
@property
165-
def frame_size(self) -> Tuple[int, int]:
165+
def frame_size(self) -> ty.Tuple[int, int]:
166166
"""Size of each video frame in pixels as a tuple of (width, height)."""
167167
return (self._codec_context.width, self._codec_context.height)
168168

@@ -219,7 +219,7 @@ def aspect_ratio(self) -> float:
219219
frame_aspect_ratio = self.frame_size[0] / self.frame_size[1]
220220
return display_aspect_ratio / frame_aspect_ratio
221221

222-
def seek(self, target: Union[FrameTimecode, float, int]) -> None:
222+
def seek(self, target: ty.Union[FrameTimecode, float, int]) -> None:
223223
"""Seek to the given timecode. If given as a frame number, represents the current seek
224224
pointer (e.g. if seeking to 0, the next frame decoded will be the first frame of the video).
225225
@@ -263,7 +263,7 @@ def reset(self):
263263
except Exception as ex:
264264
raise VideoOpenFailure() from ex
265265

266-
def read(self, decode: bool = True, advance: bool = True) -> Union[np.ndarray, bool]:
266+
def read(self, decode: bool = True, advance: bool = True) -> ty.Union[np.ndarray, bool]:
267267
"""Read and decode the next frame as a np.ndarray. Returns False when video ends.
268268
269269
Arguments:

scenedetect/detectors/adaptive_detector.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
This detector is available from the command-line as the `detect-adaptive` command.
1717
"""
1818

19+
import typing as ty
1920
from logging import getLogger
20-
from typing import List, Optional
2121

2222
import numpy as np
2323

@@ -42,9 +42,9 @@ def __init__(
4242
min_content_val: float = 15.0,
4343
weights: ContentDetector.Components = ContentDetector.DEFAULT_COMPONENT_WEIGHTS,
4444
luma_only: bool = False,
45-
kernel_size: Optional[int] = None,
45+
kernel_size: ty.Optional[int] = None,
4646
video_manager=None,
47-
min_delta_hsv: Optional[float] = None,
47+
min_delta_hsv: ty.Optional[float] = None,
4848
):
4949
"""
5050
Arguments:
@@ -98,7 +98,7 @@ def __init__(
9898
self._first_frame_num = None
9999

100100
# NOTE: This must be different than `self._last_scene_cut` which is used by the base class.
101-
self._last_cut: Optional[int] = None
101+
self._last_cut: ty.Optional[int] = None
102102

103103
self._buffer = []
104104

@@ -107,15 +107,15 @@ def event_buffer_length(self) -> int:
107107
"""Number of frames any detected cuts will be behind the current frame due to buffering."""
108108
return self.window_width
109109

110-
def get_metrics(self) -> List[str]:
110+
def get_metrics(self) -> ty.List[str]:
111111
"""Combines base ContentDetector metric keys with the AdaptiveDetector one."""
112112
return super().get_metrics() + [self._adaptive_ratio_key]
113113

114114
def stats_manager_required(self) -> bool:
115115
"""Not required for AdaptiveDetector."""
116116
return False
117117

118-
def process_frame(self, frame_num: int, frame_img: Optional[np.ndarray]) -> List[int]:
118+
def process_frame(self, frame_num: int, frame_img: ty.Optional[np.ndarray]) -> ty.List[int]:
119119
"""Process the next frame. `frame_num` is assumed to be sequential.
120120
121121
Args:
@@ -124,7 +124,7 @@ def process_frame(self, frame_num: int, frame_img: Optional[np.ndarray]) -> List
124124
frame_img (numpy.ndarray or None): Video frame corresponding to `frame_img`.
125125
126126
Returns:
127-
List[int]: List of frames where scene cuts have been detected. There may be 0
127+
ty.List[int]: List of frames where scene cuts have been detected. There may be 0
128128
or more frames in the list, and not necessarily the same as frame_num.
129129
"""
130130

@@ -168,7 +168,7 @@ def process_frame(self, frame_num: int, frame_img: Optional[np.ndarray]) -> List
168168
return [target_frame]
169169
return []
170170

171-
def get_content_val(self, frame_num: int) -> Optional[float]:
171+
def get_content_val(self, frame_num: int) -> ty.Optional[float]:
172172
"""Returns the average content change for a frame."""
173173
# TODO(v0.7): Add DeprecationWarning that `get_content_val` will be removed in v0.7.
174174
logger.error(

0 commit comments

Comments
 (0)