Skip to content

Migrate to using aiohttp #115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jun 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 84 additions & 57 deletions onvif/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@
import datetime as dt
import logging
import os.path
from typing import Any
from collections.abc import Callable
import httpx
from httpx import AsyncClient, BasicAuth, DigestAuth
from typing import Any

import zeep.helpers
from zeep.cache import SqliteCache
from zeep.client import AsyncClient as BaseZeepAsyncClient
import zeep.helpers
from zeep.proxy import AsyncServiceProxy
from zeep.transports import AsyncTransport
from zeep.wsdl import Document
from zeep.wsse.username import UsernameToken

import aiohttp
import httpx
from aiohttp import BasicAuth, ClientSession, DigestAuthMiddleware, TCPConnector
from onvif.definition import SERVICES
from onvif.exceptions import ONVIFAuthError, ONVIFError, ONVIFTimeoutError
from requests import Response

from .const import KEEPALIVE_EXPIRY
from .managers import NotificationManager, PullPointManager
Expand All @@ -29,13 +31,14 @@
from .util import (
create_no_verify_ssl_context,
normalize_url,
obscure_user_pass_url,
path_isfile,
utcnow,
strip_user_pass_url,
obscure_user_pass_url,
utcnow,
)
from .wrappers import retry_connection_error # noqa: F401
from .wrappers import retry_connection_error
from .wsa import WsAddressingIfMissingPlugin
from .zeep_aiohttp import AIOHTTPTransport

logger = logging.getLogger("onvif")
logging.basicConfig(level=logging.INFO)
Expand All @@ -48,7 +51,7 @@
_CONNECT_TIMEOUT = 30
_READ_TIMEOUT = 90
_WRITE_TIMEOUT = 90
_HTTPX_LIMITS = httpx.Limits(keepalive_expiry=KEEPALIVE_EXPIRY)
# Keepalive is set on the connector, not in ClientTimeout
_NO_VERIFY_SSL_CONTEXT = create_no_verify_ssl_context()


Expand All @@ -59,7 +62,7 @@
try:
return func(*args, **kwargs)
except Exception as err:
raise ONVIFError(err)
raise ONVIFError(err) from err

Check warning on line 65 in onvif/client.py

View check run for this annotation

Codecov / codecov/patch

onvif/client.py#L65

Added line #L65 was not covered by tests

return wrapped

Expand Down Expand Up @@ -102,20 +105,28 @@
return original_load(self, *args, **kwargs)


class AsyncTransportProtocolErrorHandler(AsyncTransport):
"""Retry on remote protocol error.
class AsyncTransportProtocolErrorHandler(AIOHTTPTransport):
"""
Retry on remote protocol error.

http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server
# to close the connection at any time, we treat this as a normal and try again
# once since
"""

@retry_connection_error(attempts=2, exception=httpx.RemoteProtocolError)
async def post(self, address, message, headers):
@retry_connection_error(attempts=2, exception=aiohttp.ServerDisconnectedError)
async def post(
self, address: str, message: str, headers: dict[str, str]
) -> httpx.Response:
return await super().post(address, message, headers)

@retry_connection_error(attempts=2, exception=httpx.RemoteProtocolError)
async def get(self, address, params, headers):
@retry_connection_error(attempts=2, exception=aiohttp.ServerDisconnectedError)
async def get(
self,
address: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> Response:
return await super().get(address, params, headers)


Expand Down Expand Up @@ -162,17 +173,18 @@
self.set_ns_prefix("wsa", "http://www.w3.org/2005/08/addressing")

def create_service(self, binding_name, address):
"""Create a new ServiceProxy for the given binding name and address.
"""
Create a new ServiceProxy for the given binding name and address.
:param binding_name: The QName of the binding
:param address: The address of the endpoint
"""
try:
binding = self.wsdl.bindings[binding_name]
except KeyError:
raise ValueError(
"No binding found with the given QName. Available bindings "
"are: %s" % (", ".join(self.wsdl.bindings.keys()))
)
f"No binding found with the given QName. Available bindings "
f"are: {', '.join(self.wsdl.bindings.keys())}"
) from None
return AsyncServiceProxy(self, binding, address=address)


Expand Down Expand Up @@ -223,7 +235,7 @@
write_timeout: int | None = None,
) -> None:
if not path_isfile(url):
raise ONVIFError("%s doesn`t exist!" % url)
raise ONVIFError(f"{url} doesn`t exist!")

