Skip to content

Commit 7d9939c

Browse files
author
Andrei Neagu
committed
Merge remote-tracking branch 'upstream/master' into pr-osparc-migrate-dy-scheduler-part2
2 parents cf96cf3 + d0d210d commit 7d9939c

File tree

202 files changed

+3718
-1627
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

202 files changed

+3718
-1627
lines changed

.github/workflows/ci-pact-master.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
PACT_BROKER_PASSWORD: ${{ secrets.PACT_BROKER_PASSWORD }}
2323
steps:
2424
- name: setup python environment
25-
uses: actions/setup-python@v5
25+
uses: actions/setup-python@v6
2626
with:
2727
python-version: "3.11"
2828
- name: install uv

.github/workflows/ci-testing-deploy.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,7 @@ jobs:
861861
with:
862862
driver: docker-container
863863
- name: setup python environment
864-
uses: actions/setup-python@v5
864+
uses: actions/setup-python@v6
865865
with:
866866
python-version: ${{ matrix.python }}
867867
- name: install uv
@@ -1851,7 +1851,7 @@ jobs:
18511851
with:
18521852
python-version: ${{ matrix.python }}
18531853
cache-dependency-glob: "**/e2e/requirements/requirements.txt"
1854-
- uses: actions/setup-node@v4.4.0
1854+
- uses: actions/setup-node@v5.0.0
18551855
with:
18561856
node-version: ${{ matrix.node }}
18571857
cache: "npm"

.github/workflows/ci-testing-pull-request.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ jobs:
3535
runs-on: ubuntu-latest
3636
steps:
3737
- name: setup python environment
38-
uses: actions/setup-python@v5
38+
uses: actions/setup-python@v6
3939
with:
4040
python-version: "3.11"
4141
- name: install uv
@@ -65,7 +65,7 @@ jobs:
6565
runs-on: ubuntu-latest
6666
steps:
6767
- name: setup python environment
68-
uses: actions/setup-python@v5
68+
uses: actions/setup-python@v6
6969
with:
7070
python-version: "3.11"
7171
- name: checkout
@@ -93,7 +93,7 @@ jobs:
9393
runs-on: ubuntu-latest
9494
steps:
9595
- name: setup python environment
96-
uses: actions/setup-python@v5
96+
uses: actions/setup-python@v6
9797
with:
9898
python-version: "3.11"
9999
- name: checkout

api/specs/web-server/_functions.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
FunctionGroupPathParams,
2424
)
2525
from simcore_service_webserver.functions._controller._functions_rest_schemas import (
26+
FunctionDeleteQueryParams,
2627
FunctionGetQueryParams,
2728
FunctionPathParams,
2829
FunctionsListQueryParams,
@@ -80,6 +81,7 @@ async def update_function(
8081
)
8182
async def delete_function(
8283
_path: Annotated[FunctionPathParams, Depends()],
84+
_query: Annotated[as_query(FunctionDeleteQueryParams), Depends()],
8385
): ...
8486

8587

packages/aws-library/requirements/_base.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ jmespath==1.0.1
118118
# aiobotocore
119119
# boto3
120120
# botocore
121+
jsonref==1.1.0
122+
# via
123+
# -r requirements/../../../packages/models-library/requirements/_base.in
124+
# -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in
121125
jsonschema==4.23.0
122126
# via
123127
# -r requirements/../../../packages/models-library/requirements/_base.in

packages/celery-library/requirements/_base.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ idna==3.10
109109
# yarl
110110
importlib-metadata==8.6.1
111111
# via opentelemetry-api
112+
jsonref==1.1.0
113+
# via
114+
# -r requirements/../../../packages/models-library/requirements/_base.in
115+
# -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in
112116
jsonschema==4.23.0
113117
# via
114118
# -r requirements/../../../packages/models-library/requirements/_base.in

packages/celery-library/src/celery_library/backends/_redis.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
_CELERY_TASK_INFO_PREFIX: Final[str] = "celery-task-info-"
2020
_CELERY_TASK_ID_KEY_ENCODING = "utf-8"
2121
_CELERY_TASK_ID_KEY_SEPARATOR: Final[str] = ":"
22-
_CELERY_TASK_SCAN_COUNT_PER_BATCH: Final[int] = 10000
22+
_CELERY_TASK_SCAN_COUNT_PER_BATCH: Final[int] = 1000
2323
_CELERY_TASK_METADATA_KEY: Final[str] = "metadata"
2424
_CELERY_TASK_PROGRESS_KEY: Final[str] = "progress"
2525

@@ -51,11 +51,6 @@ async def create_task(
5151
expiry,
5252
)
5353

54-
async def exists_task(self, task_id: TaskID) -> bool:
55-
n = await self._redis_client_sdk.redis.exists(_build_key(task_id))
56-
assert isinstance(n, int) # nosec
57-
return n > 0
58-
5954
async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None:
6055
raw_result = await self._redis_client_sdk.redis.hget(_build_key(task_id), _CELERY_TASK_METADATA_KEY) # type: ignore
6156
if not raw_result:
@@ -131,3 +126,8 @@ async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> No
131126
key=_CELERY_TASK_PROGRESS_KEY,
132127
value=report.model_dump_json(),
133128
) # type: ignore
129+
130+
async def task_exists(self, task_id: TaskID) -> bool:
131+
n = await self._redis_client_sdk.redis.exists(_build_key(task_id))
132+
assert isinstance(n, int) # nosec
133+
return n > 0

