Skip to content

Commit 547ee94

Browse files
GitHKAndrei Neagu
andauthored
🐛 retry pulling image layer when they timeout (#7051)
Co-authored-by: Andrei Neagu <[email protected]>
1 parent 97bc8af commit 547ee94

File tree

28 files changed

+186
-130
lines changed

28 files changed

+186
-130
lines changed

packages/models-library/src/models_library/progress_bar.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@
22

33
from pydantic import BaseModel, ConfigDict
44

5-
from .basic_types import IDStr
6-
75
# NOTE: keep a list of possible unit, and please use correct official unit names
86
ProgressUnit: TypeAlias = Literal["Byte"]
97

108

119
class ProgressStructuredMessage(BaseModel):
12-
description: IDStr
10+
description: str
1311
current: float
1412
total: int
1513
unit: str | None = None
@@ -51,6 +49,7 @@ class ProgressStructuredMessage(BaseModel):
5149
class ProgressReport(BaseModel):
5250
actual_value: float
5351
total: float = 1.0
52+
attempt: int = 0
5453
unit: ProgressUnit | None = UNITLESS
5554
message: ProgressStructuredMessage | None = None
5655

packages/service-library/src/servicelib/archiving_utils/_interface_7zip.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from typing import Final
1111

1212
import tqdm
13-
from models_library.basic_types import IDStr
1413
from pydantic import NonNegativeInt
1514
from servicelib.logging_utils import log_catch
1615
from tqdm.contrib.logging import tqdm_logging_redirect
@@ -199,7 +198,7 @@ async def archive_dir(
199198
) -> None:
200199
if progress_bar is None:
201200
progress_bar = ProgressBarData(
202-
num_steps=1, description=IDStr(f"compressing {dir_to_compress.name}")
201+
num_steps=1, description=f"compressing {dir_to_compress.name}"
203202
)
204203

205204
options = " ".join(
@@ -223,7 +222,7 @@ async def archive_dir(
223222

224223
async with AsyncExitStack() as exit_stack:
225224
sub_progress = await exit_stack.enter_async_context(
226-
progress_bar.sub_progress(folder_size_bytes, description=IDStr("..."))
225+
progress_bar.sub_progress(folder_size_bytes, description="...")
227226
)
228227

229228
tqdm_progress = exit_stack.enter_context(
@@ -290,7 +289,7 @@ async def unarchive_dir(
290289
) -> set[Path]:
291290
if progress_bar is None:
292291
progress_bar = ProgressBarData(
293-
num_steps=1, description=IDStr(f"extracting {archive_to_extract.name}")
292+
num_steps=1, description=f"extracting {archive_to_extract.name}"
294293
)
295294

296295
# get archive information
@@ -304,7 +303,7 @@ async def unarchive_dir(
304303

305304
async with AsyncExitStack() as exit_stack:
306305
sub_prog = await exit_stack.enter_async_context(
307-
progress_bar.sub_progress(steps=total_bytes, description=IDStr("..."))
306+
progress_bar.sub_progress(steps=total_bytes, description="...")
308307
)
309308

310309
tqdm_progress = exit_stack.enter_context(

packages/service-library/src/servicelib/docker_utils.py

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import logging
23
from collections.abc import Awaitable, Callable
34
from contextlib import AsyncExitStack
@@ -11,8 +12,21 @@
1112
from models_library.docker import DockerGenericTag
1213
from models_library.generated_models.docker_rest_api import ProgressDetail
1314
from models_library.utils.change_case import snake_to_camel
14-
from pydantic import BaseModel, ByteSize, ConfigDict, TypeAdapter, ValidationError
15+
from pydantic import (
16+
BaseModel,
17+
ByteSize,
18+
ConfigDict,
19+
NonNegativeInt,
20+
TypeAdapter,
21+
ValidationError,
22+
)
1523
from settings_library.docker_registry import RegistrySettings
24+
from tenacity import (
25+
retry,
26+
retry_if_exception_type,
27+
stop_after_attempt,
28+
wait_random_exponential,
29+
)
1630
from yarl import URL
1731

1832
from .logging_utils import LogLevelInt
@@ -209,6 +223,8 @@ async def pull_image(
209223
progress_bar: ProgressBarData,
210224
log_cb: LogCB,
211225
image_information: DockerImageManifestsV2 | None,
226+
*,
227+
retry_upon_error_count: NonNegativeInt = 10,
212228
) -> None:
213229
"""pull a docker image to the host machine.
214230
@@ -219,7 +235,9 @@ async def pull_image(
219235
progress_bar -- the current progress bar
220236
log_cb -- a callback function to send logs to
221237
image_information -- the image layer information. If this is None, then no fine progress will be retrieved.
238+
retry_upon_error_count -- number of tries if there is a TimeoutError. Usually cased by networking issues.
222239
"""
240+
223241
registry_auth = None
224242
if registry_settings.REGISTRY_URL and registry_settings.REGISTRY_URL in image:
225243
registry_auth = {
@@ -245,39 +263,64 @@ async def pull_image(
245263

246264
client = await exit_stack.enter_async_context(aiodocker.Docker())
247265

248-
reported_progress = 0.0
249-
async for pull_progress in client.images.pull(
250-
image, stream=True, auth=registry_auth
251-
):
252-
try:
253-
parsed_progress = TypeAdapter(_DockerPullImage).validate_python(
254-
pull_progress
266+
def _reset_progress_from_previous_attempt() -> None:
267+
for pulled_status in layer_id_to_size.values():
268+
pulled_status.downloaded = 0
269+
pulled_status.extracted = 0
270+
271+
attempt: NonNegativeInt = 1
272+
273+
@retry(
274+
wait=wait_random_exponential(),
275+
stop=stop_after_attempt(retry_upon_error_count),
276+
reraise=True,
277+
retry=retry_if_exception_type(asyncio.TimeoutError),
278+
)
279+
async def _pull_image_with_retry() -> None:
280+
nonlocal attempt
281+
if attempt > 1:
282+
# for each attempt rest the progress
283+
progress_bar.reset()
284+
_reset_progress_from_previous_attempt()
285+
attempt += 1
286+
287+
_logger.info("attempt '%s' trying to pull image='%s'", attempt, image)
288+
289+
reported_progress = 0.0
290+
async for pull_progress in client.images.pull(
291+
image, stream=True, auth=registry_auth
292+
):
293+
try:
294+
parsed_progress = TypeAdapter(_DockerPullImage).validate_python(
295+
pull_progress
296+
)
297+
except ValidationError:
298+
_logger.exception(
299+
"Unexpected error while validating '%s'. "
300+
"TIP: This is probably an unforeseen pull status text that shall be added to the code. "
301+
"The pulling process will still continue.",
302+
f"{pull_progress=}",
303+
)
304+
else:
305+
await _parse_pull_information(
306+
parsed_progress, layer_id_to_size=layer_id_to_size
307+
)
308+
309+
# compute total progress
310+
total_downloaded_size = sum(
311+
layer.downloaded for layer in layer_id_to_size.values()
255312
)
256-
except ValidationError:
257-
_logger.exception(
258-
"Unexpected error while validating '%s'. "
259-
"TIP: This is probably an unforeseen pull status text that shall be added to the code. "
260-
"The pulling process will still continue.",
261-
f"{pull_progress=}",
313+
total_extracted_size = sum(
314+
layer.extracted for layer in layer_id_to_size.values()
262315
)
263-
else:
264-
await _parse_pull_information(
265-
parsed_progress, layer_id_to_size=layer_id_to_size
316+
total_progress = (total_downloaded_size + total_extracted_size) / 2.0
317+
progress_to_report = total_progress - reported_progress
318+
await progress_bar.update(progress_to_report)
319+
reported_progress = total_progress
320+
321+
await log_cb(
322+
f"pulling {image_short_name}: {pull_progress}...",
323+
logging.DEBUG,
266324
)
267325

268-
# compute total progress
269-
total_downloaded_size = sum(
270-
layer.downloaded for layer in layer_id_to_size.values()
271-
)
272-
total_extracted_size = sum(
273-
layer.extracted for layer in layer_id_to_size.values()
274-
)
275-
total_progress = (total_downloaded_size + total_extracted_size) / 2.0
276-
progress_to_report = total_progress - reported_progress
277-
await progress_bar.update(progress_to_report)
278-
reported_progress = total_progress
279-
280-
await log_cb(
281-
f"pulling {image_short_name}: {pull_progress}...",
282-
logging.DEBUG,
283-
)
326+
await _pull_image_with_retry()

packages/service-library/src/servicelib/fastapi/docker_utils.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from typing import Final
44

55
import httpx
6-
from models_library.basic_types import IDStr
76
from models_library.docker import DockerGenericTag
87
from pydantic import ByteSize, TypeAdapter, ValidationError
98
from settings_library.docker_registry import RegistrySettings
@@ -129,9 +128,7 @@ async def pull_images(
129128
num_steps=images_total_size,
130129
progress_report_cb=progress_cb,
131130
progress_unit="Byte",
132-
description=IDStr(
133-
f"pulling {len(images)} images",
134-
),
131+
description=f"pulling {len(images)} images",
135132
) as pbar:
136133

137134
await asyncio.gather(

packages/service-library/src/servicelib/progress_bar.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from inspect import isawaitable
55
from typing import Final, Optional, Protocol, runtime_checkable
66

7-
from models_library.basic_types import IDStr
87
from models_library.progress_bar import (
98
ProgressReport,
109
ProgressStructuredMessage,
@@ -18,6 +17,7 @@
1817
_MIN_PROGRESS_UPDATE_PERCENT: Final[float] = 0.01
1918
_INITIAL_VALUE: Final[float] = -1.0
2019
_FINAL_VALUE: Final[float] = 1.0
20+
_PROGRESS_ALREADY_REACGED_MAXIMUM: Final[str] = "Progress already reached maximum of"
2121

2222

2323
@runtime_checkable
@@ -84,10 +84,11 @@ async def main_fct():
8484
"description": "Optionally defines the step relative weight (defaults to steps of equal weights)"
8585
},
8686
)
87-
description: IDStr = field(metadata={"description": "define the progress name"})
87+
description: str = field(metadata={"description": "define the progress name"})
8888
progress_unit: ProgressUnit | None = None
8989
progress_report_cb: AsyncReportCB | ReportCB | None = None
9090
_current_steps: float = _INITIAL_VALUE
91+
_currnet_attempt: int = 0
9192
_children: list["ProgressBarData"] = field(default_factory=list)
9293
_parent: Optional["ProgressBarData"] = None
9394
_continuous_value_lock: asyncio.Lock = field(init=False)
@@ -147,6 +148,7 @@ async def _report_external(self, value: float) -> None:
147148
# NOTE: here we convert back to actual value since this is possibly weighted
148149
actual_value=value * self.num_steps,
149150
total=self.num_steps,
151+
attempt=self._currnet_attempt,
150152
unit=self.progress_unit,
151153
message=self.compute_report_message_stuct(),
152154
),
@@ -176,7 +178,7 @@ async def update(self, steps: float = 1) -> None:
176178
if new_steps_value > self.num_steps:
177179
_logger.warning(
178180
"%s",
179-
f"Progress already reached maximum of {self.num_steps=}, "
181+
f"{_PROGRESS_ALREADY_REACGED_MAXIMUM} {self.num_steps=}, "
180182
f"cause: {self._current_steps=} is updated by {steps=}"
181183
"TIP: sub progresses are not created correctly please check the stack trace",
182184
stack_info=True,
@@ -197,6 +199,11 @@ async def update(self, steps: float = 1) -> None:
197199
await self._update_parent(parent_update_value)
198200
await self._report_external(new_progress_value)
199201

202+
def reset(self) -> None:
203+
self._currnet_attempt += 1
204+
self._current_steps = _INITIAL_VALUE
205+
self._last_report_value = _INITIAL_VALUE
206+
200207
async def set_(self, new_value: float) -> None:
201208
await self.update(new_value - self._current_steps)
202209

@@ -207,7 +214,7 @@ async def finish(self) -> None:
207214
def sub_progress(
208215
self,
209216
steps: int,
210-
description: IDStr,
217+
description: str,
211218
step_weights: list[float] | None = None,
212219
progress_unit: ProgressUnit | None = None,
213220
) -> "ProgressBarData":

packages/service-library/tests/aiohttp/test_docker_utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,15 @@ def _assert_progress_report_values(
9595
# NOTE: we exclude the message part here as this is already tested in servicelib
9696
# check first progress
9797
assert mocked_progress_cb.call_args_list[0].args[0].dict(
98-
exclude={"message"}
98+
exclude={"message", "attempt"}
9999
) == ProgressReport(actual_value=0, total=total, unit="Byte").model_dump(
100-
exclude={"message"}
100+
exclude={"message", "attempt"}
101101
)
102102
# check last progress
103103
assert mocked_progress_cb.call_args_list[-1].args[0].dict(
104-
exclude={"message"}
104+
exclude={"message", "attempt"}
105105
) == ProgressReport(actual_value=total, total=total, unit="Byte").model_dump(
106-
exclude={"message"}
106+
exclude={"message", "attempt"}
107107
)
108108

109109

packages/service-library/tests/fastapi/test_docker_utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,15 @@ def _assert_progress_report_values(
101101
# NOTE: we exclude the message part here as this is already tested in servicelib
102102
# check first progress
103103
assert mocked_progress_cb.call_args_list[0].args[0].dict(
104-
exclude={"message"}
104+
exclude={"message", "attempt"}
105105
) == ProgressReport(actual_value=0, total=total, unit="Byte").model_dump(
106-
exclude={"message"}
106+
exclude={"message", "attempt"}
107107
)
108108
# check last progress
109109
assert mocked_progress_cb.call_args_list[-1].args[0].dict(
110-
exclude={"message"}
110+
exclude={"message", "attempt"}
111111
) == ProgressReport(actual_value=total, total=total, unit="Byte").model_dump(
112-
exclude={"message"}
112+
exclude={"message", "attempt"}
113113
)
114114

115115

packages/service-library/tests/test_progress_bar.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from servicelib.progress_bar import (
1616
_INITIAL_VALUE,
1717
_MIN_PROGRESS_UPDATE_PERCENT,
18+
_PROGRESS_ALREADY_REACGED_MAXIMUM,
1819
ProgressBarData,
1920
)
2021

@@ -258,10 +259,43 @@ async def test_set_progress(caplog: pytest.LogCaptureFixture, faker: Faker):
258259
assert root._current_steps == pytest.approx(34) # noqa: SLF001
259260
await root.set_(58)
260261
assert root._current_steps == pytest.approx(50) # noqa: SLF001
261-
assert "already reached maximum" in caplog.messages[0]
262+
assert "WARNING" in caplog.text
263+
assert _PROGRESS_ALREADY_REACGED_MAXIMUM in caplog.messages[0]
262264
assert "TIP:" in caplog.messages[0]
263265

264266

267+
async def test_reset_progress(caplog: pytest.LogCaptureFixture, faker: Faker):
268+
async with ProgressBarData(num_steps=50, description=faker.pystr()) as root:
269+
assert root._current_steps == pytest.approx(0) # noqa: SLF001
270+
assert root.num_steps == 50
271+
assert root.step_weights is None
272+
await root.set_(50)
273+
assert root._current_steps == pytest.approx(50) # noqa: SLF001
274+
assert "WARNING" not in caplog.text
275+
assert _PROGRESS_ALREADY_REACGED_MAXIMUM not in caplog.text
276+
await root.set_(51)
277+
assert root._current_steps == pytest.approx(50) # noqa: SLF001
278+
assert "WARNING" in caplog.text
279+
assert _PROGRESS_ALREADY_REACGED_MAXIMUM in caplog.text
280+
281+
caplog.clear()
282+
root.reset()
283+
284+
assert root._current_steps == pytest.approx(-1) # noqa: SLF001
285+
assert "WARNING" not in caplog.text
286+
assert _PROGRESS_ALREADY_REACGED_MAXIMUM not in caplog.text
287+
288+
await root.set_(12)
289+
assert root._current_steps == pytest.approx(12) # noqa: SLF001
290+
assert "WARNING" not in caplog.text
291+
assert _PROGRESS_ALREADY_REACGED_MAXIMUM not in caplog.text
292+
293+
await root.set_(51)
294+
assert root._current_steps == pytest.approx(50) # noqa: SLF001
295+
assert "WARNING" in caplog.text
296+
assert _PROGRESS_ALREADY_REACGED_MAXIMUM in caplog.text
297+
298+
265299
async def test_concurrent_progress_bar(faker: Faker):
266300
async def do_something(root: ProgressBarData):
267301
async with root.sub_progress(steps=50, description=faker.pystr()) as sub:
@@ -304,7 +338,7 @@ async def test_too_many_updates_does_not_raise_but_show_warning_with_stack(
304338
await root.update()
305339
await root.update()
306340
await root.update()
307-
assert "already reached maximum" in caplog.messages[0]
341+
assert _PROGRESS_ALREADY_REACGED_MAXIMUM in caplog.messages[0]
308342
assert "TIP:" in caplog.messages[0]
309343

310344

0 commit comments

Comments
 (0)