Skip to content

Commit 9594d7b

Browse files
authored
Merge branch 'master' into request/hide-osparc-integration
2 parents e30d250 + b478207 commit 9594d7b

File tree

37 files changed

+273
-224
lines changed

37 files changed

+273
-224
lines changed

packages/models-library/src/models_library/progress_bar.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@
22

33
from pydantic import BaseModel, ConfigDict
44

5-
from .basic_types import IDStr
6-
75
# NOTE: keep a list of possible unit, and please use correct official unit names
86
ProgressUnit: TypeAlias = Literal["Byte"]
97

108

119
class ProgressStructuredMessage(BaseModel):
12-
description: IDStr
10+
description: str
1311
current: float
1412
total: int
1513
unit: str | None = None
@@ -51,6 +49,7 @@ class ProgressStructuredMessage(BaseModel):
5149
class ProgressReport(BaseModel):
5250
actual_value: float
5351
total: float = 1.0
52+
attempt: int = 0
5453
unit: ProgressUnit | None = UNITLESS
5554
message: ProgressStructuredMessage | None = None
5655

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,40 @@
11
import asyncio
2-
from collections.abc import AsyncGenerator, Coroutine
3-
from dataclasses import dataclass
4-
from typing import Any, Final, TypeAlias
2+
import logging
3+
from collections.abc import AsyncGenerator
4+
from typing import Any
55

66
from aiohttp import ClientConnectionError, ClientSession
77
from tenacity import TryAgain, retry
88
from tenacity.asyncio import AsyncRetrying
9+
from tenacity.before_sleep import before_sleep_log
910
from tenacity.retry import retry_if_exception_type
1011
from tenacity.stop import stop_after_delay
1112
from tenacity.wait import wait_random_exponential
1213
from yarl import URL
1314

14-
from ...rest_responses import unwrap_envelope
15+
from ...long_running_tasks._constants import DEFAULT_POLL_INTERVAL_S, HOUR
16+
from ...long_running_tasks._models import LRTask, RequestBody
17+
from ...rest_responses import unwrap_envelope_if_required
1518
from .. import status
1619
from .server import TaskGet, TaskId, TaskProgress, TaskStatus
1720

18-
RequestBody: TypeAlias = Any
21+
_logger = logging.getLogger(__name__)
22+
1923

20-
_MINUTE: Final[int] = 60 # in secs
21-
_HOUR: Final[int] = 60 * _MINUTE # in secs
22-
_DEFAULT_POLL_INTERVAL_S: Final[float] = 1
2324
_DEFAULT_AIOHTTP_RETRY_POLICY: dict[str, Any] = {
2425
"retry": retry_if_exception_type(ClientConnectionError),
2526
"wait": wait_random_exponential(max=20),
2627
"stop": stop_after_delay(60),
2728
"reraise": True,
29+
"before_sleep": before_sleep_log(_logger, logging.INFO),
2830
}
2931

3032

3133
@retry(**_DEFAULT_AIOHTTP_RETRY_POLICY)
3234
async def _start(session: ClientSession, url: URL, json: RequestBody | None) -> TaskGet:
3335
async with session.post(url, json=json) as response:
3436
response.raise_for_status()
35-
data, error = unwrap_envelope(await response.json())
36-
assert not error # nosec
37-
assert data is not None # nosec
37+
data = unwrap_envelope_if_required(await response.json())
3838
return TaskGet.model_validate(data)
3939

4040

@@ -50,21 +50,18 @@ async def _wait_for_completion(
5050
stop=stop_after_delay(client_timeout),
5151
reraise=True,
5252
retry=retry_if_exception_type(TryAgain),
53+
before_sleep=before_sleep_log(_logger, logging.DEBUG),
5354
):
5455
with attempt:
5556
async with session.get(status_url) as response:
5657
response.raise_for_status()
57-
data, error = unwrap_envelope(await response.json())
58-
assert not error # nosec
59-
assert data is not None # nosec
58+
data = unwrap_envelope_if_required(await response.json())
6059
task_status = TaskStatus.model_validate(data)
6160
yield task_status.task_progress
6261
if not task_status.done:
6362
await asyncio.sleep(
6463
float(
65-
response.headers.get(
66-
"retry-after", _DEFAULT_POLL_INTERVAL_S
67-
)
64+
response.headers.get("retry-after", DEFAULT_POLL_INTERVAL_S)
6865
)
6966
)
7067
msg = f"{task_id=}, {task_status.started=} has status: '{task_status.task_progress.message}' {task_status.task_progress.percent}%"
@@ -81,42 +78,21 @@ async def _task_result(session: ClientSession, result_url: URL) -> Any:
8178
async with session.get(result_url) as response:
8279
response.raise_for_status()
8380
if response.status != status.HTTP_204_NO_CONTENT:
84-
data, error = unwrap_envelope(await response.json())
85-
assert not error # nosec
86-
assert data # nosec
87-
return data
81+
return unwrap_envelope_if_required(await response.json())
8882
return None
8983

