Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
139 commits
Select commit Hold shift + click to select a range
daf740c
removed unrequired
Aug 6, 2025
392cc53
refactor fixture
Aug 6, 2025
6498e7a
enhanced removal and cancellation
Aug 6, 2025
97e8466
fixed shutdown
Aug 6, 2025
3bfa295
update comment
Aug 6, 2025
2d44252
avoid tests hanging
Aug 6, 2025
a77d79d
fixed test timing out
Aug 6, 2025
5d60b6a
fixed tests and spedup
Aug 7, 2025
19088cd
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 7, 2025
cb90c9e
refactor
Aug 7, 2025
fd1cae6
replaced TasksManager with BaseLongRunningManager in lrt_api
Aug 7, 2025
a9befe4
renamed cancellation to remove_task
Aug 7, 2025
df8636a
fixed hanging test
Aug 7, 2025
897ca66
fast cancellation of lrt
Aug 7, 2025
18dde52
lrt api refactor
Aug 11, 2025
c84389d
extended tests long_running_manager
Aug 11, 2025
6ff969d
fixed tests
Aug 11, 2025
08f82cc
removed unused
Aug 11, 2025
7f24c65
fixed fastapi tests
Aug 11, 2025
ae2f4f1
fixed most tests
Aug 11, 2025
e317256
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 11, 2025
de9805b
fixed interfaces
Aug 11, 2025
14eede7
fixed interface
Aug 11, 2025
119b892
makes settings manadatory
Aug 11, 2025
f5a8f1b
fixed interface
Aug 11, 2025
e664ba8
fixed broken test
Aug 11, 2025
5738b65
fixed tests director-v2
Aug 11, 2025
84b17df
fixed serialization of result and enhanced registration
Aug 12, 2025
fb9ff38
refactor interface for registration
Aug 12, 2025
258a979
fixed tests
Aug 12, 2025
7815f8d
fixeed tests
Aug 12, 2025
372e668
added todo
Aug 12, 2025
05b7d05
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 12, 2025
51c455f
fixede failing tests
Aug 12, 2025
fa30870
renamed
Aug 12, 2025
451e442
fixed tests
Aug 13, 2025
bd0dcfe
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 13, 2025
9d8b9ee
mypy
Aug 13, 2025
87939ab
fixeed tests
Aug 13, 2025
1695f75
fixed tests
Aug 13, 2025
d8904de
fixed tests
Aug 13, 2025
6b13f67
removed unused
Aug 13, 2025
a02f6e3
fixed issues with loops in test
Aug 13, 2025
7f6ecda
rename
Aug 13, 2025
730f20d
extended timeout period
Aug 13, 2025
83ce1e9
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 13, 2025
60ee272
enhanced errors
Aug 13, 2025
e9ad6dc
added message
Aug 13, 2025
2380c61
removed uncecessary code
Aug 13, 2025
82861ce
fixed issue with method registration
Aug 14, 2025
d8cfe2e
using correct namespaces to handle requests
Aug 14, 2025
b2f3ea5
fixed issues with failing services
Aug 14, 2025
793492e
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 14, 2025
37b672e
refactor setup
Aug 14, 2025
ba2e2af
fixed tests
Aug 14, 2025
817929e
fixed flaky tests
Aug 14, 2025
9b14fe5
fixed typos
Aug 14, 2025
31853f2
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 14, 2025
7bc16d1
renamed
Aug 15, 2025
159ab96
fixes concurrency issue with tests in CI
Aug 15, 2025
67f44e3
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 15, 2025
2ae799d
refactor long running manager
Aug 18, 2025
f3f0702
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 18, 2025
f95b2f7
alliged all names as in #8220
Aug 18, 2025
5edf2f2
using relative imports
Aug 18, 2025
e526943
simplified start_task interface
Aug 18, 2025
c30c122
fixed description
Aug 18, 2025
28f9e62
moved RedisNamespace in models
Aug 18, 2025
a9cc987
replaced both namespaces with only a single one
Aug 18, 2025
4224e13
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 18, 2025
8ce988a
removed unsued module
Aug 18, 2025
5e31621
rename interface
Aug 18, 2025
9e246d6
fixed error formatting
Aug 18, 2025
ef3b687
rephrased
Aug 18, 2025
bb92891
refactor naming
Aug 18, 2025
2a34259
docstring rename
Aug 18, 2025
a10f065
removed
Aug 18, 2025
56f201a
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 18, 2025
a3ed827
rename
Aug 18, 2025
3086829
fix pylint?
Aug 18, 2025
187d78f
revert change
Aug 18, 2025
95cd9d5
made modules flat
Aug 18, 2025
7d6d885
removed _models
Aug 18, 2025
984846e
fixed ciruclar dependency
Aug 18, 2025
d4e0ef8
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 19, 2025
3863e9c
renamed
Aug 19, 2025
5559e6d
renamed
Aug 19, 2025
eb1826c
rename
Aug 19, 2025
18be3a4
correct decorator order
Aug 19, 2025
9200ba1
fixed flaky test removing tasks tha have not been started
Aug 19, 2025
d510c8e
fixed tests
Aug 19, 2025
6a0d40c
fixed wrong usage
Aug 19, 2025
3e62d90
corrected tests
Aug 19, 2025
fbf229d
corrected tests
Aug 19, 2025
b432be3
let app initialize
Aug 19, 2025
3f76411
explicit wait for removal
Aug 19, 2025
f160ddb
removal does not wait for task to be removed and supports a timeout
Aug 19, 2025
d5dab8f
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 19, 2025
888efc8
added missing
Aug 19, 2025
afde03d
refactor error translaton layer to accepet specific error classes only
Aug 20, 2025
36a24bd
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 20, 2025
ba0cafd
pylint
Aug 20, 2025
d878b17
refactored error registration and detection
Aug 20, 2025
46cd3b3
refactor
Aug 20, 2025
ca0bca2
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 20, 2025
4d8d93d
fixed tests
Aug 20, 2025
e110883
added note
Aug 20, 2025
a48bd02
removed unused
Aug 20, 2025
4a7341a
rename
Aug 20, 2025
6552ddf
fixed public lrt_api interface
Aug 20, 2025
2d7e587
updated docstrings
Aug 20, 2025
7b26893
updated docstrings
Aug 20, 2025
8cfffb4
fixed namesapce
Aug 20, 2025
d1fad6f
fixed broken sidecar
Aug 20, 2025
4dd8535
avoid tests form hanging
Aug 20, 2025
0d3208e
renamed
Aug 21, 2025
4bf6d63
renamed
Aug 21, 2025
407fa83
fixed interfaces
Aug 21, 2025
3095ba6
rename
Aug 21, 2025
6747133
reverted changes
Aug 21, 2025
1add852
remove unused
Aug 22, 2025
6fe3901
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 22, 2025
501a672
changed how error is transferred
Aug 22, 2025
01be053
refactor
Aug 22, 2025
2dfe2fc
renamed
Aug 22, 2025
e3f4bcd
changed error raising
Aug 22, 2025
f8785db
renamed
Aug 22, 2025
2c77471
fixed broken tests
Aug 22, 2025
ba77f87
Merge branch 'master' into pr-osparc-long-running-rabbitmq-client
GitHK Aug 22, 2025
1a9d1eb
updated specs
Aug 22, 2025
89fbc25
renamed modules
Aug 22, 2025
07c4ad1
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 22, 2025
66c5dde
removed relative imports
Aug 22, 2025
5ca6a58
feedback
Aug 22, 2025
e8a8052
Merge branch 'pr-osparc-long-running-rabbitmq-client' of github.com:G…
Aug 22, 2025
ddac38f
fixed test
Aug 22, 2025
5e0efab
fixed issue
Aug 22, 2025
6032282
Merge branch 'master' into pr-osparc-long-running-rabbitmq-client
GitHK Aug 22, 2025
9fc652a
fixed borken tests
Aug 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ LICENSES_ITIS_VIP_API_URL=https://replace-with-itis-api/{category}
LICENSES_ITIS_VIP_CATEGORIES='{"HumanWholeBody": "Humans", "HumanBodyRegion": "Humans (Region)", "AnimalWholeBody": "Animal"}'
LICENSES_SPEAG_PHANTOMS_API_URL=https://replace-with-speag-api/{category}
LICENSES_SPEAG_PHANTOMS_CATEGORIES='{"ComputationalPhantom": "Phantom of the Opera"}'
LONG_RUNNING_TASKS_NAMESPACE_SUFFIX=development

