Skip to content

Commit 0482d05

Browse files
authored
Refactor service creation to fix I/O in the event loop (#28)
1 parent c635f7b commit 0482d05

File tree

1 file changed

+82
-65
lines changed

1 file changed

+82
-65
lines changed

onvif/client.py

Lines changed: 82 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import logging
77
import os.path
88
import ssl
9-
from typing import Any, Dict, Optional, Tuple
9+
from typing import Any, Callable, Dict, Optional, Tuple
1010

1111
import httpx
1212
from httpx import AsyncClient, BasicAuth, DigestAuth, TransportError
@@ -33,6 +33,8 @@
3333
_DEFAULT_SETTINGS.strict = False
3434
_DEFAULT_SETTINGS.xml_huge_tree = True
3535

36+
_WSDL_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "wsdl")
37+
3638

3739
def create_no_verify_ssl_context() -> ssl.SSLContext:
3840
"""Return an SSL context that does not verify the server certificate.
@@ -176,24 +178,28 @@ class ONVIFService:
176178
def __init__(
177179
self,
178180
xaddr: str,
179-
user,
180-
passwd,
181-
url,
181+
user: Optional[str],
182+
passwd: Optional[str],
183+
url: str,
182184
encrypt=True,
183185
no_cache=False,
184186
dt_diff=None,
185187
binding_name="",
186188
binding_key="",
187-
):
189+
) -> None:
188190
if not _path_isfile(url):
189191
raise ONVIFError("%s doesn`t exist!" % url)
190192

191193
self.url = url
192194
self.xaddr = xaddr
193195
self.binding_key = binding_key
194-
wsse = UsernameDigestTokenDtDiff(
195-
user, passwd, dt_diff=dt_diff, use_digest=encrypt
196-
)
196+
# Set soap header for authentication
197+
self.user = user
198+
self.passwd = passwd
199+
# Indicate wether password digest is needed
200+
self.encrypt = encrypt
201+
self.dt_diff = dt_diff
202+
self.binding_name = binding_name
197203
# Create soap client
198204
client = AsyncClient(verify=_NO_VERIFY_SSL_CONTEXT, timeout=90)
199205
# The wsdl client should never actually be used, but it is required
@@ -207,9 +213,24 @@ def __init__(
207213
client=client, wsdl_client=wsdl_client, cache=SqliteCache()
208214
)
209215
)
216+
self.document: Optional[Document] = None
217+
self.zeep_client_authless: Optional[ZeepAsyncClient] = None
218+
self.ws_client_authless: Optional[AsyncServiceProxy] = None
219+
self.zeep_client: Optional[ZeepAsyncClient] = None
220+
self.ws_client: Optional[AsyncServiceProxy] = None
221+
self.create_type: Optional[Callable] = None
222+
self.loop = asyncio.get_event_loop()
223+
224+
async def setup(self):
225+
"""Setup the transport."""
210226
settings = _DEFAULT_SETTINGS
211-
self.document = _cached_document(url)
212-
self.binding_name = binding_name
227+
binding_name = self.binding_name
228+
wsse = UsernameDigestTokenDtDiff(
229+
self.user, self.passwd, dt_diff=self.dt_diff, use_digest=self.encrypt
230+
)
231+
self.document = await self.loop.run_in_executor(
232+
None, _cached_document, self.url
233+
)
213234
self.zeep_client_authless = ZeepAsyncClient(
214235
wsdl=self.document,
215236
transport=self.transport,
@@ -227,14 +248,6 @@ def __init__(
227248
plugins=[WsAddressingPlugin()],
228249
)
229250
self.ws_client = self.zeep_client.create_service(binding_name, self.xaddr)
230-
231-
# Set soap header for authentication
232-
self.user = user
233-
self.passwd = passwd
234-
# Indicate wether password digest is needed
235-
self.encrypt = encrypt
236-
self.dt_diff = dt_diff
237-
238251
namespace = binding_name[binding_name.find("{") + 1 : binding_name.find("}")]
239252
available_ns = self.zeep_client.namespaces
240253
active_ns = (
@@ -302,7 +315,7 @@ def __init__(self, device: "ONVIFCamera", config: Dict[str, Any]) -> None:
302315

303316
async def setup(self) -> ONVIFService:
304317
"""Setup the notification processor."""
305-
notify_service = self._device.create_notification_service()
318+
notify_service = await self._device.create_notification_service()
306319
notify_subscribe = await notify_service.Subscribe(self._config)
307320
# pylint: disable=protected-access
308321
self._device.xaddrs[
@@ -315,14 +328,14 @@ async def setup(self) -> ONVIFService:
315328
#
316329
# If this fails this is OK as it just means we will switch
317330
# to webhook later when the first notification is received.
318-
service = self._device.create_onvif_service(
331+
service = await self._device.create_onvif_service(
319332
"pullpoint", port_type="NotificationConsumer"
320333
)
321334
self._operation = service.document.bindings[service.binding_name].get(
322335
"PullMessages"
323336
)
324337
self._service = service
325-
return self._device.create_subscription_service("NotificationConsumer")
338+
return await self._device.create_subscription_service("NotificationConsumer")
326339

327340
async def start(self) -> None:
328341
"""Start the notification processor."""
@@ -372,15 +385,15 @@ class ONVIFCamera:
372385

373386
def __init__(
374387
self,
375-
host,
376-
port,
377-
user,
378-
passwd,
379-
wsdl_dir=os.path.join(os.path.dirname(os.path.dirname(__file__)), "wsdl"),
388+
host: str,
389+
port: int,
390+
user: Optional[str],
391+
passwd: Optional[str],
392+
wsdl_dir: str = _WSDL_PATH,
380393
encrypt=True,
381394
no_cache=False,
382395
adjust_time=False,
383-
):
396+
) -> None:
384397
os.environ.pop("http_proxy", None)
385398
os.environ.pop("https_proxy", None)
386399
self.host = host
@@ -402,7 +415,6 @@ def __init__(
402415

403416
self._snapshot_uris = {}
404417
self._snapshot_client = AsyncClient(verify=_NO_VERIFY_SSL_CONTEXT)
405-
self._background_tasks = set()
406418

407419
async def get_capabilities(self) -> Dict[str, Any]:
408420
"""Get device capabilities."""
@@ -413,7 +425,7 @@ async def get_capabilities(self) -> Dict[str, Any]:
413425
async def update_xaddrs(self):
414426
"""Update xaddrs for services."""
415427
self.dt_diff = None
416-
devicemgmt = self.create_devicemgmt_service()
428+
devicemgmt = await self.create_devicemgmt_service()
417429
if self.adjust_time:
418430
try:
419431
sys_date = await devicemgmt.authless_GetSystemDateAndTime()
@@ -432,7 +444,7 @@ async def update_xaddrs(self):
432444
self.dt_diff = cam_date - dt.datetime.utcnow()
433445
await devicemgmt.close()
434446
del self.services[devicemgmt.binding_key]
435-
devicemgmt = self.create_devicemgmt_service()
447+
devicemgmt = await self.create_devicemgmt_service()
436448

437449
# Get XAddr of services on the device
438450
self.xaddrs = {}
@@ -455,7 +467,7 @@ async def create_pullpoint_subscription(
455467
) -> bool:
456468
"""Create a pullpoint subscription."""
457469
try:
458-
events = self.create_events_service()
470+
events = await self.create_events_service()
459471
pullpoint = await events.CreatePullPointSubscription(config or {})
460472
# pylint: disable=protected-access
461473
self.xaddrs[
@@ -481,7 +493,7 @@ async def get_snapshot_uri(self, profile_token):
481493
"""Get the snapshot uri for a given profile."""
482494
uri = self._snapshot_uris.get(profile_token)
483495
if uri is None:
484-
media_service = self.create_media_service()
496+
media_service = await self.create_media_service()
485497
req = media_service.create_type("GetSnapshotUri")
486498
req.ProfileToken = profile_token
487499
result = await media_service.GetSnapshotUri(req)
@@ -517,7 +529,9 @@ async def get_snapshot(self, profile_token, basic_auth=False):
517529

518530
return None
519531

520-
def get_definition(self, name, port_type=None):
532+
def get_definition(
533+
self, name: str, port_type: Optional[str] = None
534+
) -> Tuple[str, str, str]:
521535
"""Returns xaddr and wsdl of specified service"""
522536
# Check if the service is supported
523537
if name not in SERVICES:
@@ -551,7 +565,9 @@ def get_definition(self, name, port_type=None):
551565

552566
return xaddr, wsdlpath, binding_name
553567

554-
def create_onvif_service(self, name, port_type=None):
568+
async def create_onvif_service(
569+
self, name: str, port_type: Optional[str] = None
570+
) -> ONVIFService:
555571
"""Create ONVIF service client"""
556572
name = name.lower()
557573
# Don't re-create bindings if the xaddr remains the same.
@@ -572,9 +588,7 @@ def create_onvif_service(self, name, port_type=None):
572588
)
573589
# Hold a reference to the task so it doesn't get
574590
# garbage collected before it completes.
575-
task = asyncio.create_task(existing_service.close())
576-
task.add_done_callback(self._background_tasks.remove)
577-
self._background_tasks.add(task)
591+
await existing_service.close()
578592
self.services.pop(binding_key)
579593

580594
logger.debug("Creating service %s with %s", binding_key, xaddr)
@@ -590,63 +604,66 @@ def create_onvif_service(self, name, port_type=None):
590604
binding_name=binding_name,
591605
binding_key=binding_key,
592606
)
607+
await service.setup()
593608

594609
self.services[binding_key] = service
595610

596611
return service
597612

598-
def create_devicemgmt_service(self):
613+
async def create_devicemgmt_service(self):
599614
"""Service creation helper."""
600-
return self.create_onvif_service("devicemgmt")
615+
return await self.create_onvif_service("devicemgmt")
601616

602-
def create_media_service(self):
617+
async def create_media_service(self):
603618
"""Service creation helper."""
604-
return self.create_onvif_service("media")
619+
return await self.create_onvif_service("media")
605620

606-
def create_ptz_service(self):
621+
async def create_ptz_service(self):
607622
"""Service creation helper."""
608-
return self.create_onvif_service("ptz")
623+
return await self.create_onvif_service("ptz")
609624

610-
def create_imaging_service(self):
625+
async def create_imaging_service(self):
611626
"""Service creation helper."""
612-
return self.create_onvif_service("imaging")
627+
return await self.create_onvif_service("imaging")
613628

614-
def create_deviceio_service(self):
629+
async def create_deviceio_service(self):
615630
"""Service creation helper."""
616-
return self.create_onvif_service("deviceio")
631+
return await self.create_onvif_service("deviceio")
617632

618-
def create_events_service(self):
633+
async def create_events_service(self):
619634
"""Service creation helper."""
620-
return self.create_onvif_service("events")
635+
return await self.create_onvif_service("events")
621636

622-
def create_analytics_service(self):
637+
async def create_analytics_service(self):
623638
"""Service creation helper."""
624-
return self.create_onvif_service("analytics")
639+
return await self.create_onvif_service("analytics")
625640

626-
def create_recording_service(self):
641+
async def create_recording_service(self):
627642
"""Service creation helper."""
628-
return self.create_onvif_service("recording")
643+
return await self.create_onvif_service("recording")
629644

630-
def create_search_service(self):
645+
async def create_search_service(self):
631646
"""Service creation helper."""
632-
return self.create_onvif_service("search")
647+
return await self.create_onvif_service("search")
633648

634-
def create_replay_service(self):
649+
async def create_replay_service(self):
635650
"""Service creation helper."""
636-
return self.create_onvif_service("replay")
651+
return await self.create_onvif_service("replay")
637652

638-
def create_pullpoint_service(self):
653+
async def create_pullpoint_service(self):
639654
"""Service creation helper."""
640-
return self.create_onvif_service("pullpoint", port_type="PullPointSubscription")
655+
return await self.create_onvif_service(
656+
"pullpoint", port_type="PullPointSubscription"
657+
)
641658

642-
def create_notification_service(self):
659+
async def create_notification_service(self):
643660
"""Service creation helper."""
644-
return self.create_onvif_service("notification")
661+
return await self.create_onvif_service("notification")
645662

646-
def create_subscription_service(self, port_type=None):
663+
async def create_subscription_service(self, port_type=None):
647664
"""Service creation helper."""
648-
return self.create_onvif_service("subscription", port_type=port_type)
665+
return await self.create_onvif_service("subscription", port_type=port_type)
649666

650-
def create_receiver_service(self):
667+
async def create_receiver_service(self):
651668
"""Service creation helper."""
652-
return self.create_onvif_service("receiver")
669+
return await self.create_onvif_service("receiver")

0 commit comments

Comments
 (0)