packages/celery-library/src/celery_library/errors.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import base64
22
import pickle
33

4+
from common_library.errors_classes import OsparcErrorMixin
5+
46

57
class TransferrableCeleryError(Exception):
68
def __repr__(self) -> str:
@@ -22,3 +24,7 @@ def decode_celery_transferrable_error(error: TransferrableCeleryError) -> Except
2224
assert isinstance(error, TransferrableCeleryError) # nosec
2325
result: Exception = pickle.loads(base64.b64decode(error.args[0])) # noqa: S301
2426
return result
27+
28+
29+
class TaskNotFoundError(OsparcErrorMixin, Exception):
30+
msg_template = "Task with id '{task_id}' was not found"

packages/celery-library/src/celery_library/rpc/_async_jobs.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from models_library.api_schemas_rpc_async_jobs.exceptions import (
1414
JobAbortedError,
1515
JobError,
16+
JobMissingError,
1617
JobNotDoneError,
1718
JobSchedulerError,
1819
)
@@ -22,6 +23,7 @@
2223
from servicelib.rabbitmq import RPCRouter
2324

2425
from ..errors import (
26+
TaskNotFoundError,
2527
TransferrableCeleryError,
2628
decode_celery_transferrable_error,
2729
)
@@ -30,7 +32,7 @@
3032
router = RPCRouter()
3133

3234

33-
@router.expose(reraise_if_error_type=(JobSchedulerError,))
35+
@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
3436
async def cancel(
3537
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
3638
):
@@ -42,11 +44,13 @@ async def cancel(
4244
task_filter=task_filter,
4345
task_uuid=job_id,
4446
)
47+
except TaskNotFoundError as exc:
48+
raise JobMissingError(job_id=job_id) from exc
4549
except CeleryError as exc:
4650
raise JobSchedulerError(exc=f"{exc}") from exc
4751

4852

49-
@router.expose(reraise_if_error_type=(JobSchedulerError,))
53+
@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError))
5054
async def status(
5155
task_manager: TaskManager, job_id: AsyncJobId, job_filter: AsyncJobFilter
5256
) -> AsyncJobStatus:
@@ -59,6 +63,8 @@ async def status(
5963
task_filter=task_filter,
6064
task_uuid=job_id,
6165
)
66+
except TaskNotFoundError as exc:
67+
raise JobMissingError(job_id=job_id) from exc
6268
except CeleryError as exc:
6369
raise JobSchedulerError(exc=f"{exc}") from exc
6470