# Can use 'docker run -it itisfoundation/invitations:latest simcore-service-invitations generate-dotenv --auto-password'
INVITATIONS_DEFAULT_PRODUCT=osparc
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-testing-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1252,7 +1252,7 @@ jobs:
unit-test-service-library:
needs: changes
if: ${{ needs.changes.outputs.service-library == 'true' || github.event_name == 'push' || github.event.inputs.force_all_builds == 'true' }}
timeout-minutes: 18 # if this timeout gets too small, then split the tests
timeout-minutes: 20 # if this timeout gets too small, then split the tests
name: "[unit] service-library"
runs-on: ${{ matrix.os }}
strategy:
Expand Down
20 changes: 8 additions & 12 deletions api/specs/web-server/_long_running_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,44 +32,40 @@
@router.get(
"/tasks",
response_model=Envelope[list[TaskGet]],
name="list_tasks",
description="Lists all long running tasks",
responses=_export_data_responses,
)
def get_async_jobs(): ...
def get_async_jobs():
"""Lists all long running tasks"""


@router.get(
"/tasks/{task_id}",
response_model=Envelope[TaskStatus],
name="get_task_status",
description="Retrieves the status of a task",
responses=_export_data_responses,
)
def get_async_job_status(
_path_params: Annotated[_PathParam, Depends()],
): ...
):
"""Retrieves the status of a task"""


