Skip to content

Commit 6e0423e

Browse files
saimonationSaimon Michelson
andauthored
Saimon/direct io streamer max workers (#318)
* add max workers, update change-log, update session expires error message to log in again * list root directory if no path was provided * add tls error and ancestors error exceptions * Catch session expired and log when trying to log out on an expired session Catch Forbidden and throw AuthenticationError for asynchronous login sessions to Portal * introduce handling for conflicts when copying and moving files. allow to resolve conflicts by overriding, renaming or skipping files * add ancestors exception and resolve unit test errors * update file browser and changelog * pass flake8 * resolve pylint errors * pass final lint * update docs to pass lint * fix lint * resolve ut errors and update changelog version * ensure destination is writeable target, update to pass exceptions * resolve regression in edge filer browser exceptions * update to pass lint and flake8 --------- Co-authored-by: Saimon Michelson <saimon@localhost.localdomain>
1 parent 298d456 commit 6e0423e

File tree

30 files changed

+674
-348
lines changed

30 files changed

+674
-348
lines changed

cterasdk/asynchronous/core/files/browser.py

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
from ....cio.core import CorePath
1+
from ....cio.core import CorePath, a_await_or_future
22
from ....lib.storage import asynfs, commonfs
3+
from ....exceptions.io import FileConflict
34
from ..base_command import BaseCommand
45
from . import io
56

@@ -61,11 +62,11 @@ async def download_many(self, target, objects, destination=None):
6162
handle = await self.handle_many(target, *objects)
6263
return await asynfs.write(directory, name, handle)
6364

64-
async def listdir(self, path, depth=None, include_deleted=False):
65+
async def listdir(self, path=None, depth=None, include_deleted=False):
6566
"""
6667
List Directory
6768
68-
:param str path: Path
69+
:param str,optional path: Path, defaults to the Cloud Drive root
6970
:param bool,optional include_deleted: Include deleted files, defaults to False
7071
"""
7172
return await io.listdir(self._core, self.normalize(path), depth=depth, include_deleted=include_deleted)
@@ -105,18 +106,34 @@ async def public_link(self, path, access='RO', expire_in=30):
105106
"""
106107
return await io.public_link(self._core, self.normalize(path), access, expire_in)
107108

108-
async def copy(self, *paths, destination=None, wait=False):
109+
async def _try_with_resolver(self, func, *paths, destination=None, resolver=None, cursor=None, wait=False):
110+
async def wrapper(resume_from=None):
111+
ref = await func(self._core, *paths, destination=destination, resolver=resolver, cursor=resume_from)
112+
return await a_await_or_future(self._core, ref, wait)
113+
114+
try:
115+
return await wrapper(cursor)
116+
except FileConflict as e:
117+
if resolver:
118+
return await wrapper(e.cursor)
119+
raise
120+
121+
async def copy(self, *paths, destination=None, resolver=None, cursor=None, wait=False):
109122
"""
110123
Copy one or more files or folders
111124
112125
:param list[str] paths: List of paths
113126
:param str destination: Destination
127+
:param cterasdk.core.types.ConflictResolver resolver: Conflict resolver, defaults to ``None``
128+
:param cterasdk.common.object.Object cursor: Resume copy from cursor
114129
:param bool,optional wait: ``True`` Wait for task to complete, or ``False`` to return an awaitable task object.
115130
:returns: Task status object, or an awaitable task object
116131
:rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask`
117132
"""
118133
try:
119-
return await io.copy(self._core, *[self.normalize(path) for path in paths], destination=self.normalize(destination), wait=wait)
134+
return await self._try_with_resolver(io.copy, *[self.normalize(path) for path in paths],
135+
destination=self.normalize(destination),
136+
resolver=resolver, cursor=cursor, wait=wait)
120137
except ValueError:
121138
raise ValueError('Copy destination was not specified.')
122139

@@ -188,7 +205,8 @@ async def rename(self, path, name, *, wait=False):
188205
:returns: Task status object, or an awaitable task object
189206
:rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask`
190207
"""
191-
return await io.rename(self._core, self.normalize(path), name, wait=wait)
208+
ref = await io.rename(self._core, self.normalize(path), name)
209+
return await a_await_or_future(self._core, ref, wait)
192210

193211
async def delete(self, *paths, wait=False):
194212
"""
@@ -199,7 +217,8 @@ async def delete(self, *paths, wait=False):
199217
:returns: Task status object, or an awaitable task object
200218
:rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask`
201219
"""
202-
return await io.remove(self._core, *[self.normalize(path) for path in paths], wait=wait)
220+
ref = await io.remove(self._core, *[self.normalize(path) for path in paths])
221+
return await a_await_or_future(self._core, ref, wait)
203222

204223
async def undelete(self, *paths, wait=False):
205224
"""
@@ -210,19 +229,24 @@ async def undelete(self, *paths, wait=False):
210229
:returns: Task status object, or an awaitable task object
211230
:rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask`
212231
"""
213-
return await io.recover(self._core, *[self.normalize(path) for path in paths], wait=wait)
232+
ref = await io.recover(self._core, *[self.normalize(path) for path in paths])
233+
return await a_await_or_future(self._core, ref, wait)
214234

215-
async def move(self, *paths, destination=None, wait=False):
235+
async def move(self, *paths, destination=None, resolver=None, cursor=None, wait=False):
216236
"""
217237
Move one or more files or folders
218238
219239
:param list[str] paths: List of paths
220240
:param str destination: Destination
241+
:param cterasdk.core.types.ConflictResolver resolver: Conflict resolver, defaults to ``None``
242+
:param cterasdk.common.object.Object cursor: Resume copy from cursor
221243
:param bool,optional wait: ``True`` Wait for task to complete, or ``False`` to return an awaitable task object.
222244
:returns: Task status object, or an awaitable task object
223245
:rtype: cterasdk.common.object.Object or :class:`cterasdk.lib.tasks.AwaitablePortalTask`
224246
"""
225247
try:
226-
return await io.move(self._core, *[self.normalize(path) for path in paths], destination=self.normalize(destination), wait=wait)
248+
return await self._try_with_resolver(io.move, *[self.normalize(path) for path in paths],
249+
destination=self.normalize(destination),
250+
resolver=resolver, cursor=cursor, wait=wait)
227251
except ValueError:
228252
raise ValueError('Move destination was not specified.')

cterasdk/asynchronous/core/files/io.py

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
2-
from ....cio.common import encode_request_parameter, a_await_or_future
2+
from ....cio.common import encode_request_parameter
33
from ....cio import core as fs
4-
from ....exceptions.io import ResourceNotFoundError, NotADirectory, ResourceExistsError
4+
from ....exceptions.io import ResourceNotFoundError, ResourceExistsError
55
from .. import query
66

77

@@ -68,40 +68,34 @@ async def makedirs(core, path):
6868
logger.debug('Resource already exists: %s', path.reference.as_posix())
6969

7070

71-
async def rename(core, path, name, *, wait=False):
71+
async def rename(core, path, name):
7272
with fs.rename(path, name) as param:
73-
ref = await core.v1.api.execute('', 'moveResources', param)
74-
return await a_await_or_future(core, ref, wait)
73+
return await core.v1.api.execute('', 'moveResources', param)
7574

7675

77-
async def remove(core, *paths, wait=False):
76+
async def remove(core, *paths):
7877
with fs.delete(*paths) as param:
79-
ref = await core.v1.api.execute('', 'deleteResources', param)
80-
return await a_await_or_future(core, ref, wait)
78+
return await core.v1.api.execute('', 'deleteResources', param)
8179

8280

83-
async def recover(core, *paths, wait=False):
81+
async def recover(core, *paths):
8482
with fs.recover(*paths) as param:
85-
ref = await core.v1.api.execute('', 'restoreResources', param)
86-
return await a_await_or_future(core, ref, wait)
83+
return await core.v1.api.execute('', 'restoreResources', param)
8784

8885

89-
async def copy(core, *paths, destination=None, wait=False):
90-
with fs.copy(*paths, destination=destination) as param:
91-
ref = await core.v1.api.execute('', 'copyResources', param)
92-
return await a_await_or_future(core, ref, wait)
86+
async def copy(core, *paths, destination=None, resolver=None, cursor=None):
87+
with fs.copy(*paths, destination=destination, resolver=resolver, cursor=cursor) as param:
88+
return await core.v1.api.execute('', 'copyResources', param)
9389

9490

95-
async def move(core, *paths, destination=None, wait=False):
96-
with fs.move(*paths, destination=destination) as param:
97-
ref = await core.v1.api.execute('', 'moveResources', param)
98-
return await a_await_or_future(core, ref, wait)
91+
async def move(core, *paths, destination=None, resolver=None, cursor=None):
92+
with fs.move(*paths, destination=destination, resolver=resolver, cursor=cursor) as param:
93+
return await core.v1.api.execute('', 'moveResources', param)
9994

10095

10196
async def ensure_directory(core, directory, suppress_error=False):
10297
present, resource = await metadata(core, directory, suppress_error=True)
103-
if (not present or not resource.isFolder) and not suppress_error:
104-
raise NotADirectory(directory.absolute)
98+
fs.ensure_directory(present, resource, directory, suppress_error)
10599
return resource.isFolder if present else False, resource
106100

107101

@@ -151,7 +145,9 @@ async def _validate_destination(core, name, destination):
151145
is_dir, resource = await ensure_directory(core, destination, suppress_error=True)
152146
if not is_dir:
153147
is_dir, resource = await ensure_directory(core, destination.parent)
148+
fs.ensure_writeable(resource, destination.parent)
154149
return resource.cloudFolderInfo.uid, destination.name, destination.parent
150+
fs.ensure_writeable(resource, destination)
155151
return resource.cloudFolderInfo.uid, name, destination
156152

157153

cterasdk/asynchronous/core/login.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import logging
22

33
from .base_command import BaseCommand
4-
from ...exceptions import CTERAException
4+
from ...exceptions.transport import Forbidden
5+
from ...exceptions.session import SessionExpired
6+
from ...exceptions.auth import AuthenticationError
57

68

79
logger = logging.getLogger('cterasdk.core')
@@ -23,9 +25,9 @@ async def login(self, username, password):
2325
try:
2426
await self._core.v1.api.form_data('/login', {'j_username': username, 'j_password': password})
2527
logger.info("User logged in. %s", {'host': host, 'user': username})
26-
except CTERAException:
27-
logger.error("Login failed. %s", {'host': host, 'user': username})
28-
raise
28+
except Forbidden as error:
29+
logger.error('Login failed. %s', {'host': host, 'user': username})
30+
raise AuthenticationError() from error
2931

3032
async def sso(self, ctera_ticket):
3133
"""
@@ -40,5 +42,8 @@ async def logout(self):
4042
"""
4143
Log out of the portal
4244
"""
43-
await self._core.v1.api.form_data('/logout', {})
44-
logger.info("User logged out. %s", {'host': self._core.host()})
45+
try:
46+
await self._core.v1.api.form_data('/logout', {})
47+
logger.info("User logged out. %s", {'host': self._core.host()})
48+
except SessionExpired:
49+
logger.info("Session expired and is no longer active.")

cterasdk/asynchronous/core/notifications.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from ...common import Object
88
from ...lib import CursorResponse
99
from ...exceptions.transport import HTTPError
10-
from ...exceptions.notifications import NotificationsError
10+
from ...exceptions.notifications import NotificationsError, AncestorsError
1111

1212

1313
logger = logging.getLogger('cterasdk.notifications')
@@ -86,12 +86,12 @@ async def ancestors(self, descendant):
8686
param = Object()
8787
param.folder_id = descendant.folder_id
8888
param.guid = descendant.guid
89-
logger.debug('Getting ancestors. %s', {'guid': param.guid, 'folder_id': param.folder_id})
89+
logger.debug('Getting ancestors for: %s:%s', param.folder_id, param.guid)
9090
try:
9191
return await self._core.v2.api.post('/metadata/ancestors', param)
92-
except HTTPError:
93-
logger.error('Could not retrieve ancestors. %s', {'folder_id': param.folder_id, 'guid': param.guid})
94-
raise
92+
except HTTPError as error:
93+
logger.error('Could not retrieve ancestors for: %s:%s', param.folder_id, param.guid)
94+
raise AncestorsError(param.folder_id, param.guid) from error
9595

9696

9797
class Service(BaseCommand):

cterasdk/cio/common.py

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -77,27 +77,3 @@ def encode_request_parameter(param):
7777
return dict(
7878
inputXML=utf8_decode(toxmlstr(param))
7979
)
80-
81-
82-
def await_or_future(ctera, ref, wait):
83-
"""
84-
Wait for task completion, or return an awaitable task object.
85-
86-
:param str ref: Task reference
87-
:param bool wait: ``True`` to wait for task completion, ``False`` to return an awaitable task object
88-
"""
89-
if wait:
90-
return ctera.tasks.wait(ref)
91-
return ctera.tasks.awaitable_task(ref)
92-
93-
94-
async def a_await_or_future(ctera, ref, wait):
95-
"""
96-
Wait for task completion, or return an awaitable task object.
97-
98-
:param str ref: Task reference
99-
:param bool wait: ``True`` to wait for task completion, ``False`` to return an awaitable task object
100-
"""
101-
if wait:
102-
return await ctera.tasks.wait(ref)
103-
return ctera.tasks.awaitable_task(ref)

0 commit comments

Comments
 (0)