Skip to content
Merged
Show file tree
Hide file tree
Changes from 75 commits
Commits
Show all changes
144 commits
Select commit Hold shift + click to select a range
fb8628a
added in memory store to TasksManager
Jul 21, 2025
cb7f96d
remove unused and renamed DB
Jul 21, 2025
52d5a8f
renamed
Jul 21, 2025
878db59
moved interface to lrt_api
Jul 21, 2025
d8ccee4
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 22, 2025
b8efe90
add support for cancellation
Jul 22, 2025
ae3d7ef
cancelled tasks now are detected and removed
Jul 22, 2025
33b8ea2
erge remote-tracking branch 'upstream/master' into pr-osparc-long-run…
Jul 22, 2025
41db9fa
tasks are singularly tracked
Jul 22, 2025
dc7a13c
adjusted task cleanup
Jul 22, 2025
572d100
added missing test case
Jul 22, 2025
3ce8fa3
added redis store
Jul 23, 2025
f62f4b3
using new task manager with Redis peristency
Jul 23, 2025
f0330dd
added redis to tasksmanager
Jul 23, 2025
0e6516a
added Redis storage to long running tasks
Jul 23, 2025
2cda61e
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 23, 2025
41636d7
fixed tests
Jul 23, 2025
85cdd6b
repalced serializers
Jul 23, 2025
792d597
added test for multiple namespaces with redis store
Jul 23, 2025
9fbf1cf
fixed tests
Jul 23, 2025
e526df5
Revert "fixed tests"
Jul 23, 2025
0452113
fixed tests which required redis
Jul 23, 2025
e0cbc7b
fixed tests
Jul 23, 2025
cb79b56
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 23, 2025
a027afd
mypy
Jul 23, 2025
afb6eac
progress updates are now async and saved to storage
Jul 23, 2025
a73f920
fixed tests
Jul 23, 2025
61a5c1e
fixed broken tests
Jul 23, 2025
b57923f
updated interface
Jul 24, 2025
915ab48
fixed tests
Jul 24, 2025
6ee1d9f
mypy
Jul 24, 2025
ab3f0bd
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 24, 2025
827aa5e
fixed tests
Jul 24, 2025
f3b5e83
fixed tests
Jul 24, 2025
57db7eb
fixed test
Jul 24, 2025
1e92a2d
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 24, 2025
fc96620
added exclusive lock
Jul 24, 2025
2d5cf7a
changed mock
Jul 24, 2025
d4230fb
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 24, 2025
d39443f
mypy
Jul 24, 2025
8ca56b2
pylint
Jul 24, 2025
a42b243
fixed test
Jul 24, 2025
de5bb6d
fixed cli tests
Jul 25, 2025
68a6671
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 25, 2025
7baad68
fixed redis client sdk being stuck
Jul 25, 2025
0339d2e
added note for issue
Jul 25, 2025
28feba3
bumped timeout
Jul 25, 2025
b36ea0a
revert change
Jul 25, 2025
b206956
added timeout
Jul 25, 2025
1180187
trying to avoid lockup on teardown
Jul 25, 2025
9397869
avoid error on cancellation
Jul 25, 2025
9c35202
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 25, 2025
3a43c56
fixed client
Jul 25, 2025
ed87d5f
revert change, since it does nothing
Jul 25, 2025
fdc3ad7
removed is healthy
Jul 25, 2025
89f6bbb
fixed test
Jul 28, 2025
bae95a1
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 28, 2025
a82a8b8
fixed haning tests
Jul 28, 2025
1ec0334
pylint
Jul 28, 2025
402a6ac
refactor
Jul 28, 2025
e36eb8d
removed
Jul 28, 2025
89201e6
refactor client
Jul 28, 2025
7feeff6
fixed test
Jul 28, 2025
28e56ed
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 28, 2025
f883dfe
fixed in memory long running tasks for tests
Jul 29, 2025
a13cf50
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 29, 2025
fd3814d
copilot
Jul 29, 2025
2671668
added network to dynamic-sidecar
Jul 29, 2025
17b5ccc
fixed broken tests
Jul 29, 2025
6cb0631
added tests using multiple managers in parallel
Jul 29, 2025
be76636
fixed test
Jul 29, 2025
e05fba5
added transmission of error cancellation
Jul 29, 2025
b097ee7
pylint
Jul 29, 2025
a07dba7
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 29, 2025
9db9e83
mypy
Jul 29, 2025
e81328e
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 30, 2025
2f57230
added custom serializers
Jul 30, 2025
c3f919e
refactor
Jul 30, 2025
8a71108
mypy
Jul 30, 2025
def4201
fixed issue
Jul 30, 2025
355ccec
removed type
Jul 30, 2025
b7acc6c
relative imports
Jul 30, 2025
18a2075
comment update
Jul 30, 2025
3f99d97
refactor with retry
Jul 30, 2025
1717a94
moved to the requested place
Jul 30, 2025
15f6d18
simplify settings
Jul 30, 2025
c05fed9
refactor
Jul 30, 2025
b44d641
refactor
Jul 30, 2025
da4edf4
repalced with inmemory redis
Jul 30, 2025
81b41ea
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 30, 2025
ec08d2f
added missing dependency
Jul 30, 2025
b7043b3
fixed test
Jul 31, 2025
035685c
fixeed cli tests
Jul 31, 2025
ee63948
fixed flaky test
Jul 31, 2025
4d4a274
fixed test
Jul 31, 2025
093592f
moved fixture
Jul 31, 2025
9409199
refactoed fixture
Jul 31, 2025
fcd3e8a
fixed issue with removal of tasks
Jul 31, 2025
6561be5
mypy
Jul 31, 2025
6b257d0
removed in memoery
Jul 31, 2025
f699cd2
use proper patch
Jul 31, 2025
fac176e
refactir cancellation
Jul 31, 2025
1fddd08
fixeed removal and task cancellation
Jul 31, 2025
b9f1581
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Jul 31, 2025
3a1bc33
removed task timeout
Jul 31, 2025
d07cfda
refactored test
Jul 31, 2025
91c4651
avoid CI fro getting stuck
Aug 4, 2025
13b705d
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 4, 2025
3573bdd
added fakeredis
Aug 4, 2025
abab05f
fixed stuck tests and renamed namespace to redis_namespace
Aug 4, 2025
01f0e5e
added missing dependencies
Aug 4, 2025
e5dc682
added missing test dependency
Aug 4, 2025
66f1915
enhanced waiting for progress to be delivered
Aug 5, 2025
5d7e723
using in memory
Aug 5, 2025
49a79b2
applied suggestion
Aug 5, 2025
6e5eeb3
fixed interface
Aug 5, 2025
a8c6353
using correct definition
Aug 5, 2025
a190275
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 5, 2025
0271ba8
fixed broke ntests
Aug 5, 2025
c6c0f14
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 5, 2025
67d3f51
docstring
Aug 5, 2025
5ddd8ef
use in-memory-redis
Aug 5, 2025
7f021d0
replaced with example
Aug 5, 2025
9ecca80
no longer hangs on client teardown
Aug 5, 2025
52f97c8
removed unnecessary
Aug 5, 2025
6e03a7f
rename
Aug 5, 2025
7e512eb
renamed
Aug 5, 2025
7e28787
removed unused
Aug 5, 2025
dc4b657
simplified names
Aug 5, 2025
e2422a7
used better name
Aug 5, 2025
b079ac5
added comments
Aug 5, 2025
c43ad07
use in memory redis
Aug 5, 2025
0722ff4
reverted
Aug 5, 2025
72b9d06
try to fix tests
Aug 5, 2025
7a99147
ensure no data remains
Aug 5, 2025
505547d
refactor
Aug 5, 2025
7860ae9
working
Aug 5, 2025
169f617
use in memory redis
Aug 5, 2025
c0e328a
bumped timeout
Aug 5, 2025
af796c8
use in_memory redis
Aug 5, 2025
0ffced5
changed wait policy
Aug 6, 2025
e14acad
removed debug log message
Aug 6, 2025
acee55a
removed unused
Aug 6, 2025
5780e3e
Merge remote-tracking branch 'upstream/master' into pr-osparc-long-ru…
Aug 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/celery-library/src/celery_library/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ async def create_task_manager(
),
client_name="celery_tasks",
)
await redis_client_sdk.setup()
# GCR please address https://github.com/ITISFoundation/osparc-simcore/issues/8159