Check warning on line 238 in onvif/client.py

View check run for this annotation

Codecov / codecov/patch

onvif/client.py#L238

Added line #L238 was not covered by tests

self.url = url
self.xaddr = xaddr
Expand All @@ -236,26 +248,28 @@
self.dt_diff = dt_diff
self.binding_name = binding_name
# Create soap client
timeouts = httpx.Timeout(
_DEFAULT_TIMEOUT,
connect=_CONNECT_TIMEOUT,
read=read_timeout or _READ_TIMEOUT,
write=write_timeout or _WRITE_TIMEOUT,
)
client = AsyncClient(
verify=_NO_VERIFY_SSL_CONTEXT, timeout=timeouts, limits=_HTTPX_LIMITS
connector = TCPConnector(
ssl=_NO_VERIFY_SSL_CONTEXT,
keepalive_timeout=KEEPALIVE_EXPIRY,
)
# The wsdl client should never actually be used, but it is required
# to avoid creating another ssl context since the underlying code
# will try to create a new one if it doesn't exist.
wsdl_client = httpx.Client(
verify=_NO_VERIFY_SSL_CONTEXT, timeout=timeouts, limits=_HTTPX_LIMITS
session = ClientSession(
connector=connector,
timeout=aiohttp.ClientTimeout(
total=_DEFAULT_TIMEOUT,
connect=_CONNECT_TIMEOUT,
sock_read=read_timeout or _READ_TIMEOUT,
),
)
self.transport = (
AsyncTransportProtocolErrorHandler(client=client, wsdl_client=wsdl_client)
AsyncTransportProtocolErrorHandler(
session=session,
verify_ssl=False,
)
if no_cache
else AsyncTransportProtocolErrorHandler(
client=client, wsdl_client=wsdl_client, cache=SqliteCache()
else AIOHTTPTransport(
session=session,
verify_ssl=False,
cache=SqliteCache(),
)
)
self.document: Document | None = None
Expand Down Expand Up @@ -399,7 +413,8 @@
self.to_dict = ONVIFService.to_dict

self._snapshot_uris = {}
self._snapshot_client = AsyncClient(verify=_NO_VERIFY_SSL_CONTEXT)
self._snapshot_connector = TCPConnector(ssl=_NO_VERIFY_SSL_CONTEXT)
self._snapshot_client = ClientSession(connector=self._snapshot_connector)

async def get_capabilities(self) -> dict[str, Any]:
"""Get device capabilities."""
Expand Down Expand Up @@ -531,7 +546,8 @@

async def close(self) -> None:
"""Close all transports."""
await self._snapshot_client.aclose()
await self._snapshot_client.close()
await self._snapshot_connector.close()
for service in self.services.values():
await service.close()

Expand Down Expand Up @@ -572,42 +588,53 @@
if uri is None:
return None

auth = None
auth: BasicAuth | None = None
middlewares: tuple[DigestAuthMiddleware, ...] | None = None

if self.user and self.passwd:
if basic_auth:
auth = BasicAuth(self.user, self.passwd)
else:
auth = DigestAuth(self.user, self.passwd)
# Use DigestAuthMiddleware for digest auth
middlewares = (DigestAuthMiddleware(self.user, self.passwd),)

response = await self._try_snapshot_uri(uri, auth)
response = await self._try_snapshot_uri(uri, auth=auth, middlewares=middlewares)
content = await response.read()

# If the request fails with a 401, make sure to strip any
# sample user/pass from the URL and try again
# If the request fails with a 401, strip user/pass from URL and retry
if (
response.status_code == 401
response.status == 401
and (stripped_uri := strip_user_pass_url(uri))
and stripped_uri != uri
):
response = await self._try_snapshot_uri(stripped_uri, auth)
response = await self._try_snapshot_uri(
stripped_uri, auth=auth, middlewares=middlewares
)
content = await response.read()

if response.status_code == 401:
if response.status == 401:
raise ONVIFAuthError(f"Failed to authenticate to {uri}")

if response.status_code < 300:
return response.content
if response.status < 300:
return content

return None

async def _try_snapshot_uri(
self, uri: str, auth: BasicAuth | DigestAuth | None
) -> httpx.Response:
self,
uri: str,
auth: BasicAuth | None = None,
middlewares: tuple[DigestAuthMiddleware, ...] | None = None,
) -> aiohttp.ClientResponse:
try:
return await self._snapshot_client.get(uri, auth=auth)
except httpx.TimeoutException as error:
return await self._snapshot_client.get(
uri, auth=auth, middlewares=middlewares
)
except TimeoutError as error:
raise ONVIFTimeoutError(
f"Timed out fetching {obscure_user_pass_url(uri)}: {error}"
) from error
except httpx.RequestError as error:
except aiohttp.ClientError as error:
raise ONVIFError(
f"Error fetching {obscure_user_pass_url(uri)}: {error}"
) from error
Expand All @@ -618,7 +645,7 @@
"""Returns xaddr and wsdl of specified service"""
# Check if the service is supported
if name not in SERVICES:
raise ONVIFError("Unknown service %s" % name)
raise ONVIFError(f"Unknown service {name}")

Check warning on line 648 in onvif/client.py

View check run for this annotation

Codecov / codecov/patch

onvif/client.py#L648

Added line #L648 was not covered by tests
wsdl_file = SERVICES[name]["wsdl"]
namespace = SERVICES[name]["ns"]

Expand All @@ -629,14 +656,14 @@

wsdlpath = os.path.join(self.wsdl_dir, wsdl_file)
if not path_isfile(wsdlpath):
raise ONVIFError("No such file: %s" % wsdlpath)
raise ONVIFError(f"No such file: {wsdlpath}")

Check warning on line 659 in onvif/client.py

View check run for this annotation

Codecov / codecov/patch

onvif/client.py#L659

Added line #L659 was not covered by tests

# XAddr for devicemgmt is fixed:
if name == "devicemgmt":
xaddr = "{}:{}/onvif/device_service".format(
self.host
if (self.host.startswith("http://") or self.host.startswith("https://"))
else "http://%s" % self.host,
else f"http://{self.host}",
self.port,
)
return xaddr, wsdlpath, binding_name
Expand Down
22 changes: 12 additions & 10 deletions onvif/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@

from __future__ import annotations

from abc import abstractmethod
import asyncio
import datetime as dt
import logging
from typing import TYPE_CHECKING, Any
from abc import abstractmethod
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

import httpx
from httpx import TransportError
from zeep.exceptions import Fault, XMLParseError, XMLSyntaxError
from zeep.loader import parse_xml
from zeep.wsdl.bindings.soap import SoapOperation

import aiohttp
from onvif.exceptions import ONVIFError

from .settings import DEFAULT_SETTINGS
Expand All @@ -27,8 +26,8 @@

_RENEWAL_PERCENTAGE = 0.8

SUBSCRIPTION_ERRORS = (Fault, asyncio.TimeoutError, TransportError)
RENEW_ERRORS = (ONVIFError, httpx.RequestError, XMLParseError, *SUBSCRIPTION_ERRORS)
SUBSCRIPTION_ERRORS = (Fault, asyncio.TimeoutError, aiohttp.ClientError)
RENEW_ERRORS = (ONVIFError, aiohttp.ClientError, XMLParseError, *SUBSCRIPTION_ERRORS)
SUBSCRIPTION_RESTART_INTERVAL_ON_ERROR = dt.timedelta(seconds=40)

# If the camera returns a subscription with a termination time that is less than
Expand Down Expand Up @@ -87,7 +86,8 @@
await self._subscription.Unsubscribe()

async def shutdown(self) -> None:
"""Shutdown the manager.
"""
Shutdown the manager.

This method is irreversible.
"""
Expand All @@ -105,7 +105,7 @@
"""Set the synchronization point."""
try:
await self._service.SetSynchronizationPoint()
except (Fault, asyncio.TimeoutError, TransportError, TypeError):
except (TimeoutError, Fault, aiohttp.ClientError, TypeError):

Check warning on line 108 in onvif/managers.py

View check run for this annotation

Codecov / codecov/patch

onvif/managers.py#L108

Added line #L108 was not covered by tests
logger.debug("%s: SetSynchronizationPoint failed", self._service.url)

def _cancel_renewals(self) -> None:
Expand Down Expand Up @@ -214,7 +214,8 @@
super().__init__(device, interval, subscription_lost_callback)

async def _start(self) -> float:
"""Start the notification processor.
"""
Start the notification processor.

Returns the next renewal call at time.
"""
Expand Down Expand Up @@ -290,7 +291,8 @@
"""Manager for PullPoint."""

async def _start(self) -> float:
"""Start the PullPoint manager.
"""
Start the PullPoint manager.

Returns the next renewal call at time.
"""
Expand Down
Loading