Skip to content

Commit 48781ac

Browse files
authored
Fine tune timeouts to reduce races (#29)
1 parent 4171285 commit 48781ac

File tree

1 file changed

+111
-22
lines changed

1 file changed

+111
-22
lines changed

onvif/client.py

Lines changed: 111 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import logging
77
import os.path
88
import ssl
9-
from typing import Any, Callable, Dict, Optional, Tuple
9+
from typing import Any, Awaitable, Callable, Dict, Optional, ParamSpec, Tuple, TypeVar
1010

1111
import httpx
1212
from httpx import AsyncClient, BasicAuth, DigestAuth, TransportError
@@ -35,6 +35,70 @@
3535

3636
_WSDL_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "wsdl")
3737

38+
_DEFAULT_TIMEOUT = 30
39+
_PULLPOINT_TIMEOUT = 90
40+
_CONNECT_TIMEOUT = 30
41+
_READ_TIMEOUT = 30
42+
_WRITE_TIMEOUT = 30
43+
44+
45+
KEEPALIVE_EXPIRY = 4
46+
BACKOFF_TIME = KEEPALIVE_EXPIRY + 0.5
47+
HTTPX_LIMITS = httpx.Limits(keepalive_expiry=4)
48+
49+
50+
DEFAULT_ATTEMPTS = 2
51+
52+
P = ParamSpec("P")
53+
T = TypeVar("T")
54+
55+
56+
def retry_connection_error(
57+
attempts: int = DEFAULT_ATTEMPTS,
58+
) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]:
59+
"""Define a wrapper to retry on connection error."""
60+
61+
def _decorator_retry_connection_error(
62+
func: Callable[P, Awaitable[T]]
63+
) -> Callable[P, Awaitable[T]]:
64+
"""Define a wrapper to retry on connection error.
65+
66+
The remote server is allowed to disconnect us any time so
67+
we need to retry the operation.
68+
"""
69+
70+
async def _async_wrap_connection_error_retry( # type: ignore[return]
71+
*args: P.args, **kwargs: P.kwargs
72+
) -> T:
73+
for attempt in range(attempts):
74+
try:
75+
return await func(*args, **kwargs)
76+
except httpx.RequestError as ex:
77+
#
78+
# We should only need to retry on RemoteProtocolError but some cameras
79+
# are flakey and sometimes do not respond to the Renew request so we
80+
# retry on RequestError as well.
81+
#
82+
# For RemoteProtocolError:
83+
# http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server
84+
# to close the connection at any time, we treat this as a normal and try again
85+
# once since we do not want to declare the camera as not supporting PullPoint
86+
# if it just happened to close the connection at the wrong time.
87+
if attempt == attempts - 1:
88+
raise
89+
logger.debug(
90+
"Error: %s while calling %s, backing off: %s, retrying...",
91+
ex,
92+
func,
93+
BACKOFF_TIME,
94+
exc_info=True,
95+
)
96+
await asyncio.sleep(BACKOFF_TIME)
97+
98+
return _async_wrap_connection_error_retry
99+
100+
return _decorator_retry_connection_error
101+
38102

