Skip to content

Commit ad0c87c

Browse files
✨ Expose multipart upload (#63)
* add new openapi.json * start adding wrapper for files api * first implementation of async fileupload * use complete/upload link directly * add initial test for uploading 10gb file * minor correction * update openapi.json as well as upload fcn and test * factor out completion request * cleanup * add final exception handling to http client * update tests * skip test if not right osparc version * tuple -> Tuple in hints to make py 3.6 happy * list[] -> List[] in hints * change initialization * minor correction * start using httpx in order to also have sync client * make pagination iterator sync for now * several small changes to iterator and test * remove aiohttp error handler * small bug fix * implement destination_folder input in download_file * several small changes according to PR feedback * list_jobs -> jobs * add junit test suite name * use junit prefix instead * fix addopts * fix prefix
1 parent f674691 commit ad0c87c

File tree

12 files changed

+908
-249
lines changed

12 files changed

+908
-249
lines changed

api/openapi.json

Lines changed: 537 additions & 174 deletions
Large diffs are not rendered by default.

clients/python/client/osparc/__init__.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"""
44
from typing import Tuple
55

6+
import nest_asyncio
67
from osparc_client import ( # APIs; API client; models
78
ApiClient,
89
ApiException,
@@ -13,7 +14,6 @@
1314
Configuration,
1415
ErrorGet,
1516
File,
16-
FilesApi,
1717
Groups,
1818
HTTPValidationError,
1919
Job,
@@ -22,10 +22,6 @@
2222
JobMetadataUpdate,
2323
JobOutputs,
2424
JobStatus,
25-
LimitOffsetPageFile,
26-
LimitOffsetPageJob,
27-
LimitOffsetPageSolver,
28-
LimitOffsetPageStudy,
2925
Links,
3026
Meta,
3127
MetaApi,
@@ -49,13 +45,18 @@
4945
__version__,
5046
)
5147

48+
from ._files_api import FilesApi
5249
from ._info import openapi
5350
from ._solvers_api import SolversApi
51+
from ._utils import PaginationGenerator
52+
53+
nest_asyncio.apply() # allow to run coroutines via asyncio.run(coro)
5454

5555
__all__: Tuple[str, ...] = (
5656
# imports from osparc_client
5757
"__version__",
5858
"FilesApi",
59+
"PaginationGenerator",
5960
"MetaApi",
6061
"SolversApi",
6162
"UsersApi",
@@ -86,14 +87,10 @@
8687
"OnePageSolverPort",
8788
"StudyPort",
8889
"Study",
89-
"LimitOffsetPageStudy",
90-
"LimitOffsetPageFile",
9190
"JobMetadataUpdate",
92-
"LimitOffsetPageJob",
9391
"Links",
9492
"SolverPort",
9593
"JobMetadata",
96-
"LimitOffsetPageSolver",
9794
"ErrorGet",
9895
"OnePageStudyPort",
9996
# imports from osparc
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import asyncio
2+
import json
3+
import math
4+
import random
5+
import shutil
6+
import string
7+
from pathlib import Path
8+
from typing import Any, Iterator, List, Optional, Tuple, Union
9+
10+
import httpx
11+
from httpx import AsyncClient, Response
12+
from osparc_client import (
13+
BodyCompleteMultipartUploadV0FilesFileIdCompletePost,
14+
ClientFile,
15+
ClientFileUploadSchema,
16+
)
17+
from osparc_client import FilesApi as _FilesApi
18+
from osparc_client import FileUploadCompletionBody, FileUploadLinks, UploadedPart
19+
from tqdm.asyncio import tqdm_asyncio
20+
21+
from . import ApiClient, File
22+
from ._http_client import AsyncHttpClient
23+
from ._utils import _file_chunk_generator
24+
25+
26+
class FilesApi(_FilesApi):
    """Class for interacting with files"""

    def __init__(self, api_client: Optional[ApiClient] = None):
        """Construct object

        Args:
            api_client (ApiClient, optional): osparc.ApiClient object
        """
        super().__init__(api_client)
        # Bound proxy to the generated base class, used to reach its
        # get_upload_links implementation.
        self._super = super(FilesApi, self)
        user: Optional[str] = self.api_client.configuration.username
        passwd: Optional[str] = self.api_client.configuration.password
        # Basic auth is only built when both credentials are configured.
        self._auth: Optional[httpx.BasicAuth] = (
            httpx.BasicAuth(username=user, password=passwd)
            if (user is not None and passwd is not None)
            else None
        )

    def download_file(
        self, file_id: str, *, destination_folder: Optional[Path] = None
    ) -> str:
        """Download a file and optionally move it into a folder

        Args:
            file_id (str): identifier passed on to the generated client
            destination_folder (Path, optional): existing directory the
                downloaded file is moved into. If omitted, the file stays
                where the generated client stored it.

        Returns:
            str: resolved path of the downloaded file

        Raises:
            RuntimeError: if destination_folder is given but is not a directory
        """
        if destination_folder is not None and not destination_folder.is_dir():
            raise RuntimeError(
                f"destination_folder: {destination_folder} must be a directory"
            )
        downloaded_file: Path = Path(super().download_file(file_id))
        if destination_folder is not None:
            dest_file: Path = destination_folder / downloaded_file.name
            # Never overwrite an existing file: insert 8 random letters between
            # stem and suffix until the name is unused.
            while dest_file.is_file():
                new_name = (
                    downloaded_file.stem
                    + "".join(random.choices(string.ascii_letters, k=8))
                    + downloaded_file.suffix
                )
                dest_file = destination_folder / new_name
            shutil.move(downloaded_file, dest_file)
            downloaded_file = dest_file
        return str(downloaded_file.resolve())

    def upload_file(self, file: Union[str, Path]) -> File:
        """Upload a file (blocking wrapper around upload_file_async)

        Args:
            file (str or Path): path of the file to upload

        Returns:
            File: the object returned by upload_file_async
        """
        return asyncio.run(self.upload_file_async(file=file))

    async def upload_file_async(self, file: Union[str, Path]) -> File:
        """Upload a file in chunks using the upload links provided by the server

        Args:
            file (str or Path): path of the file to upload

        Returns:
            File: built from the response to the "complete upload" request

        Raises:
            RuntimeError: if file is not a file, or if the server returned
                fewer upload URLs than there are chunks
        """
        if isinstance(file, str):
            file = Path(file)
        if not file.is_file():
            raise RuntimeError(f"{file} is not a file")
        # stat() once so the announced size and the URL check use the same value
        file_size: int = file.stat().st_size
        client_file: ClientFile = ClientFile(filename=file.name, filesize=file_size)
        client_upload_schema: ClientFileUploadSchema = self._super.get_upload_links(
            client_file=client_file
        )
        chunk_size: int = client_upload_schema.upload_schema.chunk_size
        links: FileUploadLinks = client_upload_schema.upload_schema.links
        urls = client_upload_schema.upload_schema.urls
        # Part numbers start at 1
        url_iter: Iterator[Tuple[int, str]] = enumerate(urls, start=1)
        if len(urls) < math.ceil(file_size / chunk_size):
            raise RuntimeError(
                "Did not receive sufficient number of upload URLs from the server."
            )

        tasks: list = []
        # If anything inside this context raises, AsyncHttpClient posts to
        # links.abort_upload before closing the client.
        async with AsyncHttpClient(
            exception_request_type="post",
            exception_url=links.abort_upload,
            exception_auth=self._auth,
        ) as session:
            try:
                async for chunck, size in _file_chunk_generator(file, chunk_size):
                    # following https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
                    index, url = next(url_iter)
                    task = asyncio.create_task(
                        self._upload_chunck(
                            http_client=session,
                            chunck=chunck,
                            chunck_size=size,
                            upload_link=url,
                            index=index,
                        )
                    )
                    tasks.append(task)

                uploaded_parts: List[UploadedPart] = await tqdm_asyncio.gather(*tasks)
            finally:
                # On failure, stop the chunk uploads that are still in flight so
                # they do not keep using a client that is about to be closed.
                # On success every task is already done and cancel() is a no-op.
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)

            return await self._complete_multipart_upload(
                session, links.complete_upload, client_file, uploaded_parts
            )

    async def _complete_multipart_upload(
        self,
        http_client: AsyncClient,
        complete_link: str,
        client_file: ClientFile,
        uploaded_parts: List[UploadedPart],
    ) -> File:
        """Post the list of uploaded parts to the "complete upload" link

        Args:
            http_client (AsyncClient): client used to send the request
            complete_link (str): url the completion request is posted to
            client_file (ClientFile): description of the uploaded file
            uploaded_parts (List[UploadedPart]): one entry per uploaded chunk

        Returns:
            File: built from the json body of the response

        Raises:
            httpx.HTTPStatusError: if the server answers with an error status
        """
        complete_payload = BodyCompleteMultipartUploadV0FilesFileIdCompletePost(
            client_file=client_file,
            uploaded_parts=FileUploadCompletionBody(parts=uploaded_parts),
        )
        response: Response = await http_client.post(
            complete_link,
            json=complete_payload.to_dict(),
            auth=self._auth,
        )
        response.raise_for_status()
        return File(**response.json())

    async def _upload_chunck(
        self,
        http_client: AsyncClient,
        chunck: bytes,
        chunck_size: int,
        upload_link: str,
        index: int,
    ) -> UploadedPart:
        """Upload a single chunk and return its part description

        Args:
            http_client (AsyncClient): client used to send the request
            chunck (bytes): content of the chunk
            chunck_size (int): value sent as Content-Length
            upload_link (str): url the chunk is PUT to
            index (int): part number of the chunk

        Returns:
            UploadedPart: part number together with the ETag from the response

        Raises:
            httpx.HTTPStatusError: if the server answers with an error status
            RuntimeError: if the response carries no ETag header
        """
        response: Response = await http_client.put(
            upload_link, content=chunck, headers={"Content-Length": f"{chunck_size}"}
        )
        response.raise_for_status()
        # Explicit check instead of assert: asserts are stripped under -O
        etag_header: Optional[str] = response.headers.get("Etag")
        if etag_header is None:
            raise RuntimeError(
                f"Upload of part {index} did not return an ETag header."
            )
        # NOTE(review): presumably the header value is a double-quoted string
        # and json.loads is used to strip the quotes -- confirm against backend
        etag: str = json.loads(etag_header)
        return UploadedPart(number=index, e_tag=etag)
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from typing import Optional
2+
3+
import httpx
4+
import tenacity
5+
6+
7+
class AsyncHttpClient:
    """Async http client context manager

    Entering the context returns a httpx.AsyncClient. On exit the client is
    always closed. If the body of the context raised and an exception request
    was configured, that request is sent first.
    """

    def __init__(
        self,
        exception_request_type: Optional[str] = None,
        exception_url: Optional[str] = None,
        exception_auth: Optional[httpx.BasicAuth] = None,
    ):
        """Construct object

        Args:
            exception_request_type (str, optional): name of the httpx.AsyncClient
                method (e.g. "post") to call if the context body raises
            exception_url (str, optional): url passed to that method
            exception_auth (httpx.BasicAuth, optional): auth passed to that method
        """
        self._client = httpx.AsyncClient()
        # Bound client method to call on failure, or None if nothing is configured
        self._exc_callback = (
            getattr(self._client, exception_request_type)
            if exception_request_type
            else None
        )
        self._exc_url = exception_url
        self._exc_auth = exception_auth

    async def __aenter__(self) -> httpx.AsyncClient:
        return self._client

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        """Send the exception request if the body raised, then close the client

        Returns None, so an exception raised in the context body propagates
        unchanged. If the exception request itself fails, that error is raised
        instead, chained to the original exception.
        """
        try:
            if exc_value is not None and self._exc_callback is not None:
                try:
                    # Retry connection-level errors once per second for up to
                    # 10 seconds; anything else fails immediately.
                    async for attempt in tenacity.AsyncRetrying(
                        reraise=True,
                        wait=tenacity.wait_fixed(1),
                        stop=tenacity.stop_after_delay(10),
                        retry=tenacity.retry_if_exception_type(httpx.RequestError),
                    ):
                        with attempt:
                            response = await self._exc_callback(
                                self._exc_url, auth=self._exc_auth
                            )
                            response.raise_for_status()
                except Exception as err:
                    raise err from exc_value
        finally:
            # Close on every path, including cancellation during the request above
            await self._client.aclose()

clients/python/client/osparc/_solvers_api.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing import Optional
22

3+
import httpx
34
from osparc_client import SolversApi as _SolversApi
45

56
from . import ApiClient
@@ -16,14 +17,19 @@ def __init__(self, api_client: Optional[ApiClient] = None):
1617
api_client (ApiClient, optional): osparc.ApiClient object
1718
"""
1819
super().__init__(api_client)
20+
user: Optional[str] = self.api_client.configuration.username
21+
passwd: Optional[str] = self.api_client.configuration.password
22+
self._auth: Optional[httpx.BasicAuth] = (
23+
httpx.BasicAuth(username=user, password=passwd)
24+
if (user is not None and passwd is not None)
25+
else None
26+
)
1927

2028
def get_jobs_page(self, solver_key: str, version: str) -> None:
2129
"""Method only for internal use"""
2230
raise NotImplementedError("This method is only for internal use")
2331

24-
def get_jobs(
25-
self, solver_key: str, version: str, limit: int = 20, offset: int = 0
26-
) -> PaginationGenerator:
32+
def jobs(self, solver_key: str, version: str) -> PaginationGenerator:
2733
"""Returns an iterator through which one can iterate over
2834
all Jobs submitted to the solver
2935
@@ -39,9 +45,13 @@ def get_jobs(
3945
(its "length")
4046
"""
4147

42-
def pagination_method(limit, offset):
48+
def pagination_method():
4349
return super(SolversApi, self).get_jobs_page(
44-
solver_key=solver_key, version=version, limit=limit, offset=offset
50+
solver_key=solver_key, version=version, limit=20, offset=0
4551
)
4652

47-
return PaginationGenerator(pagination_method, limit, offset)
53+
return PaginationGenerator(
54+
pagination_method,
55+
base_url=self.api_client.configuration.host,
56+
auth=self._auth,
57+
)

0 commit comments

Comments
 (0)