9084

9185
@retry(**_DEFAULT_AIOHTTP_RETRY_POLICY)
9286
async def _abort_task(session: ClientSession, abort_url: URL) -> None:
9387
async with session.delete(abort_url) as response:
9488
response.raise_for_status()
95-
data, error = unwrap_envelope(await response.json())
96-
assert not error # nosec
97-
assert not data # nosec
98-
99-
100-
@dataclass(frozen=True)
101-
class LRTask:
102-
progress: TaskProgress
103-
_result: Coroutine[Any, Any, Any] | None = None
104-
105-
def done(self) -> bool:
106-
return self._result is not None
107-
108-
async def result(self) -> Any:
109-
if not self._result:
110-
msg = "No result ready!"
111-
raise ValueError(msg)
112-
return await self._result
11389

11490

11591
async def long_running_task_request(
11692
session: ClientSession,
11793
url: URL,
11894
json: RequestBody | None = None,
119-
client_timeout: int = 1 * _HOUR,
95+
client_timeout: int = 1 * HOUR,
12096
) -> AsyncGenerator[LRTask, None]:
12197
"""Will use the passed `ClientSession` to call an oSparc long
12298
running task `url` passing `json` as request body.
@@ -147,3 +123,6 @@ async def long_running_task_request(
147123
if task:
148124
await _abort_task(session, URL(task.abort_href))
149125
raise
126+
127+
128+
__all__: tuple[str, ...] = ("LRTask",)

packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from typing import Final
1111

1212
import tqdm
13-
from models_library.basic_types import IDStr
1413
from pydantic import NonNegativeInt
1514
from servicelib.logging_utils import log_catch
1615
from tqdm.contrib.logging import tqdm_logging_redirect
@@ -199,7 +198,7 @@ async def archive_dir(
199198
) -> None:
200199
if progress_bar is None:
201200
progress_bar = ProgressBarData(
202-
num_steps=1, description=IDStr(f"compressing {dir_to_compress.name}")
201+
num_steps=1, description=f"compressing {dir_to_compress.name}"
203202
)
204203

205204
options = " ".join(
@@ -223,7 +222,7 @@ async def archive_dir(
223222

224223
async with AsyncExitStack() as exit_stack:
225224
sub_progress = await exit_stack.enter_async_context(
226-
progress_bar.sub_progress(folder_size_bytes, description=IDStr("..."))
225+
progress_bar.sub_progress(folder_size_bytes, description="...")
227226
)
228227

229228
tqdm_progress = exit_stack.enter_context(
@@ -290,7 +289,7 @@ async def unarchive_dir(
290289
) -> set[Path]:
291290
if progress_bar is None:
292291
progress_bar = ProgressBarData(
293-
num_steps=1, description=IDStr(f"extracting {archive_to_extract.name}")
292+
num_steps=1, description=f"extracting {archive_to_extract.name}"
294293
)
295294

296295
# get archive information
@@ -304,7 +303,7 @@ async def unarchive_dir(
304303

305304
async with AsyncExitStack() as exit_stack:
306305
sub_prog = await exit_stack.enter_async_context(
307-
progress_bar.sub_progress(steps=total_bytes, description=IDStr("..."))
306+
progress_bar.sub_progress(steps=total_bytes, description="...")
308307
)
309308

310309
tqdm_progress = exit_stack.enter_context(

packages/service-library/src/servicelib/docker_utils.py

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import logging
23
from collections.abc import Awaitable, Callable
34
from contextlib import AsyncExitStack
@@ -11,8 +12,21 @@
1112
from models_library.docker import DockerGenericTag
1213
from models_library.generated_models.docker_rest_api import ProgressDetail
1314
from models_library.utils.change_case import snake_to_camel
14-
from pydantic import BaseModel, ByteSize, ConfigDict, TypeAdapter, ValidationError
15+
from pydantic import (
16+
BaseModel,
17+
ByteSize,
18+
ConfigDict,
19+
NonNegativeInt,
20+
TypeAdapter,
21+
ValidationError,
22+
)
1523
from settings_library.docker_registry import RegistrySettings
24+
from tenacity import (
25+
retry,
26+
retry_if_exception_type,
27+
stop_after_attempt,
28+
wait_random_exponential,
29+
)
1630
from yarl import URL
1731

1832
from .logging_utils import LogLevelInt
@@ -209,6 +223,8 @@ async def pull_image(
209223
progress_bar: ProgressBarData,
210224
log_cb: LogCB,
211225
image_information: DockerImageManifestsV2 | None,
226+
*,
227+
retry_upon_error_count: NonNegativeInt = 10,
212228
) -> None:
213229
"""pull a docker image to the host machine.
214230
@@ -219,7 +235,9 @@ async def pull_image(
219235
progress_bar -- the current progress bar
220236
log_cb -- a callback function to send logs to
221237
image_information -- the image layer information. If this is None, then no fine progress will be retrieved.
238+
retry_upon_error_count -- number of tries if there is a TimeoutError. Usually cased by networking issues.
222239
"""
240+
223241
registry_auth = None
224242
if registry_settings.REGISTRY_URL and registry_settings.REGISTRY_URL in image:
225243
registry_auth = {
@@ -245,39 +263,64 @@ async def pull_image(
245263

246264
client = await exit_stack.enter_async_context(aiodocker.Docker())
247265

248-
reported_progress = 0.0
249-
async for pull_progress in client.images.pull(
250-
image, stream=True, auth=registry_auth
251-
):
252-
try:
253-
parsed_progress = TypeAdapter(_DockerPullImage).validate_python(
254-
pull_progress
266+
def _reset_progress_from_previous_attempt() -> None:
267+
for pulled_status in layer_id_to_size.values():
268+
pulled_status.downloaded = 0
269+
pulled_status.extracted = 0
270+
271+
attempt: NonNegativeInt = 1
272+
273+
@retry(
274+
wait=wait_random_exponential(),
275+
stop=stop_after_attempt(retry_upon_error_count),
276+
reraise=True,
277+
retry=retry_if_exception_type(asyncio.TimeoutError),
278+
)
279+
async def _pull_image_with_retry() -> None:
280+
nonlocal attempt
281+
if attempt > 1:
282+
# for each attempt rest the progress
283+
progress_bar.reset()
284+
_reset_progress_from_previous_attempt()
285+
attempt += 1
286+
287+
_logger.info("attempt '%s' trying to pull image='%s'", attempt, image)
288+
289+
reported_progress = 0.0
290+
async for pull_progress in client.images.pull(
291+
image, stream=True, auth=registry_auth
292+
):
293+
try:
294+
parsed_progress = TypeAdapter(_DockerPullImage).validate_python(
295+
pull_progress
296+
)
297+
except ValidationError:
298+
_logger.exception(
299+
"Unexpected error while validating '%s'. "
300+
"TIP: This is probably an unforeseen pull status text that shall be added to the code. "
301+
"The pulling process will still continue.",
302+
f"{pull_progress=}",
303+
)
304+
else:
305+
await _parse_pull_information(
306+
parsed_progress, layer_id_to_size=layer_id_to_size
307+
)
308+
309+
# compute total progress
310+
total_downloaded_size = sum(
311+
layer.downloaded for layer in layer_id_to_size.values()
255312
)
256-
except ValidationError:
257-
_logger.exception(
258-
"Unexpected error while validating '%s'. "
259-
"TIP: This is probably an unforeseen pull status text that shall be added to the code. "
260-
"The pulling process will still continue.",
261-
f"{pull_progress=}",
313+
total_extracted_size = sum(
314+
layer.extracted for layer in layer_id_to_size.values()
262315
)
263-
else:
264-
await _parse_pull_information(
265-
parsed_progress, layer_id_to_size=layer_id_to_size
316+
total_progress = (total_downloaded_size + total_extracted_size) / 2.0
317+
progress_to_report = total_progress - reported_progress
318+
await progress_bar.update(progress_to_report)
319+
reported_progress = total_progress
320+
321+
await log_cb(
322+
f"pulling {image_short_name}: {pull_progress}...",
323+
logging.DEBUG,
266324
)
267325

268-
# compute total progress
269-
total_downloaded_size = sum(
270-
layer.downloaded for layer in layer_id_to_size.values()
271-
)
272-
total_extracted_size = sum(
273-
layer.extracted for layer in layer_id_to_size.values()
274-
)
275-
total_progress = (total_downloaded_size + total_extracted_size) / 2.0
276-
progress_to_report = total_progress - reported_progress
277-
await progress_bar.update(progress_to_report)
278-
reported_progress = total_progress
279-
280-
await log_cb(
281-
f"pulling {image_short_name}: {pull_progress}...",
282-
logging.DEBUG,
283-
)
326+
await _pull_image_with_retry()

packages/service-library/src/servicelib/fastapi/docker_utils.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from typing import Final
44

55
import httpx
6-
from models_library.basic_types import IDStr
76
from models_library.docker import DockerGenericTag
87
from pydantic import ByteSize, TypeAdapter, ValidationError
98
from settings_library.docker_registry import RegistrySettings
@@ -129,9 +128,7 @@ async def pull_images(
129128
num_steps=images_total_size,
130129
progress_report_cb=progress_cb,
131130
progress_unit="Byte",
132-
description=IDStr(
133-
f"pulling {len(images)} images",
134-
),
131+
description=f"pulling {len(images)} images",
135132
) as pbar:
136133

137134
await asyncio.gather(

packages/service-library/src/servicelib/fastapi/long_running_tasks/_routes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ async def list_tasks(
3131

3232
@router.get(
3333
"/{task_id}",
34+
response_model=TaskStatus,
3435
responses={
3536
status.HTTP_404_NOT_FOUND: {"description": "Task does not exist"},
3637
},

0 commit comments

Comments
 (0)