39103
def create_no_verify_ssl_context() -> ssl.SSLContext:
40104
"""Return an SSL context that does not verify the server certificate.
@@ -186,6 +250,8 @@ def __init__(
186250
dt_diff=None,
187251
binding_name="",
188252
binding_key="",
253+
read_timeout: Optional[int] = None,
254+
write_timeout: Optional[int] = None,
189255
) -> None:
190256
if not _path_isfile(url):
191257
raise ONVIFError("%s doesn`t exist!" % url)
@@ -201,11 +267,21 @@ def __init__(
201267
self.dt_diff = dt_diff
202268
self.binding_name = binding_name
203269
# Create soap client
204-
client = AsyncClient(verify=_NO_VERIFY_SSL_CONTEXT, timeout=90)
270+
timeouts = httpx.Timeout(
271+
_DEFAULT_TIMEOUT,
272+
connect=_CONNECT_TIMEOUT,
273+
read=read_timeout or _READ_TIMEOUT,
274+
write=write_timeout or _WRITE_TIMEOUT,
275+
)
276+
client = AsyncClient(
277+
verify=_NO_VERIFY_SSL_CONTEXT, timeout=timeouts, limits=HTTPX_LIMITS
278+
)
205279
# The wsdl client should never actually be used, but it is required
206280
# to avoid creating another ssl context since the underlying code
207281
# will try to create a new one if it doesn't exist.
208-
wsdl_client = httpx.Client(verify=_NO_VERIFY_SSL_CONTEXT, timeout=90)
282+
wsdl_client = httpx.Client(
283+
verify=_NO_VERIFY_SSL_CONTEXT, timeout=timeouts, limits=HTTPX_LIMITS
284+
)
209285
self.transport = (
210286
AsyncTransport(client=client, wsdl_client=wsdl_client)
211287
if no_cache
@@ -483,13 +559,13 @@ def create_notification_manager(
483559
"""Create a notification manager."""
484560
return NotificationManager(self, config)
485561

486-
async def close(self):
562+
async def close(self) -> None:
487563
"""Close all transports."""
488564
await self._snapshot_client.aclose()
489565
for service in self.services.values():
490566
await service.close()
491567

492-
async def get_snapshot_uri(self, profile_token):
568+
async def get_snapshot_uri(self, profile_token: str) -> str:
493569
"""Get the snapshot uri for a given profile."""
494570
uri = self._snapshot_uris.get(profile_token)
495571
if uri is None:
@@ -501,7 +577,9 @@ async def get_snapshot_uri(self, profile_token):
501577
self._snapshot_uris[profile_token] = uri
502578
return uri
503579

504-
async def get_snapshot(self, profile_token, basic_auth=False):
580+
async def get_snapshot(
581+
self, profile_token: str, basic_auth: bool = False
582+
) -> Optional[bytes]:
505583
"""Get a snapshot image from the camera."""
506584
uri = await self.get_snapshot_uri(profile_token)
507585
if uri is None:
@@ -566,7 +644,11 @@ def get_definition(
566644
return xaddr, wsdlpath, binding_name
567645

568646
async def create_onvif_service(
569-
self, name: str, port_type: Optional[str] = None
647+
self,
648+
name: str,
649+
port_type: Optional[str] = None,
650+
read_timeout: Optional[int] = None,
651+
write_timeout: Optional[int] = None,
570652
) -> ONVIFService:
571653
"""Create ONVIF service client"""
572654
name = name.lower()
@@ -603,67 +685,74 @@ async def create_onvif_service(
603685
dt_diff=self.dt_diff,
604686
binding_name=binding_name,
605687
binding_key=binding_key,
688+
read_timeout=read_timeout,
689+
write_timeout=write_timeout,
606690
)
607691
await service.setup()
608692

609693
self.services[binding_key] = service
610694

611695
return service
612696

613-
async def create_devicemgmt_service(self):
697+
async def create_devicemgmt_service(self) -> ONVIFService:
614698
"""Service creation helper."""
615699
return await self.create_onvif_service("devicemgmt")
616700

617-
async def create_media_service(self):
701+
async def create_media_service(self) -> ONVIFService:
618702
"""Service creation helper."""
619703
return await self.create_onvif_service("media")
620704

621-
async def create_ptz_service(self):
705+
async def create_ptz_service(self) -> ONVIFService:
622706
"""Service creation helper."""
623707
return await self.create_onvif_service("ptz")
624708

625-
async def create_imaging_service(self):
709+
async def create_imaging_service(self) -> ONVIFService:
626710
"""Service creation helper."""
627711
return await self.create_onvif_service("imaging")
628712

629-
async def create_deviceio_service(self):
713+
async def create_deviceio_service(self) -> ONVIFService:
630714
"""Service creation helper."""
631715
return await self.create_onvif_service("deviceio")
632716

633-
async def create_events_service(self):
717+
async def create_events_service(self) -> ONVIFService:
634718
"""Service creation helper."""
635719
return await self.create_onvif_service("events")
636720

637-
async def create_analytics_service(self):
721+
async def create_analytics_service(self) -> ONVIFService:
638722
"""Service creation helper."""
639723
return await self.create_onvif_service("analytics")
640724

641-
async def create_recording_service(self):
725+
async def create_recording_service(self) -> ONVIFService:
642726
"""Service creation helper."""
643727
return await self.create_onvif_service("recording")
644728

645-
async def create_search_service(self):
729+
async def create_search_service(self) -> ONVIFService:
646730
"""Service creation helper."""
647731
return await self.create_onvif_service("search")
648732

649-
async def create_replay_service(self):
733+
async def create_replay_service(self) -> ONVIFService:
650734
"""Service creation helper."""
651735
return await self.create_onvif_service("replay")
652736

653-
async def create_pullpoint_service(self):
737+
async def create_pullpoint_service(self) -> ONVIFService:
654738
"""Service creation helper."""
655739
return await self.create_onvif_service(
656-
"pullpoint", port_type="PullPointSubscription"
740+
"pullpoint",
741+
port_type="PullPointSubscription",
742+
read_timeout=_PULLPOINT_TIMEOUT,
743+
write_timeout=_PULLPOINT_TIMEOUT,
657744
)
658745

659-
async def create_notification_service(self):
746+
async def create_notification_service(self) -> ONVIFService:
660747
"""Service creation helper."""
661748
return await self.create_onvif_service("notification")
662749

663-
async def create_subscription_service(self, port_type=None):
750+
async def create_subscription_service(
751+
self, port_type: Optional[str] = None
752+
) -> ONVIFService:
664753
"""Service creation helper."""
665754
return await self.create_onvif_service("subscription", port_type=port_type)
666755

667-
async def create_receiver_service(self):
756+
async def create_receiver_service(self) -> ONVIFService:
668757
"""Service creation helper."""
669758
return await self.create_onvif_service("receiver")

0 commit comments

Comments
 (0)