Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def __init__(self, app: FastAPI, async_client: AsyncClient, base_url: str):
"""
self.app = app
self._async_client = async_client
self._base_url = base_url
self.base_url = base_url

@property
def _client_configuration(self) -> ClientConfiguration:
Expand All @@ -129,7 +129,7 @@ def _client_configuration(self) -> ClientConfiguration:

def _get_url(self, path: str) -> str:
url_path = f"{self._client_configuration.router_prefix}{path}".lstrip("/")
url = TypeAdapter(AnyHttpUrl).validate_python(f"{self._base_url}{url_path}")
url = TypeAdapter(AnyHttpUrl).validate_python(f"{self.base_url}{url_path}")
return f"{url}"

@retry_on_http_errors
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import asyncio
from asyncio.log import logger
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any, Final

from pydantic import PositiveFloat
from servicelib.logging_errors import create_troubleshotting_log_message

from ...long_running_tasks.errors import TaskClientTimeoutError
from ...long_running_tasks.errors import TaskClientTimeoutError, TaskExceptionError
from ...long_running_tasks.models import (
ProgressCallback,
ProgressMessage,
Expand All @@ -16,6 +17,8 @@
)
from ._client import Client

_logger = logging.getLogger(__name__)

# NOTE: very short running requests are involved
MAX_CONCURRENCY: Final[int] = 10

Expand Down Expand Up @@ -96,7 +99,7 @@ async def periodic_task_result(

async def _status_update() -> TaskStatus:
task_status: TaskStatus = await client.get_task_status(task_id)
logger.debug("Task status %s", task_status.model_dump_json())
_logger.debug("Task status %s", task_status.model_dump_json())
await progress_manager.update(
task_id=task_id,
message=task_status.task_progress.message,
Expand All @@ -114,7 +117,7 @@ async def _wait_for_task_result() -> Any:

try:
result = await asyncio.wait_for(_wait_for_task_result(), timeout=task_timeout)
logger.debug("%s, %s", f"{task_id=}", f"{result=}")
_logger.debug("%s, %s", f"{task_id=}", f"{result=}")

yield result
except TimeoutError as e:
Expand All @@ -124,3 +127,13 @@ async def _wait_for_task_result() -> Any:
timeout=task_timeout,
exception=e,
) from e
except Exception as e:
error = TaskExceptionError(task_id=task_id, exception=e, traceback="")
_logger.warning(
create_troubleshotting_log_message(
user_error_msg=f"{task_id=} raised an exception",
error=e,
tip=f"Check the logs of the service responding to '{client.base_url}'",
)
)
raise error from e
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import logging
from typing import Any

from common_library.error_codes import create_error_code
from servicelib.logging_errors import create_troubleshotting_log_kwargs

from .errors import TaskNotCompletedError, TaskNotFoundError
from .models import TaskBase, TaskId, TaskStatus
from .task import TaskContext, TasksManager, TrackedTask

_logger = logging.getLogger(__name__)


def list_tasks(
tasks_manager: TasksManager, task_context: TaskContext | None
Expand All @@ -25,14 +32,29 @@ async def get_task_result(
tasks_manager: TasksManager, task_context: TaskContext | None, task_id: TaskId
) -> Any:
try:
return tasks_manager.get_task_result(
task_id=task_id, with_task_context=task_context
task_result = tasks_manager.get_task_result(
task_id, with_task_context=task_context
)
await tasks_manager.remove_task(
task_id, with_task_context=task_context, reraise_errors=False
)
return task_result
except (TaskNotFoundError, TaskNotCompletedError):
raise
except Exception as exc:
_logger.exception(
**create_troubleshotting_log_kwargs(
user_error_msg=f"{task_id=} raised an exception",
error=exc,
error_code=create_error_code(exc),
error_context={"task_context": task_context, "task_id": task_id},
),
)
finally:
# the task is always removed even if an error occurs
# the task shall be removed in this case
await tasks_manager.remove_task(
task_id, with_task_context=task_context, reraise_errors=False
)
raise


async def remove_task(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from servicelib.fastapi.long_running_tasks.server import setup as setup_server
from servicelib.long_running_tasks.errors import (
TaskClientTimeoutError,
TaskExceptionError,
)
from servicelib.long_running_tasks.models import (
ProgressMessage,
Expand Down Expand Up @@ -148,7 +149,7 @@ async def test_task_result_task_result_is_an_error(

url = TypeAdapter(AnyHttpUrl).validate_python("http://backgroud.testserver.io/")
client = Client(app=bg_task_app, async_client=async_client, base_url=url)
with pytest.raises(RuntimeError, match="I am failing as requested"):
with pytest.raises(TaskExceptionError, match="I am failing as requested"):
async with periodic_task_result(
client,
task_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from models_library.users import UserID
from servicelib.fastapi.http_client_thin import BaseHttpClientError
from servicelib.logging_utils import log_context
from servicelib.long_running_tasks.errors import TaskExceptionError
from servicelib.long_running_tasks.models import ProgressCallback, TaskProgress
from servicelib.rabbitmq import RabbitMQClient
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
Expand Down Expand Up @@ -134,7 +135,7 @@ async def service_remove_containers(
await sidecars_client.stop_service(
scheduler_data.endpoint, progress_callback=progress_callback
)
except BaseHttpClientError as e:
except (BaseHttpClientError, TaskExceptionError) as e:
_logger.info(
(
"Could not remove service containers for %s. "
Expand All @@ -151,7 +152,7 @@ async def service_free_reserved_disk_space(
scheduler_data: SchedulerData = _get_scheduler_data(app, node_id)
try:
await sidecars_client.free_reserved_disk_space(scheduler_data.endpoint)
except BaseHttpClientError as e:
except (BaseHttpClientError, TaskExceptionError) as e:
_logger.info(
(
"Could not remove service containers for %s. "
Expand Down Expand Up @@ -369,7 +370,7 @@ async def attempt_pod_removal_and_data_saving(
scheduler_data.dynamic_sidecar.were_state_and_outputs_saved = True

_logger.info("dynamic-sidecar saved: state and output ports")
except BaseHttpClientError as e:
except (BaseHttpClientError, TaskExceptionError) as e:
_logger.error( # noqa: TRY400
(
"Could not contact dynamic-sidecar to save service "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict
from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result
from servicelib.fastapi.long_running_tasks.client import setup as client_setup
from servicelib.long_running_tasks.errors import TaskExceptionError
from servicelib.long_running_tasks.models import TaskId
from simcore_sdk.node_ports_common.exceptions import NodeNotFound
from simcore_service_dynamic_sidecar._meta import API_VTAG
Expand All @@ -42,10 +43,7 @@
from simcore_service_dynamic_sidecar.models.shared_store import SharedStore
from simcore_service_dynamic_sidecar.modules.inputs import enable_inputs_pulling
from simcore_service_dynamic_sidecar.modules.outputs._context import OutputsContext
from simcore_service_dynamic_sidecar.modules.outputs._manager import (
OutputsManager,
UploadPortsFailedError,
)
from simcore_service_dynamic_sidecar.modules.outputs._manager import OutputsManager

FAST_STATUS_POLL: Final[float] = 0.1
CREATE_SERVICE_CONTAINERS_TIMEOUT: Final[float] = 60
Expand Down Expand Up @@ -681,7 +679,7 @@ async def _test_code() -> None:
if not mock_port_keys:
await _test_code()
else:
with pytest.raises(UploadPortsFailedError) as exec_info:
with pytest.raises(TaskExceptionError) as exec_info:
await _test_code()
assert f"the node id {missing_node_uuid} was not found" in f"{exec_info.value}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result
from servicelib.fastapi.long_running_tasks.client import setup as client_setup
from servicelib.long_running_tasks.errors import TaskExceptionError
from servicelib.long_running_tasks.models import TaskId
from simcore_service_dynamic_sidecar._meta import API_VTAG
from simcore_service_dynamic_sidecar.core.docker_utils import get_container_states
Expand Down Expand Up @@ -258,7 +259,7 @@ async def test_user_services_fail_to_start(
with_compose_down: bool,
mock_user_services_fail_to_start: None,
):
with pytest.raises(RuntimeError):
with pytest.raises(TaskExceptionError):
async with periodic_task_result(
client=client,
task_id=await _get_task_id_create_service_containers(
Expand Down Expand Up @@ -314,7 +315,7 @@ async def test_user_services_fail_to_stop_or_save_data(
# in case of manual intervention multiple stops will be sent
_EXPECTED_STOP_MESSAGES = 4
for _ in range(_EXPECTED_STOP_MESSAGES):
with pytest.raises(RuntimeError):
with pytest.raises(TaskExceptionError):
async with periodic_task_result(
client=client,
task_id=await _get_task_id_docker_compose_down(httpx_async_client),
Expand Down
Loading