return CeleryTaskManager(
app,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from collections.abc import Awaitable, Callable
from typing import Annotated, TypeAlias

from pydantic import BaseModel, Field, field_validator, validate_call
Expand All @@ -22,8 +23,16 @@ class TaskProgress(BaseModel):
message: ProgressMessage = ""
percent: ProgressPercent = 0.0

# used to propagate progress updates internally
_update_callback: Callable[["TaskProgress"], Awaitable[None]] | None = None

def set_update_callback(
self, callback: Callable[["TaskProgress"], Awaitable[None]]
) -> None:
self._update_callback = callback

@validate_call
def update(
async def update(
self,
*,
message: ProgressMessage | None = None,
Expand All @@ -40,6 +49,14 @@ def update(

_logger.debug("Progress update: %s", f"{self}")

if self._update_callback is not None:
await self._update_callback(self)
else:
_logger.warning(
"No update callback set for TaskProgress %s, progress will not be propagated",
self.task_id,
)

@classmethod
def create(cls, task_id: TaskId | None = None) -> "TaskProgress":
return cls(task_id=task_id)
Expand Down
26 changes: 26 additions & 0 deletions packages/pytest-simcore/src/pytest_simcore/long_running_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# pylint: disable=unused-argument

import functools

import pytest
from pytest_mock import MockerFixture
from servicelib.long_running_tasks import task
from servicelib.long_running_tasks._store.in_memory import InMemoryStore


def _mock_decorator_with_args(*decorator_args, **decorator_kwargs):
def decorator(func):
@functools.wraps(func)
async def wrapper(*func_args, **func_kwargs):
return await func(*func_args, **func_kwargs)

return wrapper

return decorator


@pytest.fixture
def use_in_memory_long_running_tasks(mocker: MockerFixture) -> None:
mocker.patch.object(task, "RedisStore", InMemoryStore)
# for testing the exclsive is not required so it's disabled
mocker.patch.object(task, "exclusive", _mock_decorator_with_args)
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import datetime

from aiohttp import web
from settings_library.redis import RedisSettings

from ...long_running_tasks.base_long_running_manager import BaseLongRunningManager
from ...long_running_tasks.task import TaskContext, TasksManager
from ...long_running_tasks.models import TaskContext
from ...long_running_tasks.task import Namespace, TasksManager
from ._constants import APP_LONG_RUNNING_MANAGER_KEY
from ._request import get_task_context

Expand All @@ -14,11 +16,15 @@ def __init__(
app: web.Application,
stale_task_check_interval: datetime.timedelta,
stale_task_detect_timeout: datetime.timedelta,
redis_settings: RedisSettings,
namespace: Namespace,
):
self._app = app
self._tasks_manager = TasksManager(
stale_task_check_interval=stale_task_check_interval,
stale_task_detect_timeout=stale_task_detect_timeout,
redis_settings=redis_settings,
namespace=namespace,
)

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async def list_tasks(request: web.Request) -> web.Response:
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
)
for t in lrt_api.list_tasks(
for t in await lrt_api.list_tasks(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
)
Expand All @@ -41,7 +41,7 @@ async def get_task_status(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(_PathParam, request)
long_running_manager = get_long_running_manager(request.app)

task_status: TaskStatus = lrt_api.get_task_status(
task_status: TaskStatus = await lrt_api.get_task_status(
long_running_manager.tasks_manager,
long_running_manager.get_task_context(request),
path_params.task_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@
from aiohttp import web
from common_library.json_serialization import json_dumps
from pydantic import AnyHttpUrl, TypeAdapter
from servicelib.long_running_tasks.task import Namespace
from settings_library.redis import RedisSettings

from ...aiohttp import status
from ...long_running_tasks import lrt_api
from ...long_running_tasks.constants import (
DEFAULT_STALE_TASK_CHECK_INTERVAL,
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
)
from ...long_running_tasks.models import TaskGet
from ...long_running_tasks.task import RegisteredTaskName, TaskContext
from ...long_running_tasks.models import TaskContext, TaskGet
from ...long_running_tasks.task import RegisteredTaskName
from ..typing_extension import Handler
from . import _routes
from ._constants import (
Expand Down Expand Up @@ -91,9 +93,8 @@ async def start_long_running_task(
except asyncio.CancelledError:
# cancel the task, the client has disconnected
if task_id:
long_running_manager = get_long_running_manager(request_.app)
await long_running_manager.tasks_manager.cancel_task(
task_id, with_task_context=None
await lrt_api.cancel_task(
long_running_manager.tasks_manager, task_context, task_id
)
raise

Expand Down Expand Up @@ -121,6 +122,8 @@ def setup(
app: web.Application,
*,
router_prefix: str,
redis_settings: RedisSettings,
namespace: Namespace,
handler_check_decorator: Callable = _no_ops_decorator,
task_request_context_decorator: Callable = _no_task_context_decorator,
stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL,
Expand All @@ -146,6 +149,8 @@ async def on_cleanup_ctx(app: web.Application) -> AsyncGenerator[None, None]:
app=app,
stale_task_check_interval=stale_task_check_interval,
stale_task_detect_timeout=stale_task_detect_timeout,
redis_settings=redis_settings,
namespace=namespace,
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import datetime

from fastapi import FastAPI
from settings_library.redis import RedisSettings

from ...long_running_tasks.base_long_running_manager import BaseLongRunningManager
from ...long_running_tasks.task import TasksManager
from ...long_running_tasks.task import Namespace, TasksManager


class FastAPILongRunningManager(BaseLongRunningManager):
Expand All @@ -12,11 +13,15 @@ def __init__(
app: FastAPI,
stale_task_check_interval: datetime.timedelta,
stale_task_detect_timeout: datetime.timedelta,
redis_settings: RedisSettings,
namespace: Namespace,
):
self._app = app
self._tasks_manager = TasksManager(
stale_task_check_interval=stale_task_check_interval,
stale_task_detect_timeout=stale_task_detect_timeout,
redis_settings=redis_settings,
namespace=namespace,
)

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ async def list_tasks(
request.url_for("cancel_and_delete_task", task_id=t.task_id)
),
)
for t in lrt_api.list_tasks(
long_running_manager.tasks_manager, task_context=None
for t in await lrt_api.list_tasks(
long_running_manager.tasks_manager, task_context={}
)
]

Expand All @@ -51,8 +51,8 @@ async def get_task_status(
],
) -> TaskStatus:
assert request # nosec
return lrt_api.get_task_status(
long_running_manager.tasks_manager, task_context=None, task_id=task_id
return await lrt_api.get_task_status(
long_running_manager.tasks_manager, task_context={}, task_id=task_id
)


Expand All @@ -75,7 +75,7 @@ async def get_task_result(
) -> TaskResult | Any:
assert request # nosec
return await lrt_api.get_task_result(
long_running_manager.tasks_manager, task_context=None, task_id=task_id
long_running_manager.tasks_manager, task_context={}, task_id=task_id
)


Expand All @@ -98,5 +98,5 @@ async def cancel_and_delete_task(
) -> None:
assert request # nosec
await lrt_api.remove_task(
long_running_manager.tasks_manager, task_context=None, task_id=task_id
long_running_manager.tasks_manager, task_context={}, task_id=task_id
)
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import datetime

from fastapi import APIRouter, FastAPI
from settings_library.redis import RedisSettings

from ...long_running_tasks.constants import (
DEFAULT_STALE_TASK_CHECK_INTERVAL,
DEFAULT_STALE_TASK_DETECT_TIMEOUT,
)
from ...long_running_tasks.errors import BaseLongRunningError
from ...long_running_tasks.task import Namespace
from ._error_handlers import base_long_running_error_handler
from ._manager import FastAPILongRunningManager
from ._routes import router
Expand All @@ -16,6 +18,8 @@ def setup(
app: FastAPI,
*,
router_prefix: str = "",
redis_settings: RedisSettings,
namespace: Namespace,
stale_task_check_interval: datetime.timedelta = DEFAULT_STALE_TASK_CHECK_INTERVAL,
stale_task_detect_timeout: datetime.timedelta = DEFAULT_STALE_TASK_DETECT_TIMEOUT,
) -> None:
Expand All @@ -41,6 +45,8 @@ async def on_startup() -> None:
app=app,
stale_task_check_interval=stale_task_check_interval,
stale_task_detect_timeout=stale_task_detect_timeout,
redis_settings=redis_settings,
namespace=namespace,
)
)
await long_running_manager.setup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ async def redis_client_sdk_lifespan(_: FastAPI, state: State) -> AsyncIterator[S
redis_dsn_with_secrets,
client_name=redis_state.REDIS_CLIENT_NAME,
)
await redis_client.setup()

try:
yield {"REDIS_CLIENT_SDK": redis_client, **called_state}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from abc import abstractmethod

from ..models import TaskContext, TaskData, TaskId


class BaseStore:

@abstractmethod
async def get_task_data(self, task_id: TaskId) -> TaskData | None:
"""Retrieve a tracked task by its key."""

@abstractmethod
async def set_task_data(self, task_id: TaskId, value: TaskData) -> None:
"""Set a tracked task with its key."""

@abstractmethod
async def list_tasks_data(self) -> list[TaskData]:
"""List all tracked tasks."""

@abstractmethod
async def delete_task_data(self, task_id: TaskId) -> None:
"""Delete a tracked task by its key."""

@abstractmethod
async def set_as_cancelled(
self, task_id: TaskId, with_task_context: TaskContext
) -> None:
"""Mark a tracked task as cancelled."""

@abstractmethod
async def get_cancelled(self) -> dict[TaskId, TaskContext]:
"""Get cancelled tasks."""

@abstractmethod
async def setup(self) -> None:
"""Setup the store, if needed."""

@abstractmethod
async def shutdown(self) -> None:
"""Shutdown the store, if needed."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from ..models import TaskContext, TaskData, TaskId
from .base import BaseStore


class InMemoryStore(BaseStore):
def __init__(self, *args, **kwargs):
_ = args
_ = kwargs
self._tasks_data: dict[TaskId, TaskData] = {}
self._cancelled_tasks: dict[TaskId, TaskContext] = {}

async def setup(self) -> None:
pass

async def shutdown(self) -> None:
pass

async def get_task_data(self, task_id: TaskId) -> TaskData | None:
return self._tasks_data.get(task_id, None)

async def set_task_data(self, task_id: TaskId, value: TaskData) -> None:
self._tasks_data[task_id] = value

async def list_tasks_data(self) -> list[TaskData]:
return list(self._tasks_data.values())

async def delete_task_data(self, task_id: TaskId) -> None:
self._tasks_data.pop(task_id, None)

async def set_as_cancelled(
self, task_id: TaskId, with_task_context: TaskContext
) -> None:
self._cancelled_tasks[task_id] = with_task_context

async def get_cancelled(self) -> dict[TaskId, TaskContext]:
return self._cancelled_tasks
Loading
Loading