@router.delete(
"/tasks/{task_id}",
name="cancel_and_delete_task",
description="Cancels and deletes a task",
responses=_export_data_responses,
status_code=status.HTTP_204_NO_CONTENT,
)
def cancel_async_job(
_path_params: Annotated[_PathParam, Depends()],
): ...
):
"""Cancels and removes a task"""


@router.get(
"/tasks/{task_id}/result",
response_model=Any,
name="get_task_result",
description="Retrieves the result of a task",
responses=_export_data_responses,
)
def get_async_job_result(
_path_params: Annotated[_PathParam, Depends()],
): ...
):
"""Retrieves the result of a task"""
6 changes: 3 additions & 3 deletions api/specs/web-server/_long_running_tasks_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ async def get_task_status(

@router.delete(
"/{task_id}",
name="cancel_and_delete_task",
description="Cancels and deletes a task",
name="remove_task",
description="Cancels and removes a task",
status_code=status.HTTP_204_NO_CONTENT,
)
async def cancel_and_delete_task(
async def remove_task(
_path_params: Annotated[_PathParam, Depends()],
): ...

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from datetime import timedelta

import pytest
from pytest_mock import MockerFixture


@pytest.fixture
async def fast_long_running_tasks_cancellation(
mocker: MockerFixture,
) -> None:
mocker.patch(
"servicelib.long_running_tasks.task._CANCEL_TASKS_CHECK_INTERVAL",
new=timedelta(seconds=1),
)
Original file line number Diff line number Diff line change
@@ -1,41 +1,12 @@
import datetime

from aiohttp import web
from settings_library.redis import RedisSettings

from ...long_running_tasks.base_long_running_manager import BaseLongRunningManager
from ...long_running_tasks.models import TaskContext
from ...long_running_tasks.task import RedisNamespace, TasksManager
from ._constants import APP_LONG_RUNNING_MANAGER_KEY
from ._request import get_task_context


class AiohttpLongRunningManager(BaseLongRunningManager):
def __init__(
self,
app: web.Application,
stale_task_check_interval: datetime.timedelta,
stale_task_detect_timeout: datetime.timedelta,
redis_settings: RedisSettings,
redis_namespace: RedisNamespace,
):
self._app = app
self._tasks_manager = TasksManager(
stale_task_check_interval=stale_task_check_interval,
stale_task_detect_timeout=stale_task_detect_timeout,
redis_settings=redis_settings,
redis_namespace=redis_namespace,
)

@property
def tasks_manager(self) -> TasksManager:
return self._tasks_manager

async def setup(self) -> None:
await self._tasks_manager.setup()

async def teardown(self) -> None:
await self._tasks_manager.teardown()

@staticmethod
def get_task_context(request: web.Request) -> TaskContext:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from typing import Any
from typing import Annotated, Any

from aiohttp import web
from pydantic import BaseModel
from models_library.rest_base import RequestParameters
from pydantic import BaseModel, Field

from ...aiohttp import status
from ...long_running_tasks import lrt_api
from ...long_running_tasks.models import TaskGet, TaskId
from ..requests_validation import parse_request_path_parameters_as
from ..requests_validation import (
parse_request_path_parameters_as,
parse_request_query_parameters_as,
)
from ..rest_responses import create_data_response
from ._manager import get_long_running_manager

Expand All @@ -26,10 +30,11 @@ async def list_tasks(request: web.Request) -> web.Response:
task_id=t.task_id,
status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}",
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
abort_href=f"{request.app.router['remove_task'].url_for(task_id=t.task_id)}",
)
for t in await lrt_api.list_tasks(
long_running_manager.tasks_manager,
long_running_manager.rpc_client,
long_running_manager.lrt_namespace,
long_running_manager.get_task_context(request),
)
]
Expand All @@ -42,7 +47,8 @@ async def get_task_status(request: web.Request) -> web.Response:
long_running_manager = get_long_running_manager(request.app)