@@ -71,9 +77,10 @@ async def status(
7177

7278
@router.expose(
7379
reraise_if_error_type=(
80+
JobAbortedError,
7481
JobError,
82+
JobMissingError,
7583
JobNotDoneError,
76-
JobAbortedError,
7784
JobSchedulerError,
7885
)
7986
)
@@ -97,11 +104,11 @@ async def result(
97104
task_filter=task_filter,
98105
task_uuid=job_id,
99106
)
107+
except TaskNotFoundError as exc:
108+
raise JobMissingError(job_id=job_id) from exc
100109
except CeleryError as exc:
101110
raise JobSchedulerError(exc=f"{exc}") from exc
102111

103-
if _status.task_state == TaskState.ABORTED:
104-
raise JobAbortedError(job_id=job_id)
105112
if _status.task_state == TaskState.FAILURE:
106113
# fallback exception to report
107114
exc_type = type(_result).__name__

packages/celery-library/src/celery_library/task.py

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,7 @@
66
from functools import wraps
77
from typing import Any, Concatenate, Final, ParamSpec, TypeVar, overload
88

9-
from celery import Celery # type: ignore[import-untyped]
10-
from celery.contrib.abortable import ( # type: ignore[import-untyped]
11-
AbortableAsyncResult,
12-
AbortableTask,
13-
)
9+
from celery import Celery, Task # type: ignore[import-untyped]
1410
from celery.exceptions import Ignore # type: ignore[import-untyped]
1511
from common_library.async_tools import cancel_wait_task
1612
from pydantic import NonNegativeInt
@@ -39,42 +35,42 @@ class TaskAbortedError(Exception): ...
3935
def _async_task_wrapper(
4036
app: Celery,
4137
) -> Callable[
42-
[Callable[Concatenate[AbortableTask, P], Coroutine[Any, Any, R]]],
43-
Callable[Concatenate[AbortableTask, P], R],
38+
[Callable[Concatenate[Task, P], Coroutine[Any, Any, R]]],
39+
Callable[Concatenate[Task, P], R],
4440
]:
4541
def decorator(
46-
coro: Callable[Concatenate[AbortableTask, P], Coroutine[Any, Any, R]],
47-
) -> Callable[Concatenate[AbortableTask, P], R]:
42+
coro: Callable[Concatenate[Task, P], Coroutine[Any, Any, R]],
43+
) -> Callable[Concatenate[Task, P], R]:
4844
@wraps(coro)
49-
def wrapper(task: AbortableTask, *args: P.args, **kwargs: P.kwargs) -> R:
45+
def wrapper(task: Task, *args: P.args, **kwargs: P.kwargs) -> R:
5046
app_server = get_app_server(app)
5147
# NOTE: task.request is a thread local object, so we need to pass the id explicitly
5248
assert task.request.id is not None # nosec
5349

