|
7 | 7 | import time |
8 | 8 | import traceback |
9 | 9 | from contextlib import asynccontextmanager |
10 | | -from copy import deepcopy |
11 | 10 | from typing import Any, AsyncIterator, Dict, List, Mapping, Optional, Set, Tuple |
12 | 11 |
|
13 | 12 | import aiodocker |
14 | 13 | from aiodocker.utils import clean_filters, clean_map |
| 14 | +from fastapi import status |
15 | 15 | from models_library.projects import ProjectID |
16 | 16 | from models_library.projects_nodes_io import NodeID |
17 | 17 | from models_library.users import UserID |
18 | 18 | from packaging import version |
19 | 19 | from servicelib.utils import logged_gather |
| 20 | +from tenacity._asyncio import AsyncRetrying |
| 21 | +from tenacity.retry import retry_if_exception_type |
| 22 | +from tenacity.stop import stop_after_attempt |
| 23 | +from tenacity.wait import wait_exponential |
20 | 24 |
|
21 | 25 | from ...core.settings import DynamicSidecarSettings |
22 | 26 | from ...models.schemas.constants import ( |
@@ -79,6 +83,10 @@ async def _custom_volumes_get(self, id): # pylint: disable=redefined-builtin |
79 | 83 | _monkey_patch_aiodocker() |
80 | 84 |
|
81 | 85 |
|
| 86 | +class _RetryError(Exception): |
| 87 | + pass |
| 88 | + |
| 89 | + |
82 | 90 | @asynccontextmanager |
83 | 91 | async def docker_client() -> AsyncIterator[aiodocker.docker.Docker]: |
84 | 92 | client = None |
@@ -519,36 +527,50 @@ async def update_scheduler_data_label(scheduler_data: SchedulerData) -> None: |
519 | 527 | # NOTE: builtin `DockerServices.update` function is very limited. |
520 | 528 | # Using the same pattern but updating labels |
521 | 529 |
|
522 | | - try: |
523 | | - # fetch information from service name |
524 | | - service_inspect = await client.services.inspect(scheduler_data.service_name) |
525 | | - service_version = service_inspect["Version"]["Index"] |
526 | | - service_id = service_inspect["ID"] |
527 | | - spec = service_inspect["Spec"] |
528 | | - |
529 | | - # compose_spec needs to be json encoded |
530 | | - # before encoding it to json and storing it |
531 | | - # in the label |
532 | | - scheduler_data_copy = deepcopy(scheduler_data) |
533 | | - scheduler_data_copy.compose_spec = json.dumps( |
534 | | - scheduler_data_copy.compose_spec |
535 | | - ) |
536 | | - label_data = scheduler_data_copy.json() |
537 | | - spec["Labels"][DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL] = label_data |
538 | | - |
539 | | - await client._query_json( # pylint: disable=protected-access |
540 | | - f"services/{service_id}/update", |
541 | | - method="POST", |
542 | | - data=json.dumps(clean_map(spec)), |
543 | | - params={"version": service_version}, |
544 | | - ) |
545 | | - except aiodocker.exceptions.DockerError as e: |
546 | | - if not ( |
547 | | - e.status == 404 |
548 | | - and e.message == f"service {scheduler_data.service_name} not found" |
549 | | - ): |
550 | | - raise e |
551 | | - log.debug( |
552 | | - "Skip update for service '%s' which could not be found", |
553 | | - scheduler_data.service_name, |
554 | | - ) |
| 530 | + # The docker service update API is async, so `update out of sequence` error |
| 531 | + # might get raised. This is caused by the `service_version` being out of sync |
| 532 | + # with what is currently stored in the docker daemon. |
| 533 | + async for attempt in AsyncRetrying( |
| 534 | + # makes at most 3 attempts, waiting exponentially longer between them, then gives up |
| 535 | + stop=stop_after_attempt(3), |
| 536 | + wait=wait_exponential(), |
| 537 | + retry=retry_if_exception_type(_RetryError), |
| 538 | + reraise=True, |
| 539 | + ): |
| 540 | + with attempt: |
| 541 | + try: |
| 542 | + # fetch information from service name |
| 543 | + service_inspect = await client.services.inspect( |
| 544 | + scheduler_data.service_name |
| 545 | + ) |
| 546 | + service_version = service_inspect["Version"]["Index"] |
| 547 | + service_id = service_inspect["ID"] |
| 548 | + spec = service_inspect["Spec"] |
| 549 | + |
| 550 | + spec["Labels"][ |
| 551 | + DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL |
| 552 | + ] = scheduler_data.as_label_data() |
| 553 | + |
| 554 | + await client._query_json( # pylint: disable=protected-access |
| 555 | + f"services/{service_id}/update", |
| 556 | + method="POST", |
| 557 | + data=json.dumps(clean_map(spec)), |
| 558 | + params={"version": service_version}, |
| 559 | + ) |
| 560 | + except aiodocker.exceptions.DockerError as e: |
| 561 | + if not ( |
| 562 | + e.status == status.HTTP_404_NOT_FOUND |
| 563 | + and e.message |
| 564 | + == f"service {scheduler_data.service_name} not found" |
| 565 | + ): |
| 566 | + raise e |
| 567 | + if ( |
| 568 | + e.status == status.HTTP_500_INTERNAL_SERVER_ERROR |
| 569 | + and e.message |
| 570 | + == "rpc error: code = Unknown desc = update out of sequence" |
| 571 | + ): |
| 572 | + raise _RetryError() from e |
| 573 | + log.debug( |
| 574 | + "Skip update for service '%s' which could not be found", |
| 575 | + scheduler_data.service_name, |
| 576 | + ) |
0 commit comments