diff --git a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_client.py b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_client.py index a344e6b09a1..5a17014e209 100644 --- a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_client.py +++ b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_client.py @@ -120,7 +120,7 @@ def __init__(self, app: FastAPI, async_client: AsyncClient, base_url: str): """ self.app = app self._async_client = async_client - self._base_url = base_url + self.base_url = base_url @property def _client_configuration(self) -> ClientConfiguration: @@ -129,7 +129,7 @@ def _client_configuration(self) -> ClientConfiguration: def _get_url(self, path: str) -> str: url_path = f"{self._client_configuration.router_prefix}{path}".lstrip("/") - url = TypeAdapter(AnyHttpUrl).validate_python(f"{self._base_url}{url_path}") + url = TypeAdapter(AnyHttpUrl).validate_python(f"{self.base_url}{url_path}") return f"{url}" @retry_on_http_errors diff --git a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_context_manager.py b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_context_manager.py index 3cc1b65c8f6..6a33e4ed581 100644 --- a/packages/service-library/src/servicelib/fastapi/long_running_tasks/_context_manager.py +++ b/packages/service-library/src/servicelib/fastapi/long_running_tasks/_context_manager.py @@ -1,12 +1,13 @@ import asyncio -from asyncio.log import logger +import logging from collections.abc import AsyncIterator from contextlib import asynccontextmanager from typing import Any, Final from pydantic import PositiveFloat +from servicelib.logging_errors import create_troubleshotting_log_message -from ...long_running_tasks.errors import TaskClientTimeoutError +from ...long_running_tasks.errors import TaskClientTimeoutError, TaskExceptionError from ...long_running_tasks.models import ( ProgressCallback, ProgressMessage, @@ -16,6 +17,8 @@ ) from ._client import Client +_logger = logging.getLogger(__name__) + # NOTE: very short running requests are involved MAX_CONCURRENCY: Final[int] = 10 @@ -96,7 +99,7 @@ async def periodic_task_result( async def _status_update() -> TaskStatus: task_status: TaskStatus = await client.get_task_status(task_id) - logger.debug("Task status %s", task_status.model_dump_json()) + _logger.debug("Task status %s", task_status.model_dump_json()) await progress_manager.update( task_id=task_id, message=task_status.task_progress.message, @@ -114,7 +117,7 @@ async def _wait_for_task_result() -> Any: try: result = await asyncio.wait_for(_wait_for_task_result(), timeout=task_timeout) - logger.debug("%s, %s", f"{task_id=}", f"{result=}") + _logger.debug("%s, %s", f"{task_id=}", f"{result=}") yield result except TimeoutError as e: @@ -124,3 +127,13 @@ async def _wait_for_task_result() -> Any: timeout=task_timeout, exception=e, ) from e + except Exception as e: + error = TaskExceptionError(task_id=task_id, exception=e, traceback="") + _logger.warning( + create_troubleshotting_log_message( + user_error_msg=f"{task_id=} raised an exception", + error=e, + tip=f"Check the logs of the service responding to '{client.base_url}'", + ) + ) + raise error from e diff --git a/packages/service-library/src/servicelib/long_running_tasks/http_endpoint_responses.py b/packages/service-library/src/servicelib/long_running_tasks/http_endpoint_responses.py index be873f1a1a2..5256849541b 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/http_endpoint_responses.py +++ b/packages/service-library/src/servicelib/long_running_tasks/http_endpoint_responses.py @@ -1,8 +1,15 @@ +import logging from typing import Any +from common_library.error_codes import create_error_code +from servicelib.logging_errors import create_troubleshotting_log_kwargs + +from .errors import TaskNotCompletedError, TaskNotFoundError from .models import TaskBase, TaskId, TaskStatus from .task import TaskContext, TasksManager, TrackedTask +_logger = logging.getLogger(__name__) + def list_tasks( tasks_manager: TasksManager, task_context: TaskContext | None @@ -25,14 +32,29 @@ async def get_task_result( tasks_manager: TasksManager, task_context: TaskContext | None, task_id: TaskId ) -> Any: try: - return tasks_manager.get_task_result( - task_id=task_id, with_task_context=task_context + task_result = tasks_manager.get_task_result( + task_id, with_task_context=task_context + ) + await tasks_manager.remove_task( + task_id, with_task_context=task_context, reraise_errors=False + ) + return task_result + except (TaskNotFoundError, TaskNotCompletedError): + raise + except Exception as exc: + _logger.exception( + **create_troubleshotting_log_kwargs( + user_error_msg=f"{task_id=} raised an exception", + error=exc, + error_code=create_error_code(exc), + error_context={"task_context": task_context, "task_id": task_id}, + ), ) - finally: - # the task is always removed even if an error occurs + # the task shall be removed in this case await tasks_manager.remove_task( task_id, with_task_context=task_context, reraise_errors=False ) + raise async def remove_task( diff --git a/packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks_context_manager.py b/packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks_context_manager.py index 9072a70ddc0..32d884d1ff8 100644 --- a/packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks_context_manager.py +++ b/packages/service-library/tests/fastapi/long_running_tasks/test_long_running_tasks_context_manager.py @@ -18,6 +18,7 @@ from servicelib.fastapi.long_running_tasks.server import setup as setup_server from servicelib.long_running_tasks.errors import ( TaskClientTimeoutError, + TaskExceptionError, ) from servicelib.long_running_tasks.models import ( ProgressMessage, @@ -148,7 +149,7 @@ async def test_task_result_task_result_is_an_error( url = TypeAdapter(AnyHttpUrl).validate_python("http://backgroud.testserver.io/") client = Client(app=bg_task_app, async_client=async_client, base_url=url) - with pytest.raises(RuntimeError, match="I am failing as requested"): + with pytest.raises(TaskExceptionError, match="I am failing as requested"): async with periodic_task_result( client, task_id, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py index 6aa648b31e0..2caa9f7a237 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py @@ -21,6 +21,7 @@ from models_library.users import UserID from servicelib.fastapi.http_client_thin import BaseHttpClientError from servicelib.logging_utils import log_context +from servicelib.long_running_tasks.errors import TaskExceptionError from servicelib.long_running_tasks.models import ProgressCallback, TaskProgress from servicelib.rabbitmq import RabbitMQClient from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient @@ -134,7 +135,7 @@ async def service_remove_containers( await sidecars_client.stop_service( scheduler_data.endpoint, progress_callback=progress_callback ) - except BaseHttpClientError as e: + except (BaseHttpClientError, TaskExceptionError) as e: _logger.info( ( "Could not remove service containers for %s. " @@ -151,7 +152,7 @@ async def service_free_reserved_disk_space( scheduler_data: SchedulerData = _get_scheduler_data(app, node_id) try: await sidecars_client.free_reserved_disk_space(scheduler_data.endpoint) - except BaseHttpClientError as e: + except (BaseHttpClientError, TaskExceptionError) as e: _logger.info( ( "Could not remove service containers for %s. " @@ -369,7 +370,7 @@ async def attempt_pod_removal_and_data_saving( scheduler_data.dynamic_sidecar.were_state_and_outputs_saved = True _logger.info("dynamic-sidecar saved: state and output ports") - except BaseHttpClientError as e: + except (BaseHttpClientError, TaskExceptionError) as e: _logger.error( # noqa: TRY400 ( "Could not contact dynamic-sidecar to save service " diff --git a/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py b/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py index b00491ce764..b2b1d005266 100644 --- a/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py +++ b/services/dynamic-sidecar/tests/unit/test_api_rest_containers_long_running_tasks.py @@ -30,6 +30,7 @@ from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result from servicelib.fastapi.long_running_tasks.client import setup as client_setup +from servicelib.long_running_tasks.errors import TaskExceptionError from servicelib.long_running_tasks.models import TaskId from simcore_sdk.node_ports_common.exceptions import NodeNotFound from simcore_service_dynamic_sidecar._meta import API_VTAG @@ -42,10 +43,7 @@ from simcore_service_dynamic_sidecar.models.shared_store import SharedStore from simcore_service_dynamic_sidecar.modules.inputs import enable_inputs_pulling from simcore_service_dynamic_sidecar.modules.outputs._context import OutputsContext -from simcore_service_dynamic_sidecar.modules.outputs._manager import ( - OutputsManager, - UploadPortsFailedError, -) +from simcore_service_dynamic_sidecar.modules.outputs._manager import OutputsManager FAST_STATUS_POLL: Final[float] = 0.1 CREATE_SERVICE_CONTAINERS_TIMEOUT: Final[float] = 60 @@ -681,7 +679,7 @@ async def _test_code() -> None: if not mock_port_keys: await _test_code() else: - with pytest.raises(UploadPortsFailedError) as exec_info: + with pytest.raises(TaskExceptionError) as exec_info: await _test_code() assert f"the node id {missing_node_uuid} was not found" in f"{exec_info.value}" diff --git a/services/dynamic-sidecar/tests/unit/test_api_rest_workflow_service_metrics.py b/services/dynamic-sidecar/tests/unit/test_api_rest_workflow_service_metrics.py index ee3ab015a1c..62755395c99 100644 --- a/services/dynamic-sidecar/tests/unit/test_api_rest_workflow_service_metrics.py +++ b/services/dynamic-sidecar/tests/unit/test_api_rest_workflow_service_metrics.py @@ -34,6 +34,7 @@ from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result from servicelib.fastapi.long_running_tasks.client import setup as client_setup +from servicelib.long_running_tasks.errors import TaskExceptionError from servicelib.long_running_tasks.models import TaskId from simcore_service_dynamic_sidecar._meta import API_VTAG from simcore_service_dynamic_sidecar.core.docker_utils import get_container_states @@ -258,7 +259,7 @@ async def test_user_services_fail_to_start( with_compose_down: bool, mock_user_services_fail_to_start: None, ): - with pytest.raises(RuntimeError): + with pytest.raises(TaskExceptionError): async with periodic_task_result( client=client, task_id=await _get_task_id_create_service_containers( @@ -314,7 +315,7 @@ async def test_user_services_fail_to_stop_or_save_data( # in case of manual intervention multiple stops will be sent _EXPECTED_STOP_MESSAGES = 4 for _ in range(_EXPECTED_STOP_MESSAGES): - with pytest.raises(RuntimeError): + with pytest.raises(TaskExceptionError): async with periodic_task_result( client=client, task_id=await _get_task_id_docker_compose_down(httpx_async_client),