Skip to content

Commit 626b4c9

Browse files
authored
Add support for webhooks/notifications (#25)
1 parent 282bce7 commit 626b4c9

File tree

1 file changed

+74
-6
lines changed

1 file changed

+74
-6
lines changed

onvif/client.py

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,17 @@
99
from typing import Any, Dict, Optional, Tuple
1010

1111
import httpx
12-
from httpx import AsyncClient, BasicAuth, DigestAuth
12+
from httpx import AsyncClient, BasicAuth, DigestAuth, TransportError
1313
from zeep.cache import SqliteCache
1414
from zeep.client import AsyncClient as BaseZeepAsyncClient, Settings
15-
from zeep.exceptions import Fault
15+
from zeep.exceptions import Fault, XMLSyntaxError
1616
import zeep.helpers
17+
from zeep.loader import parse_xml
1718
from zeep.proxy import AsyncServiceProxy
1819
from zeep.transports import AsyncTransport, Transport
1920
from zeep.wsa import WsAddressingPlugin
2021
from zeep.wsdl import Document
22+
from zeep.wsdl.bindings.soap import SoapOperation
2123
from zeep.wsse.username import UsernameToken
2224

2325
from onvif.definition import SERVICES
@@ -206,9 +208,10 @@ def __init__(
206208
)
207209
)
208210
settings = _DEFAULT_SETTINGS
209-
document = _cached_document(url)
211+
self.document = _cached_document(url)
212+
self.binding_name = binding_name
210213
self.zeep_client_authless = ZeepAsyncClient(
211-
wsdl=document,
214+
wsdl=self.document,
212215
transport=self.transport,
213216
settings=settings,
214217
plugins=[WsAddressingPlugin()],
@@ -217,7 +220,7 @@ def __init__(
217220
binding_name, self.xaddr
218221
)
219222
self.zeep_client = ZeepAsyncClient(
220-
wsdl=document,
223+
wsdl=self.document,
221224
wsse=wsse,
222225
transport=self.transport,
223226
settings=settings,
@@ -287,9 +290,68 @@ def call(params=None):
287290
return service_wrapper(getattr(self.ws_client, name))
288291

289292

293+
class NotificationManager:
294+
"""Manager to process notifications."""
295+
296+
def __init__(self, device: "ONVIFCamera", config: Dict[str, Any]) -> None:
297+
"""Initialize the notification processor."""
298+
self._service: Optional[ONVIFService] = None
299+
self._operation: Optional[SoapOperation] = None
300+
self._device = device
301+
self._config = config
302+
303+
async def setup(self) -> ONVIFService:
304+
"""Setup the notification processor."""
305+
notify_service = self._device.create_notification_service()
306+
notify_subscribe = await notify_service.Subscribe(self._config)
307+
# pylint: disable=protected-access
308+
self._device.xaddrs[
309+
"http://www.onvif.org/ver10/events/wsdl/NotificationConsumer"
310+
] = notify_subscribe.SubscriptionReference.Address._value_1
311+
# Create subscription manager
312+
# 5.2.3 BASIC NOTIFICATION INTERFACE - NOTIFY
313+
# Call SetSynchronizationPoint to generate a notification message
314+
# to ensure the webhooks are working.
315+
#
316+
# If this fails this is OK as it just means we will switch
317+
# to webhook later when the first notification is received.
318+
service = self._device.create_onvif_service(
319+
"pullpoint", port_type="NotificationConsumer"
320+
)
321+
self._operation = service.document.bindings[service.binding_name].get(
322+
"PullMessages"
323+
)
324+
self._service = service
325+
return self._device.create_subscription_service("NotificationConsumer")
326+
327+
async def start(self) -> None:
328+
"""Start the notification processor."""
329+
assert self._service, "Call setup first"
330+
try:
331+
await self._service.SetSynchronizationPoint()
332+
except (Fault, asyncio.TimeoutError, TransportError, TypeError):
333+
logger.debug("%s: SetSynchronizationPoint failed", self._service.url)
334+
335+
def process(self, content: bytes) -> Optional[Any]:
336+
"""Process a notification message."""
337+
if not self._operation:
338+
logger.debug("%s: Notifications not setup", self._device.host)
339+
return
340+
try:
341+
envelope = parse_xml(
342+
content, # type: ignore[arg-type]
343+
_ASYNC_TRANSPORT,
344+
settings=_DEFAULT_SETTINGS,
345+
)
346+
except XMLSyntaxError as exc:
347+
logger.error("Received invalid XML: %s", exc)
348+
return None
349+
return self._operation.process_reply(envelope)
350+
351+
290352
class ONVIFCamera:
291353
"""
292-
Python Implemention ONVIF compliant device
354+
Python Implementation ONVIF compliant device
293355
This class integrates onvif services
294356
295357
adjust_time parameter allows authentication on cameras without being time synchronized.
@@ -392,6 +454,12 @@ async def create_pullpoint_subscription(
392454
return False
393455
return True
394456

457+
def create_notification_manager(
458+
self, config: Optional[Dict[str, Any]] = None
459+
) -> NotificationManager:
460+
"""Create a notification manager."""
461+
return NotificationManager(self, config)
462+
395463
async def close(self):
396464
"""Close all transports."""
397465
await self._snapshot_client.aclose()

0 commit comments

Comments
 (0)