6
6
import logging
7
7
import os .path
8
8
import ssl
9
- from typing import IO , Any , Dict , Optional , Tuple , Union
9
+ from typing import Any , Dict , Optional , Tuple
10
10
11
11
import httpx
12
12
from httpx import AsyncClient , BasicAuth , DigestAuth
13
13
from zeep .cache import SqliteCache
14
14
from zeep .client import AsyncClient as BaseZeepAsyncClient , Settings
15
15
from zeep .exceptions import Fault
16
16
import zeep .helpers
17
- from zeep .loader import parse_xml
18
17
from zeep .proxy import AsyncServiceProxy
19
- from zeep .transports import AsyncTransport
18
+ from zeep .transports import AsyncTransport , Transport
20
19
from zeep .wsa import WsAddressingPlugin
21
20
from zeep .wsdl import Document
22
21
from zeep .wsse .username import UsernameToken
@@ -91,39 +90,30 @@ def apply(self, envelope, headers):
91
90
return result
92
91
93
92
94
- class AsyncSafeTransport :
95
- """A transport that blocks all I/O for zeep."""
93
+ class AsyncSafeTransport ( Transport ) :
94
+ """A transport that blocks all remote I/O for zeep."""
96
95
97
- def load (self , * args : Any , ** kwargs : Any ) -> None :
98
- """Load the given XML document.
99
-
100
- This should never be called, but we want to raise
101
- an error if it is so we know we're doing something wrong
102
- and do not accidentally block the event loop.
103
- """
104
- raise RuntimeError ("Loading is not supported in async mode" )
96
+ def load (self , url : str ) -> None :
97
+ """Load the given XML document."""
98
+ if not _path_isfile (url ):
99
+ raise RuntimeError (f"Loading { url } is not supported in async mode" )
100
+ # Ideally this would happen in the executor but the library
101
+ # does not call this from a coroutine so the best we can do
102
+ # without a major refactor is to cache this so it only happens
103
+ # once per process at startup. Previously it would happen once
104
+ # per service per camera per setup which is a lot of blocking
105
+ # I/O in the event loop so this is a major improvement.
106
+ with open (os .path .expanduser (url ), "rb" ) as fh :
107
+ return fh .read ()
105
108
106
109
107
110
_ASYNC_TRANSPORT = AsyncSafeTransport ()
108
111
109
112
110
113
@lru_cache (maxsize = 128 )
111
- def _cached_parse_xml ( path : str ) -> Any :
114
+ def _cached_document ( url : str ) -> Document :
112
115
"""Load external XML document from disk."""
113
- with open (os .path .expanduser (path ), "rb" ) as fh :
114
- return parse_xml (fh .read (), _ASYNC_TRANSPORT , settings = _DEFAULT_SETTINGS )
115
-
116
-
117
- class DocumentWithCache (Document ):
118
- """A WSDL document that supports caching."""
119
-
120
- def _get_xml_document (self , url : Union [IO , str ]) -> Any :
121
- """Load external XML document from a file-like object or URL."""
122
- if _path_isfile (url ):
123
- return _cached_parse_xml (url )
124
- raise RuntimeError (
125
- f"Cannot fetch { url } in async mode because it would block the event loop"
126
- )
116
+ return Document (url , _ASYNC_TRANSPORT , settings = _DEFAULT_SETTINGS )
127
117
128
118
129
119
class ZeepAsyncClient (BaseZeepAsyncClient ):
@@ -216,8 +206,9 @@ def __init__(
216
206
)
217
207
)
218
208
settings = _DEFAULT_SETTINGS
209
+ document = _cached_document (url )
219
210
self .zeep_client_authless = ZeepAsyncClient (
220
- wsdl = DocumentWithCache ( url , self . transport , settings = settings ) ,
211
+ wsdl = document ,
221
212
transport = self .transport ,
222
213
settings = settings ,
223
214
plugins = [WsAddressingPlugin ()],
@@ -226,7 +217,7 @@ def __init__(
226
217
binding_name , self .xaddr
227
218
)
228
219
self .zeep_client = ZeepAsyncClient (
229
- wsdl = DocumentWithCache ( url , self . transport , settings = settings ) ,
220
+ wsdl = document ,
230
221
wsse = wsse ,
231
222
transport = self .transport ,
232
223
settings = settings ,
0 commit comments