task_status = await lrt_api.get_task_status(
long_running_manager.tasks_manager,
long_running_manager.rpc_client,
long_running_manager.lrt_namespace,
long_running_manager.get_task_context(request),
path_params.task_id,
)
Expand All @@ -56,20 +62,36 @@ async def get_task_result(request: web.Request) -> web.Response | Any:

# NOTE: this might raise an exception that will be catched by the _error_handlers
return await lrt_api.get_task_result(
long_running_manager.tasks_manager,
long_running_manager.rpc_client,
long_running_manager.lrt_namespace,
long_running_manager.get_task_context(request),
path_params.task_id,
)


@routes.delete("/{task_id}", name="cancel_and_delete_task")
async def cancel_and_delete_task(request: web.Request) -> web.Response:
class _RemoveTaskQueryParams(RequestParameters):
wait_for_removal: Annotated[
bool,
Field(
description=(
"when True waits for the task to be removed "
"completly instead of returning immediately"
)
),
] = True


@routes.delete("/{task_id}", name="remove_task")
async def remove_task(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(_PathParam, request)
query_params = parse_request_query_parameters_as(_RemoveTaskQueryParams, request)
long_running_manager = get_long_running_manager(request.app)

await lrt_api.remove_task(
long_running_manager.tasks_manager,
long_running_manager.rpc_client,
long_running_manager.lrt_namespace,
long_running_manager.get_task_context(request),
path_params.task_id,
wait_for_removal=query_params.wait_for_removal,
)
return web.json_response(status=status.HTTP_204_NO_CONTENT)
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@
from aiohttp.web import HTTPException
from common_library.json_serialization import json_dumps
from pydantic import AnyHttpUrl, TypeAdapter
from settings_library.rabbit import RabbitSettings
from settings_library.redis import RedisSettings

from ...aiohttp import status
from ...long_running_tasks import lrt_api
from ...long_running_tasks._redis_serialization import (
from ...long_running_tasks._serialization import (
BaseObjectSerializer,
register_custom_serialization,
)
from ...long_running_tasks.constants import (
DEFAULT_STALE_TASK_CHECK_INTERVAL,
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
)
from ...long_running_tasks.models import TaskContext, TaskGet
from ...long_running_tasks.task import RedisNamespace, RegisteredTaskName
from ...long_running_tasks.models import (
LRTNamespace,
RegisteredTaskName,
TaskContext,
TaskGet,
)
from ..typing_extension import Handler
from . import _routes
from ._constants import (
Expand Down Expand Up @@ -63,7 +68,8 @@ async def start_long_running_task(
task_id = None
try:
task_id = await lrt_api.start_task(
long_running_manager.tasks_manager,
long_running_manager.rpc_client,
long_running_manager.lrt_namespace,
registerd_task_name,
fire_and_forget=fire_and_forget,
task_context=task_context,
Expand All @@ -81,7 +87,7 @@ async def start_long_running_task(
f"http://{ip_addr}:{port}{request_.app.router['get_task_result'].url_for(task_id=task_id)}" # NOSONAR
)
abort_url = TypeAdapter(AnyHttpUrl).validate_python(
f"http://{ip_addr}:{port}{request_.app.router['cancel_and_delete_task'].url_for(task_id=task_id)}" # NOSONAR
f"http://{ip_addr}:{port}{request_.app.router['remove_task'].url_for(task_id=task_id)}" # NOSONAR
)
task_get = TaskGet(
task_id=task_id,
Expand All @@ -98,7 +104,11 @@ async def start_long_running_task(
# remove the task, the client was disconnected
if task_id:
await lrt_api.remove_task(
long_running_manager.tasks_manager, task_context, task_id
long_running_manager.rpc_client,
long_running_manager.lrt_namespace,
task_context,
task_id,
wait_for_removal=True,
)
raise

Expand Down Expand Up @@ -143,20 +153,23 @@ def setup(
*,
router_prefix: str,
redis_settings: RedisSettings,
redis_namespace: RedisNamespace,
handler_check_decorator: Callable = _no_ops_decorator,
task_request_context_decorator: Callable = _no_task_context_decorator,
rabbit_settings: RabbitSettings,
lrt_namespace: LRTNamespace,
stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL,
stale_task_detect_timeout: datetime.timedelta = DEFAULT_STALE_TASK_DETECT_TIMEOUT,
handler_check_decorator: Callable = _no_ops_decorator,
task_request_context_decorator: Callable = _no_task_context_decorator,
) -> None:
"""
- `router_prefix` APIs are mounted on `/...`, this
will change them to be mounted as `{router_prefix}/...`
- `stale_task_check_interval_s` interval at which the
- `redis_settings` settings for Redis connection
- `rabbit_settings` settings for RabbitMQ connection
- `lrt_namespace` namespace for the long-running tasks
- `stale_task_check_interval` interval at which the
TaskManager checks for tasks which are no longer being
actively monitored by a client
- `stale_task_detect_timeout_s` interval after which a
task is considered stale
- `stale_task_detect_timeout` interval after which atask is considered stale
"""

async def on_cleanup_ctx(app: web.Application) -> AsyncGenerator[None, None]:
Expand All @@ -168,11 +181,11 @@ async def on_cleanup_ctx(app: web.Application) -> AsyncGenerator[None, None]:
# add components to state
app[APP_LONG_RUNNING_MANAGER_KEY] = long_running_manager = (
AiohttpLongRunningManager(
app=app,
stale_task_check_interval=stale_task_check_interval,
stale_task_detect_timeout=stale_task_detect_timeout,
redis_settings=redis_settings,
redis_namespace=redis_namespace,
rabbit_settings=rabbit_settings,
lrt_namespace=lrt_namespace,
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from aiohttp.web import HTTPInternalServerError, Request, StreamResponse, middleware
from servicelib.mimetype_constants import (
MIMETYPE_APPLICATION_JSON,
MIMETYPE_APPLICATION_ND_JSON,
)

from ..mimetype_constants import MIMETYPE_APPLICATION_JSON, MIMETYPE_APPLICATION_ND_JSON
from ..utils_profiling_middleware import _is_profiling, _profiler, append_profile


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import httpx
from fastapi import FastAPI
from servicelib.fastapi.tracing import setup_httpx_client_tracing
from settings_library.tracing import TracingSettings

from .tracing import setup_httpx_client_tracing


def setup_client_session(
app: FastAPI,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ async def get_task_result(
return result.json()

@retry_on_http_errors
async def cancel_and_delete_task(
async def remove_task(
self, task_id: TaskId, *, timeout: PositiveFloat | None = None # noqa: ASYNC109
) -> None:
timeout = timeout or self._client_configuration.default_timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ async def _wait_for_task_result() -> Any:

yield result
except TimeoutError as e:
await client.cancel_and_delete_task(task_id)
await client.remove_task(task_id)
raise TaskClientTimeoutError(
task_id=task_id,
timeout=task_timeout,
Expand Down
Loading
Loading