54-
async def run_task(task_id: TaskID) -> R:
50+
async def _run_task(task_id: TaskID) -> R:
5551
try:
5652
async with asyncio.TaskGroup() as tg:
57-
main_task = tg.create_task(
53+
async_io_task = tg.create_task(
5854
coro(task, *args, **kwargs),
5955
)
6056

61-
async def abort_monitor():
62-
abortable_result = AbortableAsyncResult(task_id, app=app)
63-
while not main_task.done():
64-
if abortable_result.is_aborted():
57+
async def _abort_monitor():
58+
while not async_io_task.done():
59+
if not await app_server.task_manager.task_exists(
60+
task_id
61+
):
6562
await cancel_wait_task(
66-
main_task,
63+
async_io_task,
6764
max_delay=_DEFAULT_CANCEL_TASK_TIMEOUT.total_seconds(),
6865
)
69-
AbortableAsyncResult(task_id, app=app).forget()
7066
raise TaskAbortedError
7167
await asyncio.sleep(
7268
_DEFAULT_ABORT_TASK_TIMEOUT.total_seconds()
7369
)
7470

75-
tg.create_task(abort_monitor())
71+
tg.create_task(_abort_monitor())
7672

77-
return main_task.result()
73+
return async_io_task.result()
7874
except BaseExceptionGroup as eg:
7975
task_aborted_errors, other_errors = eg.split(TaskAbortedError)
8076

@@ -88,7 +84,7 @@ async def abort_monitor():
8884
raise other_errors.exceptions[0] from eg
8985

9086
return asyncio.run_coroutine_threadsafe(
91-
run_task(task.request.id),
87+
_run_task(task.request.id),
9288
app_server.event_loop,
9389
).result()
9490

@@ -102,14 +98,14 @@ def _error_handling(
10298
delay_between_retries: timedelta,
10399
dont_autoretry_for: tuple[type[Exception], ...],
104100
) -> Callable[
105-
[Callable[Concatenate[AbortableTask, P], R]],
106-
Callable[Concatenate[AbortableTask, P], R],
101+
[Callable[Concatenate[Task, P], R]],
102+
Callable[Concatenate[Task, P], R],
107103
]:
108104
def decorator(
109-
func: Callable[Concatenate[AbortableTask, P], R],
110-
) -> Callable[Concatenate[AbortableTask, P], R]:
105+
func: Callable[Concatenate[Task, P], R],
106+
) -> Callable[Concatenate[Task, P], R]:
111107
@wraps(func)
112-
def wrapper(task: AbortableTask, *args: P.args, **kwargs: P.kwargs) -> R:
108+
def wrapper(task: Task, *args: P.args, **kwargs: P.kwargs) -> R:
113109
try:
114110
return func(task, *args, **kwargs)
115111
except TaskAbortedError as exc:
@@ -144,7 +140,7 @@ def wrapper(task: AbortableTask, *args: P.args, **kwargs: P.kwargs) -> R:
144140
@overload
145141
def register_task(
146142
app: Celery,
147-
fn: Callable[Concatenate[AbortableTask, TaskID, P], Coroutine[Any, Any, R]],
143+
fn: Callable[Concatenate[Task, TaskID, P], Coroutine[Any, Any, R]],
148144
task_name: str | None = None,
149145
timeout: timedelta | None = _DEFAULT_TASK_TIMEOUT,
150146
max_retries: NonNegativeInt = _DEFAULT_MAX_RETRIES,
@@ -156,7 +152,7 @@ def register_task(
156152
@overload
157153
def register_task(
158154
app: Celery,
159-
fn: Callable[Concatenate[AbortableTask, P], R],
155+
fn: Callable[Concatenate[Task, P], R],
160156
task_name: str | None = None,
161157
timeout: timedelta | None = _DEFAULT_TASK_TIMEOUT,
162158
max_retries: NonNegativeInt = _DEFAULT_MAX_RETRIES,
@@ -168,8 +164,8 @@ def register_task(
168164
def register_task( # type: ignore[misc]
169165
app: Celery,
170166
fn: (
171-
Callable[Concatenate[AbortableTask, TaskID, P], Coroutine[Any, Any, R]]
172-
| Callable[Concatenate[AbortableTask, P], R]
167+
Callable[Concatenate[Task, TaskID, P], Coroutine[Any, Any, R]]
168+
| Callable[Concatenate[Task, P], R]
173169
),
174170
task_name: str | None = None,
175171
timeout: timedelta | None = _DEFAULT_TASK_TIMEOUT,
@@ -186,7 +182,7 @@ def register_task( # type: ignore[misc]
186182
delay_between_retries -- dealy between each attempt in case of error (default: {_DEFAULT_WAIT_BEFORE_RETRY})
187183
dont_autoretry_for -- exceptions that should not be retried when raised by the task
188184
"""
189-
wrapped_fn: Callable[Concatenate[AbortableTask, P], R]
185+
wrapped_fn: Callable[Concatenate[Task, P], R]
190186
if asyncio.iscoroutinefunction(fn):
191187
wrapped_fn = _async_task_wrapper(app)(fn)
192188
else:
@@ -202,7 +198,6 @@ def register_task( # type: ignore[misc]
202198
app.task(
203199
name=task_name or fn.__name__,
204200
bind=True,
205-
base=AbortableTask,
206201
time_limit=None if timeout is None else timeout.total_seconds(),
207202
pydantic=True,
208203
)(wrapped_fn)

0 commit comments

Comments
 (0)