Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/specs/web-server/_long_running_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from models_library.generics import Envelope
from models_library.rest_error import EnvelopedError
from servicelib.aiohttp.long_running_tasks._routes import _PathParam
from servicelib.long_running_tasks._models import TaskGet, TaskStatus
from servicelib.long_running_tasks.models import TaskGet, TaskStatus
from simcore_service_webserver._meta import API_VTAG
from simcore_service_webserver.tasks._exception_handlers import (
_TO_HTTP_ERROR_MAP as export_data_http_error_map,
Expand Down
2 changes: 1 addition & 1 deletion api/specs/web-server/_long_running_tasks_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from fastapi import APIRouter, Depends, status
from models_library.generics import Envelope
from servicelib.aiohttp.long_running_tasks._routes import _PathParam
from servicelib.long_running_tasks._models import TaskGet, TaskStatus
from servicelib.long_running_tasks.models import TaskGet, TaskStatus
from simcore_service_webserver._meta import API_VTAG

router = APIRouter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from aiohttp import web

from ...long_running_tasks._task import TasksManager
from ...long_running_tasks.task import TasksManager
from ._constants import (
APP_LONG_RUNNING_TASKS_MANAGER_KEY,
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from aiohttp import web
from common_library.json_serialization import json_dumps

from ...long_running_tasks._errors import (
from ...long_running_tasks.errors import (
TaskCancelledError,
TaskNotCompletedError,
TaskNotFoundError,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from pydantic import BaseModel
from servicelib.aiohttp import status

from ...long_running_tasks._errors import TaskNotCompletedError, TaskNotFoundError
from ...long_running_tasks._models import TaskGet, TaskId, TaskStatus
from ...long_running_tasks._task import TrackedTask
from ...long_running_tasks.errors import TaskNotCompletedError, TaskNotFoundError
from ...long_running_tasks.models import TaskGet, TaskId, TaskStatus
from ...long_running_tasks.task import TrackedTask
from ..requests_validation import parse_request_path_parameters_as
from ._dependencies import get_task_context, get_tasks_manager

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from pydantic import AnyHttpUrl, PositiveFloat, TypeAdapter

from ...aiohttp import status
from ...long_running_tasks._models import TaskGet
from ...long_running_tasks._task import (
from ...long_running_tasks.models import TaskGet
from ...long_running_tasks.task import (
TaskContext,
TaskProtocol,
TasksManager,
Expand Down Expand Up @@ -136,11 +136,11 @@ def setup(

async def on_cleanup_ctx(app: web.Application) -> AsyncGenerator[None, None]:
# add components to state
app[
APP_LONG_RUNNING_TASKS_MANAGER_KEY
] = long_running_task_manager = TasksManager(
stale_task_check_interval_s=stale_task_check_interval_s,
stale_task_detect_timeout_s=stale_task_detect_timeout_s,
app[APP_LONG_RUNNING_TASKS_MANAGER_KEY] = long_running_task_manager = (
TasksManager(
stale_task_check_interval_s=stale_task_check_interval_s,
stale_task_detect_timeout_s=stale_task_detect_timeout_s,
)
)

# add error handlers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@
from yarl import URL

from ...long_running_tasks._constants import DEFAULT_POLL_INTERVAL_S, HOUR
from ...long_running_tasks._models import LRTask, RequestBody
from ...long_running_tasks.models import (
LRTask,
RequestBody,
TaskGet,
TaskId,
TaskProgress,
TaskStatus,
)
from ...rest_responses import unwrap_envelope_if_required
from .. import status
from .server import TaskGet, TaskId, TaskProgress, TaskStatus

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -123,6 +129,3 @@ async def long_running_task_request(
if task:
await _abort_task(session, URL(task.abort_href))
raise


__all__: tuple[str, ...] = ("LRTask",)
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,19 @@
running task.
"""

from ...long_running_tasks._errors import TaskAlreadyRunningError, TaskCancelledError
from ...long_running_tasks._models import ProgressMessage, ProgressPercent
from ...long_running_tasks._task import (
TaskId,
TaskProgress,
TaskProtocol,
TasksManager,
TaskStatus,
)
from ._dependencies import (
create_task_name_from_request,
get_task_context,
get_tasks_manager,
)
from ._routes import TaskGet
from ._server import setup, start_long_running_task

__all__: tuple[str, ...] = (
"create_task_name_from_request",
"get_task_context",
"get_tasks_manager",
"ProgressMessage",
"ProgressPercent",
"setup",
"start_long_running_task",
"TaskAlreadyRunningError",
"TaskCancelledError",
"TaskId",
"TaskGet",
"TasksManager",
"TaskProgress",
"TaskProtocol",
"TaskStatus",
)

# nopycln: file
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_exponential

from ...long_running_tasks._errors import GenericClientError
from ...long_running_tasks._models import ClientConfiguration, TaskId, TaskStatus
from ...long_running_tasks.errors import GenericClientError
from ...long_running_tasks.models import ClientConfiguration, TaskId, TaskStatus

DEFAULT_HTTP_REQUESTS_TIMEOUT: Final[PositiveFloat] = 15
_DEFAULT_HTTP_REQUESTS_TIMEOUT: Final[PositiveFloat] = 15


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -207,7 +207,7 @@ def setup(
app: FastAPI,
*,
router_prefix: str = "",
http_requests_timeout: PositiveFloat = DEFAULT_HTTP_REQUESTS_TIMEOUT,
http_requests_timeout: PositiveFloat = _DEFAULT_HTTP_REQUESTS_TIMEOUT,
):
"""
- `router_prefix` by default it is assumed the server mounts the APIs on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

from pydantic import PositiveFloat

from ...long_running_tasks._errors import TaskClientTimeoutError
from ...long_running_tasks._models import (
from ...long_running_tasks.errors import TaskClientTimeoutError
from ...long_running_tasks.models import (
ProgressCallback,
ProgressMessage,
ProgressPercent,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from fastapi import Request

from ...long_running_tasks._task import TasksManager
from ...long_running_tasks.task import TasksManager


def get_tasks_manager(request: Request) -> TasksManager:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from starlette.requests import Request
from starlette.responses import JSONResponse

from ...long_running_tasks._errors import (
from ...long_running_tasks.errors import (
BaseLongRunningError,
TaskNotCompletedError,
TaskNotFoundError,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

from fastapi import APIRouter, Depends, Request, status

from ...long_running_tasks._errors import TaskNotCompletedError, TaskNotFoundError
from ...long_running_tasks._models import TaskGet, TaskId, TaskResult, TaskStatus
from ...long_running_tasks._task import TasksManager
from ...long_running_tasks.errors import TaskNotCompletedError, TaskNotFoundError
from ...long_running_tasks.models import TaskGet, TaskId, TaskResult, TaskStatus
from ...long_running_tasks.task import TasksManager
from ..requests_decorators import cancel_on_disconnect
from ._dependencies import get_tasks_manager

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from fastapi import APIRouter, FastAPI
from pydantic import PositiveFloat

from ...long_running_tasks._errors import BaseLongRunningError
from ...long_running_tasks._task import TasksManager
from ...long_running_tasks.errors import BaseLongRunningError
from ...long_running_tasks.task import TasksManager
from ._error_handlers import base_long_running_error_handler
from ._routes import router

Expand Down Expand Up @@ -50,4 +50,4 @@ async def on_shutdown() -> None:

# add error handlers
# NOTE: Exception handler can not be added during the on_startup script, otherwise not working correctly
app.add_exception_handler(BaseLongRunningError, base_long_running_error_handler) # type: ignore[arg-type]
app.add_exception_handler(BaseLongRunningError, base_long_running_error_handler) # type: ignore[arg-type]
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@

import httpx
from fastapi import status
from models_library.api_schemas_long_running_tasks.base import TaskProgress
from models_library.api_schemas_long_running_tasks.tasks import (
TaskGet,
TaskResult,
TaskStatus,
)
from tenacity import (
AsyncRetrying,
TryAgain,
Expand All @@ -27,17 +21,16 @@
from yarl import URL

from ...long_running_tasks._constants import DEFAULT_POLL_INTERVAL_S, HOUR
from ...long_running_tasks._models import (
ClientConfiguration,
from ...long_running_tasks.models import (
LRTask,
ProgressCallback,
ProgressMessage,
ProgressPercent,
RequestBody,
TaskGet,
TaskProgress,
TaskStatus,
)
from ...long_running_tasks._task import TaskId
from ...long_running_tasks.task import TaskId
from ...rest_responses import unwrap_envelope_if_required
from ._client import DEFAULT_HTTP_REQUESTS_TIMEOUT, Client, setup
from ._client import Client, setup
from ._context_manager import periodic_task_result

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -151,15 +144,7 @@ async def long_running_task_request(


__all__: tuple[str, ...] = (
"DEFAULT_HTTP_REQUESTS_TIMEOUT",
"Client",
"ClientConfiguration",
"LRTask",
"ProgressCallback",
"ProgressMessage",
"ProgressPercent",
"TaskId",
"TaskResult",
"periodic_task_result",
"setup",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,12 @@
running task. The client will take care of recovering the result from it.
"""

from models_library.api_schemas_long_running_tasks.tasks import TaskResult

from ...long_running_tasks._errors import TaskAlreadyRunningError, TaskCancelledError
from ...long_running_tasks._task import (
TaskId,
TaskProgress,
TasksManager,
TaskStatus,
start_task,
)
from ._dependencies import get_tasks_manager
from ._server import setup

__all__: tuple[str, ...] = (
"get_tasks_manager",
"setup",
"start_task",
"TaskAlreadyRunningError",
"TaskCancelledError",
"TaskId",
"TasksManager",
"TaskProgress",
"TaskResult",
"TaskStatus",
)

# nopycln: file
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
)
from pydantic import PositiveFloat

from ._errors import (
from .errors import (
TaskAlreadyRunningError,
TaskCancelledError,
TaskExceptionError,
TaskNotCompletedError,
TaskNotFoundError,
)
from ._models import TaskId, TaskName, TaskStatus, TrackedTask
from .models import TaskId, TaskName, TaskStatus, TrackedTask

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -262,7 +262,7 @@ async def _cancel_asyncio_task(
await asyncio.wait_for(
_await_task(task), timeout=self._cancel_task_timeout_s
)
except asyncio.TimeoutError:
except TimeoutError:
logger.warning(
"Timed out while awaiting for cancellation of '%s'", reference
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,22 @@
from pydantic import BaseModel, TypeAdapter
from pytest_simcore.helpers.assert_checks import assert_status
from servicelib.aiohttp import long_running_tasks, status
from servicelib.aiohttp.long_running_tasks.server import TaskId
from servicelib.aiohttp.requests_validation import parse_request_query_parameters_as
from servicelib.long_running_tasks._task import TaskContext
from servicelib.long_running_tasks.models import (
TaskGet,
TaskId,
TaskProgress,
TaskStatus,
)
from servicelib.long_running_tasks.task import TaskContext
from tenacity.asyncio import AsyncRetrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed


async def _string_list_task(
task_progress: long_running_tasks.server.TaskProgress,
task_progress: TaskProgress,
num_strings: int,
sleep_time: float,
fail: bool,
Expand Down Expand Up @@ -93,7 +98,7 @@ async def _caller(client: TestClient, **query_kwargs) -> TaskId:
data, error = await assert_status(resp, status.HTTP_202_ACCEPTED)
assert data
assert not error
task_get = TypeAdapter(long_running_tasks.server.TaskGet).validate_python(data)
task_get = TypeAdapter(TaskGet).validate_python(data)
return task_get.task_id

return _caller
Expand Down Expand Up @@ -123,7 +128,7 @@ async def _waiter(
data, error = await assert_status(result, status.HTTP_200_OK)
assert data
assert not error
task_status = long_running_tasks.server.TaskStatus.model_validate(data)
task_status = TaskStatus.model_validate(data)
assert task_status
assert task_status.done

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
from pydantic import TypeAdapter
from pytest_simcore.helpers.assert_checks import assert_status
from servicelib.aiohttp import long_running_tasks, status
from servicelib.aiohttp.long_running_tasks.server import TaskGet, TaskId
from servicelib.aiohttp.rest_middlewares import append_rest_middlewares
from servicelib.long_running_tasks._task import TaskContext
from servicelib.long_running_tasks.models import TaskGet, TaskId, TaskStatus
from servicelib.long_running_tasks.task import TaskContext
from tenacity.asyncio import AsyncRetrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
Expand Down Expand Up @@ -70,7 +70,7 @@ async def test_workflow(
data, error = await assert_status(result, status.HTTP_200_OK)
assert data
assert not error
task_status = long_running_tasks.server.TaskStatus.model_validate(data)
task_status = TaskStatus.model_validate(data)
assert task_status
progress_updates.append(
(task_status.task_progress.message, task_status.task_progress.percent)
Expand Down
Loading
Loading