Skip to content

Commit bbafb6d

Browse files
authored
Saimon/support thread execution (#260)
1 parent 8bc3694 commit bbafb6d

File tree

10 files changed

+139
-120
lines changed

10 files changed

+139
-120
lines changed

cterasdk/clients/async_requests.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@ def session_parameters(client_settings):
1717
class Session:
1818
"""Asynchronous HTTP Session"""
1919

20-
def __init__(self, client_settings):
21-
self.initialize(client_settings)
20+
def __init__(self, settings):
21+
self._settings = settings
22+
self._session = None
2223

23-
def initialize(self, client_settings):
24-
self._session = aiohttp.ClientSession(trace_configs=[async_tracers.default()], **session_parameters(client_settings))
24+
@property
25+
def session(self):
26+
if self._session is None:
27+
self._session = aiohttp.ClientSession(trace_configs=[async_tracers.default()], **session_parameters(self._settings))
28+
return self._session
2529

2630
@property
2731
def cookies(self):
@@ -41,7 +45,7 @@ async def await_promise(self, r, *, on_response=None):
4145

4246
async def _request(self, r, *, on_response=None):
4347
try:
44-
response = await self._session.request(r.method, r.url, **r.kwargs)
48+
response = await self.session.request(r.method, r.url, **r.kwargs)
4549
return asyncio.create_task(on_response(response))
4650
except aiohttp.ClientSSLError as error:
4751
logging.getLogger('cterasdk.http').warning(error)
@@ -61,10 +65,11 @@ async def _request(self, r, *, on_response=None):
6165

6266
@property
6367
def closed(self):
64-
return self._session.closed
68+
return self._session.closed if self._session is not None else True
6569

6670
async def close(self):
67-
await self._session.close()
71+
if self._session is not None:
72+
await self._session.close()
6873

6974

7075
class CookieJar:

cterasdk/clients/asynchronous/clients.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ async def delete(self, path, *, on_response=None, **kwargs):
3535

3636
async def _request(self, request, *, on_response=None):
3737
on_response = on_response if on_response else AsyncResponse.new()
38-
response = await self._async_session.await_promise(self.join_headers(request), on_response=on_response)
38+
response = await self._session.await_promise(self.join_headers(request), on_response=on_response)
3939
return await errors.accept(response)
4040

4141

@@ -45,8 +45,8 @@ class AsyncWebDAV(AsyncClient):
4545

4646
class AsyncJSON(AsyncClient):
4747

48-
def __init__(self, builder=None, async_session=None, authenticator=None, client_settings=None):
49-
super().__init__(builder, async_session, authenticator, client_settings)
48+
def __init__(self, builder=None, session=None, settings=None, authenticator=None):
49+
super().__init__(builder, session, settings, authenticator)
5050
self.headers.update_headers({'Content-Type': 'application/json'})
5151

5252
async def get(self, path, **kwargs):

cterasdk/clients/base.py

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import logging
21
from . import async_requests
32
from ..common import utils
43

@@ -32,23 +31,38 @@ def update_headers(self, headers):
3231
class BaseClient:
3332
"""Base Client"""
3433

35-
def __init__(self, builder=None, async_session=None, authenticator=None, client_settings=None):
34+
def __init__(self, builder=None, session=None, settings=None, authenticator=None):
3635
"""
3736
Initialize a Client
3837
3938
:param builder: Endpoint builder.
40-
:param ,optional async_session: Re-use an asynchronous session.
39+
:param ,optional session: Session.
40+
:param ,optional settings: Client Session Settings.
4141
:param ,optional authenticator: Authenticator function.
4242
"""
4343
self._headers = PersistentHeaders()
4444
self._authenticator = authenticator
4545
self._builder = builder
46-
self._client_settings = client_settings
47-
self._async_session = async_session if async_session else async_requests.Session(client_settings)
46+
self._session = session if session else async_requests.Session(settings)
47+
48+
def clone(self, definition, builder=None, authenticator=None):
49+
"""
50+
Clone a Client
51+
52+
:param class definition: Class definition.
53+
:param ,optional builder: Endpoint builder.
54+
:param ,optional authenticator: Authenticator function.
55+
"""
56+
return definition(
57+
builder if builder is not None else self._builder,
58+
self._session,
59+
None,
60+
authenticator if authenticator is not None else self._authenticator,
61+
)
4862

4963
@property
5064
def cookies(self):
51-
return CookieJar(self._async_session.cookies)
65+
return CookieJar(self._session.cookies)
5266

5367
@property
5468
def headers(self):
@@ -62,24 +76,17 @@ def join_headers(self, request):
6276
def baseurl(self):
6377
return self._builder()
6478

65-
def __str__(self):
66-
return f"({self.__class__.__name__} client at {hex(hash(self))}, baseurl={self.baseurl})"
67-
68-
def _before_request(self):
69-
if self._async_session.closed:
70-
logging.getLogger('cterasdk.http').debug('Session Closed. Renewing.')
71-
self._async_session.initialize(self._client_settings)
72-
7379
def request(self, request, *, on_response=None):
74-
self._before_request()
7580
return self._request(request, on_response=on_response)
7681

7782
async def async_request(self, request, *, on_response=None):
78-
self._before_request()
7983
return await self._request(request, on_response=on_response)
8084

8185
async def close(self):
82-
await self._async_session.close()
86+
await self._session.close()
87+
88+
def __str__(self):
89+
return f"({self.__class__.__name__} client at {hex(hash(self))}, baseurl={self.baseurl})"
8390

8491

8592
class BaseResponse:

cterasdk/clients/synchronous/clients.py

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11

2-
import time
32
import asyncio
4-
import logging
3+
import threading
54
from . import errors
65
from ..base import BaseClient
76
from ..common import Serializers
@@ -50,10 +49,11 @@ def delete(self, path, *, on_response=None, **kwargs):
5049

5150
def _request(self, request, *, on_response=None):
5251
on_response = on_response if on_response else SyncResponse.new()
53-
return execute_request(self._async_session, self.join_headers(request), on_response=on_response)
52+
response = execute(self._session.await_promise, self.join_headers(request), on_response=on_response)
53+
return errors.accept(response)
5454

5555
def close(self): # pylint: disable=invalid-overridden-method
56-
return asyncio.get_event_loop().run_until_complete(super().close())
56+
return execute(super().close)
5757

5858

5959
class Folders(Client):
@@ -190,52 +190,73 @@ def login(self):
190190
return response.json()
191191

192192

193-
def execute_request(async_session, request, *, on_response, max_retries=3, backoff_factor=2):
194-
retries = 0
195-
while retries < max_retries:
196-
try:
197-
response = asyncio.get_event_loop().run_until_complete(async_session.await_promise(request, on_response=on_response))
198-
return errors.accept(response)
199-
except (ConnectionError, TimeoutError):
200-
retries += 1
201-
if retries < max_retries:
202-
delay = backoff_factor ** retries
203-
logging.getLogger('cterasdk.http').warning("Retrying in %s seconds.", delay)
204-
time.sleep(delay)
205-
else:
206-
logging.getLogger('cterasdk.http').error("Max retries reached. Request failed.")
207-
raise
208-
return None
193+
event_loop = asyncio.new_event_loop()
194+
195+
196+
def execute(target, *args, **kwargs):
197+
loop = asyncio.get_event_loop()
198+
199+
if not loop.is_running():
200+
return loop.run_until_complete(target(*args, **kwargs))
201+
202+
return run_threadsafe(event_loop, target, *args, **kwargs)
209203

210204

211205
class SyncResponse(AsyncResponse):
212206
"""Synchronous Response Object"""
213207

214-
def __init__(self, response):
215-
super().__init__(response)
216-
self._executor = asyncio.get_event_loop()
217-
218208
def iter_content(self, chunk_size=None):
219209
while True:
220210
try:
221-
yield self._executor.run_until_complete(super().async_iter_content(chunk_size).__anext__())
211+
yield execute(super().async_iter_content(chunk_size).__anext__)
222212
except StopAsyncIteration:
223213
break
224214

225215
def text(self): # pylint: disable=invalid-overridden-method
226-
return self._consume_response(super().text)
216+
return execute(super().text)
227217

228218
def json(self): # pylint: disable=invalid-overridden-method
229-
return self._consume_response(super().json)
219+
return execute(super().json)
230220

231221
def xml(self): # pylint: disable=invalid-overridden-method
232-
return self._consume_response(super().xml)
233-
234-
def _consume_response(self, consumer):
235-
return self._executor.run_until_complete(consumer())
222+
return execute(super().xml)
236223

237224
@staticmethod
238225
def new():
239226
async def new_response(response):
240227
return SyncResponse(response)
241228
return new_response
229+
230+
231+
def run_threadsafe(loop, target, *args, **kwargs):
232+
event = threading.Event()
233+
234+
t = Task(loop, event, target, *args, **kwargs)
235+
t.start()
236+
237+
event.wait()
238+
239+
if t.exception:
240+
raise t.exception
241+
return t.response
242+
243+
244+
class Task(threading.Thread):
245+
246+
def __init__(self, loop, event, target, *args, **kwargs):
247+
super().__init__(name='Thread-safe Executor')
248+
self.loop = loop
249+
self.event = event
250+
self.target = target
251+
self.args = args
252+
self.kwargs = kwargs
253+
self.exception = None
254+
self.response = None
255+
256+
def run(self):
257+
try:
258+
self.response = self.loop.run_until_complete(self.target(*self.args, **self.kwargs))
259+
except Exception as e: # pylint: disable=broad-exception-caught
260+
self.exception = e
261+
finally:
262+
self.event.set()

cterasdk/direct/client.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@ def __init__(self, baseurl, credentials):
3333
:param str baseurl: Portal URL
3434
:param cterasdk.objects.asynchronous.directio.Credentials credentials: Credentials
3535
"""
36-
ctera_direct = cterasdk.settings.sessions.ctera_direct
37-
self._api = AsyncJSON(EndpointBuilder.new(baseurl, '/directio'), authenticator=lambda *_: True,
38-
client_settings=client_settings(ctera_direct.api))
39-
self._client = AsyncClient(DefaultBuilder(), authenticator=lambda *_: True, client_settings=client_settings(ctera_direct.storage))
36+
self._api = AsyncJSON(EndpointBuilder.new(baseurl, '/directio'),
37+
settings=client_settings(cterasdk.settings.sessions.ctera_direct.api),
38+
authenticator=lambda *_: True)
39+
self._client = AsyncClient(DefaultBuilder(),
40+
settings=client_settings(cterasdk.settings.sessions.ctera_direct.storage),
41+
authenticator=lambda *_: True)
4042
self._credentials = credentials
4143

4244
async def _direct(self, file_id):

cterasdk/objects/asynchronous/core.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,13 @@ def __init__(self, core):
2424
class V1:
2525

2626
def __init__(self, core):
27-
session = core._generic._async_session
28-
self.api = clients.AsyncAPI(EndpointBuilder.new(core.base, core.context, '/api'), session, core._authenticator,
29-
core._generic._client_settings)
27+
self.api = core.default.clone(clients.AsyncAPI, EndpointBuilder.new(core.base, core.context, '/api'))
3028

3129

3230
class V2:
3331

3432
def __init__(self, core):
35-
session = core._generic._async_session
36-
self.api = clients.AsyncJSON(EndpointBuilder.new(core.base, core.context, '/v2/api'), session,
37-
core._authenticator, core._generic._client_settings)
33+
self.api = core.default.clone(clients.AsyncJSON, EndpointBuilder.new(core.base, core.context, '/v2/api'))
3834

3935

4036
class IO:
@@ -46,9 +42,7 @@ def __init__(self, core):
4642
class WebDAV:
4743

4844
def __init__(self, core):
49-
session = core._generic._async_session
50-
self._webdav = clients.AsyncWebDAV(EndpointBuilder.new(core.base, core.context, '/webdav'), session, core._authenticator,
51-
core._generic._client_settings)
45+
self._webdav = core.default.clone(clients.AsyncWebDAV, EndpointBuilder.new(core.base, core.context, '/webdav'))
5246

5347
@property
5448
def download(self):
@@ -62,15 +56,20 @@ async def __aenter__(self):
6256

6357
def __init__(self, host, port=None, https=True):
6458
super().__init__(host, port, https, base=None)
65-
self._generic = clients.AsyncClient(EndpointBuilder.new(self.base), authenticator=self._authenticator,
66-
client_settings=client_settings(cterasdk.settings.sessions.metadata_connector))
59+
self._default = clients.AsyncClient(EndpointBuilder.new(self.base),
60+
settings=client_settings(cterasdk.settings.sessions.metadata_connector),
61+
authenticator=self._authenticator)
6762
self._ctera_session = Session(self.host(), self.context)
6863
self._ctera_clients = Clients(self)
6964

7065
self.cloudfs = cloudfs.CloudFS(self)
7166
self.notifications = notifications.Notifications(self)
7267
self.users = users.Users(self)
7368

69+
@property
70+
def default(self):
71+
return self._default
72+
7473
@property
7574
def v1(self):
7675
return self.clients.v1
@@ -93,7 +92,7 @@ async def logout(self):
9392
if self._ctera_session.connected:
9493
await self._login_object.logout()
9594
self._ctera_session.stop_session()
96-
await self._generic.close()
95+
await self.default.close()
9796

9897
@property
9998
def _login_object(self):
@@ -103,7 +102,7 @@ def _authenticator(self, url):
103102
return authenticators.core(self.session(), url, self.context)
104103

105104
async def __aexit__(self, exc_type, exc, tb):
106-
await self._generic.close()
105+
await self.default.close()
107106

108107

109108
class AsyncGlobalAdmin(AsyncPortal):

cterasdk/objects/services.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,9 @@ def __enter__(self):
9191

9292
def __init__(self, host, port, https, base):
9393
super().__init__(host, port, https, base)
94-
self._generic = clients.Client(endpoints.EndpointBuilder.new(self.base), authenticator=self._authenticator,
95-
client_settings=client_settings(cterasdk.settings.sessions.management))
94+
self._default = clients.Client(endpoints.EndpointBuilder.new(self.base),
95+
settings=client_settings(cterasdk.settings.sessions.management),
96+
authenticator=self._authenticator)
9697

9798
def login(self, username, password):
9899
"""
@@ -111,7 +112,7 @@ def logout(self):
111112
if self._ctera_session.connected:
112113
self._login_object.logout()
113114
self._ctera_session.stop_session()
114-
self._generic.close()
115+
self._default.close()
115116

116117
@abstractmethod
117118
def test(self):
@@ -122,17 +123,21 @@ def test(self):
122123
def _session_id_key(self):
123124
return NotImplementedError("Subclass must implement the '_session_id_key' property")
124125

126+
@property
127+
def default(self):
128+
return self._default
129+
125130
def get_session_id(self):
126131
"""
127132
Get Session Identifier
128133
129134
:return str: Session ID
130135
"""
131-
return self._generic.cookies.get(self._session_id_key)
136+
return self._default.cookies.get(self._session_id_key)
132137

133138
def set_session_id(self, session_id):
134-
self._generic.cookies.update({self._session_id_key: session_id}, self._generic.baseurl)
139+
self._default.cookies.update({self._session_id_key: session_id}, self._default.baseurl)
135140
self._ctera_session.start_session(self)
136141

137142
def __exit__(self, exc_type, exc_value, exc_tb):
138-
self._generic.close()
143+
self._default.close()

0 commit comments

Comments
 (0)