Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import asyncio
from asyncio.log import logger
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any, Final

from pydantic import PositiveFloat

from ...long_running_tasks.errors import TaskClientTimeoutError
from ...long_running_tasks.errors import TaskClientTimeoutError, TaskExceptionError
from ...long_running_tasks.models import (
ProgressCallback,
ProgressMessage,
Expand All @@ -16,6 +16,8 @@
)
from ._client import Client

_logger = logging.getLogger(__name__)

# NOTE: very short running requests are involved
MAX_CONCURRENCY: Final[int] = 10

Expand Down Expand Up @@ -96,7 +98,7 @@ async def periodic_task_result(

async def _status_update() -> TaskStatus:
task_status: TaskStatus = await client.get_task_status(task_id)
logger.debug("Task status %s", task_status.model_dump_json())
_logger.debug("Task status %s", task_status.model_dump_json())
await progress_manager.update(
task_id=task_id,
message=task_status.task_progress.message,
Expand All @@ -114,7 +116,7 @@ async def _wait_for_task_result() -> Any:

try:
result = await asyncio.wait_for(_wait_for_task_result(), timeout=task_timeout)
logger.debug("%s, %s", f"{task_id=}", f"{result=}")
_logger.debug("%s, %s", f"{task_id=}", f"{result=}")

yield result
except TimeoutError as e:
Expand All @@ -124,3 +126,11 @@ async def _wait_for_task_result() -> Any:
timeout=task_timeout,
exception=e,
) from e
except Exception as e:
error = TaskExceptionError(
task_id=task_id,
exception=e,
traceback=f"check remote side for logs, HINT: service replying to: '{client._base_url}' for '{task_id=}'", # noqa: SLF001 # pylint:disable=protected-access
)
_logger.warning("%s", error)
raise error from e
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import logging
import traceback
from typing import Any

from .errors import TaskNotCompletedError, TaskNotFoundError
from .models import TaskBase, TaskId, TaskStatus
from .task import TaskContext, TasksManager, TrackedTask

_logger = logging.getLogger(__name__)


def list_tasks(
tasks_manager: TasksManager, task_context: TaskContext | None
Expand All @@ -25,14 +30,25 @@ async def get_task_result(
tasks_manager: TasksManager, task_context: TaskContext | None, task_id: TaskId
) -> Any:
try:
return tasks_manager.get_task_result(
task_id=task_id, with_task_context=task_context
task_result = tasks_manager.get_task_result(
task_id, with_task_context=task_context
)
await tasks_manager.remove_task(
task_id, with_task_context=task_context, reraise_errors=False
)
finally:
# the task is always removed even if an error occurs
return task_result
except (TaskNotFoundError, TaskNotCompletedError):
raise
except Exception as exc:
# the task raised an exception
formatted_traceback = "".join(traceback.format_exception(exc))
_logger.info("Task '%s' raised an exception: %s", task_id, formatted_traceback)

# the task shall be removed in this case
await tasks_manager.remove_task(
task_id, with_task_context=task_context, reraise_errors=False
)
raise


async def remove_task(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from servicelib.fastapi.long_running_tasks.server import setup as setup_server
from servicelib.long_running_tasks.errors import (
TaskClientTimeoutError,
TaskExceptionError,
)
from servicelib.long_running_tasks.models import (
ProgressMessage,
Expand Down Expand Up @@ -148,7 +149,7 @@ async def test_task_result_task_result_is_an_error(

url = TypeAdapter(AnyHttpUrl).validate_python("http://backgroud.testserver.io/")
client = Client(app=bg_task_app, async_client=async_client, base_url=url)
with pytest.raises(RuntimeError, match="I am failing as requested"):
with pytest.raises(TaskExceptionError, match="I am failing as requested"):
async with periodic_task_result(
client,
task_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from models_library.users import UserID
from servicelib.fastapi.http_client_thin import BaseHttpClientError
from servicelib.logging_utils import log_context
from servicelib.long_running_tasks.errors import TaskExceptionError
from servicelib.long_running_tasks.models import ProgressCallback, TaskProgress
from servicelib.rabbitmq import RabbitMQClient
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
Expand Down Expand Up @@ -134,7 +135,7 @@ async def service_remove_containers(
await sidecars_client.stop_service(
scheduler_data.endpoint, progress_callback=progress_callback
)
except BaseHttpClientError as e:
except (BaseHttpClientError, TaskExceptionError) as e:
_logger.info(
(
"Could not remove service containers for %s. "
Expand All @@ -151,7 +152,7 @@ async def service_free_reserved_disk_space(
scheduler_data: SchedulerData = _get_scheduler_data(app, node_id)
try:
await sidecars_client.free_reserved_disk_space(scheduler_data.endpoint)
except BaseHttpClientError as e:
except (BaseHttpClientError, TaskExceptionError) as e:
_logger.info(
(
"Could not remove service containers for %s. "
Expand Down Expand Up @@ -369,7 +370,7 @@ async def attempt_pod_removal_and_data_saving(
scheduler_data.dynamic_sidecar.were_state_and_outputs_saved = True

_logger.info("dynamic-sidecar saved: state and output ports")
except BaseHttpClientError as e:
except (BaseHttpClientError, TaskExceptionError) as e:
_logger.error( # noqa: TRY400
(
"Could not contact dynamic-sidecar to save service "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict
from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result
from servicelib.fastapi.long_running_tasks.client import setup as client_setup
from servicelib.long_running_tasks.errors import TaskExceptionError
from servicelib.long_running_tasks.models import TaskId
from simcore_sdk.node_ports_common.exceptions import NodeNotFound
from simcore_service_dynamic_sidecar._meta import API_VTAG
Expand All @@ -42,10 +43,7 @@
from simcore_service_dynamic_sidecar.models.shared_store import SharedStore
from simcore_service_dynamic_sidecar.modules.inputs import enable_inputs_pulling
from simcore_service_dynamic_sidecar.modules.outputs._context import OutputsContext
from simcore_service_dynamic_sidecar.modules.outputs._manager import (
OutputsManager,
UploadPortsFailedError,
)
from simcore_service_dynamic_sidecar.modules.outputs._manager import OutputsManager

FAST_STATUS_POLL: Final[float] = 0.1
CREATE_SERVICE_CONTAINERS_TIMEOUT: Final[float] = 60
Expand Down Expand Up @@ -681,7 +679,7 @@ async def _test_code() -> None:
if not mock_port_keys:
await _test_code()
else:
with pytest.raises(UploadPortsFailedError) as exec_info:
with pytest.raises(TaskExceptionError) as exec_info:
await _test_code()
assert f"the node id {missing_node_uuid} was not found" in f"{exec_info.value}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
from servicelib.fastapi.long_running_tasks.client import Client, periodic_task_result
from servicelib.fastapi.long_running_tasks.client import setup as client_setup
from servicelib.long_running_tasks.errors import TaskExceptionError
from servicelib.long_running_tasks.models import TaskId
from simcore_service_dynamic_sidecar._meta import API_VTAG
from simcore_service_dynamic_sidecar.core.docker_utils import get_container_states
Expand Down Expand Up @@ -258,7 +259,7 @@ async def test_user_services_fail_to_start(
with_compose_down: bool,
mock_user_services_fail_to_start: None,
):
with pytest.raises(RuntimeError):
with pytest.raises(TaskExceptionError):
async with periodic_task_result(
client=client,
task_id=await _get_task_id_create_service_containers(
Expand Down Expand Up @@ -314,7 +315,7 @@ async def test_user_services_fail_to_stop_or_save_data(
# in case of manual intervention multiple stops will be sent
_EXPECTED_STOP_MESSAGES = 4
for _ in range(_EXPECTED_STOP_MESSAGES):
with pytest.raises(RuntimeError):
with pytest.raises(TaskExceptionError):
async with periodic_task_result(
client=client,
task_id=await _get_task_id_docker_compose_down(httpx_async_client),
Expand Down
Loading