
Commit 8263a5b

Author: Andrei Neagu

Merge remote-tracking branch 'upstream/master' into pr-osparc-resilient-long-running-decorator

2 parents a81a2d8 + 3c12ac2, commit 8263a5b

File tree

148 files changed: +3564 -1106 lines


api/specs/web-server/_functions.py

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-arguments
# pylint: disable=unused-argument
# pylint: disable=unused-variable


from typing import Annotated

from fastapi import APIRouter, Depends, status
from models_library.api_schemas_webserver.functions import (
    FunctionToRegister,
    RegisteredFunctionGet,
)
from models_library.generics import Envelope
from simcore_service_webserver._meta import API_VTAG
from simcore_service_webserver.functions._controller._functions_rest_schemas import (
    FunctionPathParams,
)

router = APIRouter(
    prefix=f"/{API_VTAG}",
    tags=[
        "functions",
    ],
)


@router.post(
    "/functions",
    response_model=Envelope[RegisteredFunctionGet],
)
async def register_function(
    _body: FunctionToRegister,
) -> Envelope[RegisteredFunctionGet]: ...


@router.get(
    "/functions/{function_id}",
    response_model=Envelope[RegisteredFunctionGet],
)
async def get_function(
    _path: Annotated[FunctionPathParams, Depends()],
): ...


@router.delete(
    "/functions/{function_id}",
    status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_function(
    _path: Annotated[FunctionPathParams, Depends()],
): ...
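
For orientation (not part of this diff): Envelope[RegisteredFunctionGet] above wraps the payload in the webserver's standard response envelope. A minimal sketch of such a generic wrapper, assuming the usual pydantic data/error shape; the real models_library.generics.Envelope may differ in detail:

from typing import Generic, TypeVar

from pydantic import BaseModel

DataT = TypeVar("DataT")


class Envelope(BaseModel, Generic[DataT]):
    # hypothetical minimal shape: success responses carry `data`, failures carry `error`
    data: DataT | None = None
    error: dict | None = None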

api/specs/web-server/openapi.py

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@
     "_computations",
     "_exporter",
     "_folders",
+    "_functions",
     "_long_running_tasks",
     "_long_running_tasks_legacy",
     "_licensed_items",
Lines changed: 69 additions & 1 deletion
@@ -1,4 +1,72 @@
-from typing import TypeAlias
+from typing import Final, Literal, TypeAlias
+
+from dask.typing import Key
+from distributed.scheduler import TaskStateState as SchedulerTaskState
+from distributed.worker_state_machine import TaskStateState as WorkerTaskState
+from models_library.projects_state import RunningState
+from pydantic import BaseModel
 
 DaskJobID: TypeAlias = str
 DaskResources: TypeAlias = dict[str, int | float]
+
+TASK_LIFE_CYCLE_EVENT: Final[str] = "task-lifecycle-{key}"
+TASK_RUNNING_PROGRESS_EVENT: Final[str] = "task-progress-{key}"
+_SCHEDULER_TASK_STATE_TO_RUNNING_STATE: Final[
+    dict[SchedulerTaskState, RunningState]
+] = {
+    "released": RunningState.NOT_STARTED,  # known but not actively computing or in memory
+    "waiting": RunningState.PENDING,  # on track to be computed, waiting on dependencies to arrive in memory
+    "no-worker": RunningState.WAITING_FOR_RESOURCES,  # ready to be computed, but no appropriate worker exists (e.g. resource restrictions, or no worker connected at all)
+    "queued": RunningState.WAITING_FOR_RESOURCES,  # ready to be computed, but all workers are already full
+    "processing": RunningState.PENDING,  # all dependencies are available and the task is assigned to a worker (the scheduler does not know whether it is queued there or actively being computed)
+    "memory": RunningState.SUCCESS,  # in memory on one or more workers
+    "erred": RunningState.FAILED,  # task computation, or one of its dependencies, has encountered an error
+    "forgotten": RunningState.UNKNOWN,  # no longer needed by any client or dependent task; immediately dereferenced from the scheduler
+}
+
+_WORKER_TASK_STATE_TO_RUNNING_STATE: Final[dict[WorkerTaskState, RunningState]] = {
+    "cancelled": RunningState.ABORTED,  # the scheduler asked to forget this task, but that is technically impossible right now; the task stays in the collections of its previous state
+    "constrained": RunningState.PENDING,  # like ready, but with user-specified resource constraints; found in the WorkerState.constrained queue
+    "error": RunningState.FAILED,  # task execution failed
+    "executing": RunningState.STARTED,  # currently being computed on a thread; found in the WorkerState.executing set and the Worker.active_threads dict
+    "fetch": RunningState.PENDING,  # in memory on one or more peer workers but not here; queued for network transfer (as a dependency of a waiting task or at the Active Memory Manager's request); found in the WorkerState.data_needed heap
+    "flight": RunningState.PENDING,  # task data is currently being transferred from another worker; found in WorkerState.in_flight_tasks and WorkerState.in_flight_workers
+    "forgotten": RunningState.UNKNOWN,  # the scheduler asked this worker to forget the task, and it has neither dependents nor dependencies on this worker
+    "long-running": RunningState.STARTED,  # like executing, but the user code called distributed.secede(), so the task no longer counts towards the concurrency limit; found in the WorkerState.long_running set and the Worker.active_threads dict
+    "memory": RunningState.SUCCESS,  # execution completed, or the task was transferred from another worker; held in WorkerState.data or WorkerState.actors
+    "missing": RunningState.PENDING,  # like fetch, but all peer workers listed by the scheduler are unreachable or deny holding the data; the worker periodically re-asks the scheduler for replicas, then transitions back to fetch; found in the WorkerState.missing_dep_flight set
+    "ready": RunningState.PENDING,  # all dependencies are in memory on this worker, waiting for a free thread; found in the WorkerState.ready heap
+    "released": RunningState.PENDING,  # known but not actively computing or in memory; a task may stay here when the scheduler asked to forget it but it still has dependents on this worker
+    "rescheduled": RunningState.PENDING,  # the task just raised the Reschedule exception; a transitory state that is not stored permanently
+    "resumed": RunningState.PENDING,  # recovered from the cancelled state; the task stays in the collections of its previous state
+    "waiting": RunningState.PENDING,  # scheduled on this worker; all dependencies are in memory somewhere on the cluster, but not all on this worker yet, so they need to be fetched
+}
+
+
+class TaskLifeCycleState(BaseModel):
+    key: str
+    source: Literal["scheduler", "worker"]
+    worker: str | None
+    state: RunningState
+
+    @classmethod
+    def from_scheduler_task_state(
+        cls, key: Key, worker: str | None, task_state: SchedulerTaskState
+    ) -> "TaskLifeCycleState":
+        return cls(
+            key=f"{key!r}",
+            source="scheduler",
+            worker=worker,
+            state=_SCHEDULER_TASK_STATE_TO_RUNNING_STATE[task_state],
+        )
+
+    @classmethod
+    def from_worker_task_state(
+        cls, key: Key, worker: str | None, task_state: WorkerTaskState
+    ) -> "TaskLifeCycleState":
+        return cls(
+            key=f"{key!r}",
+            source="worker",
+            worker=worker,
+            state=_WORKER_TASK_STATE_TO_RUNNING_STATE[task_state],
+        )
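
For orientation (not part of this diff): the plugins below publish TaskLifeCycleState payloads to a per-task event topic, which a client can consume with distributed's event API (Client.subscribe_topic delivers (timestamp, message) pairs). A minimal sketch; the scheduler address and the models import path are assumptions, since the file name is hidden in this view:

from distributed import Client

from dask_task_models_library.models import (  # assumed module path
    TASK_LIFE_CYCLE_EVENT,
    TaskLifeCycleState,
)

client = Client("tcp://scheduler:8786")  # placeholder address


def on_lifecycle_event(event) -> None:
    _timestamp, payload = event  # dask events arrive as (timestamp, message) pairs
    state = TaskLifeCycleState.model_validate(payload)
    print(f"[{state.source}] {state.key} -> {state.state}")


# the topic is per task key, matching TASK_LIFE_CYCLE_EVENT.format(key=...)
client.subscribe_topic(TASK_LIFE_CYCLE_EVENT.format(key="my-task-key"), on_lifecycle_event)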

packages/dask-task-models-library/src/dask_task_models_library/plugins/__init__.py

Whitespace-only changes.
Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
# pylint: disable=unused-argument
import logging
from typing import Any

import click
from dask.typing import Key
from distributed import Scheduler, SchedulerPlugin
from distributed.scheduler import TaskStateState

from ..models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState

_logger = logging.getLogger(__name__)


class TaskLifecycleSchedulerPlugin(SchedulerPlugin):
    def __init__(self) -> None:
        self.scheduler = None
        _logger.info("initialized TaskLifecycleSchedulerPlugin")

    async def start(self, scheduler: Scheduler) -> None:
        self.scheduler = scheduler  # type: ignore[assignment]
        _logger.info("started TaskLifecycleSchedulerPlugin")

    def transition(
        self,
        key: Key,
        start: TaskStateState,
        finish: TaskStateState,
        *args: Any,  # noqa: ARG002
        stimulus_id: str,
        **kwargs: Any,
    ):
        _logger.debug(
            "Task %s transition from %s to %s due to %s",
            key,
            start,
            finish,
            stimulus_id,
        )

        assert self.scheduler  # nosec

        self.scheduler.log_event(
            TASK_LIFE_CYCLE_EVENT.format(key=key),
            TaskLifeCycleState.from_scheduler_task_state(
                key, kwargs.get("worker"), finish
            ).model_dump(mode="json"),
        )


@click.command()
def dask_setup(scheduler):
    plugin = TaskLifecycleSchedulerPlugin()
    scheduler.add_plugin(plugin)
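
A deployment note (assumption, not from this diff): the click-decorated dask_setup above is distributed's standard preload hook, so the plugin would typically be installed by starting the scheduler with --preload <module path>. A sketch of the client-side alternative; the module path is hypothetical since the file name is hidden in this view, and Client.register_plugin requires a recent distributed release:

from distributed import Client

from dask_task_models_library.plugins.task_life_cycle_scheduler_plugin import (  # hypothetical module path
    TaskLifecycleSchedulerPlugin,
)

client = Client("tcp://scheduler:8786")  # placeholder address
# register_plugin dispatches on the plugin's base class (SchedulerPlugin here)
client.register_plugin(TaskLifecycleSchedulerPlugin())
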
Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
import logging
from collections.abc import Awaitable
from typing import Any

import click
from dask.typing import Key
from distributed import WorkerPlugin
from distributed.worker import Worker
from distributed.worker_state_machine import TaskStateState

from ..models import TASK_LIFE_CYCLE_EVENT, TaskLifeCycleState

_logger = logging.getLogger(__name__)


class TaskLifecycleWorkerPlugin(WorkerPlugin):
    def __init__(self) -> None:
        self._worker = None
        _logger.info("TaskLifecycleWorkerPlugin initialized")

    def setup(self, worker: Worker) -> Awaitable[None]:
        async def _() -> None:
            self._worker = worker  # type: ignore[assignment]
            _logger.info("TaskLifecycleWorkerPlugin setup completed")

        return _()

    def transition(
        self,
        key: Key,
        start: TaskStateState,
        finish: TaskStateState,
        **kwargs: Any,
    ):
        _logger.info("Task '%s' transition from %s to %s", key, start, finish)
        assert self._worker  # nosec
        self._worker.log_event(
            TASK_LIFE_CYCLE_EVENT.format(key=key),
            TaskLifeCycleState.from_worker_task_state(
                key, kwargs.get("worker"), finish
            ).model_dump(mode="json"),
        )


@click.command()
async def dask_setup(worker: Worker) -> None:
    plugin = TaskLifecycleWorkerPlugin()
    await worker.plugin_add(plugin)
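
Also for orientation (not from this diff): since both plugins log to the same per-key topic, the full transition history of a task can be read back afterwards with Client.get_events. A sketch under the same assumed import path and placeholder address:

from distributed import Client

from dask_task_models_library.models import TASK_LIFE_CYCLE_EVENT  # assumed module path

client = Client("tcp://scheduler:8786")  # placeholder address

# get_events returns the (timestamp, message) pairs logged under this topic,
# mixing scheduler-side and worker-side transitions for the task
for _timestamp, payload in client.get_events(TASK_LIFE_CYCLE_EVENT.format(key="my-task-key")):
    print(payload["source"], payload["state"])
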
Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
# pylint: disable=unused-import

from ..functions import (
    Function,
    FunctionClass,
    FunctionClassSpecificData,
    FunctionID,
    FunctionIDNotFoundError,
    FunctionInputs,
    FunctionInputSchema,
    FunctionInputsList,
    FunctionInputsValidationError,
    FunctionJob,
    FunctionJobClassSpecificData,
    FunctionJobCollection,
    FunctionJobCollectionID,
    FunctionJobCollectionIDNotFoundError,
    FunctionJobCollectionStatus,
    FunctionJobID,
    FunctionJobIDNotFoundError,
    FunctionJobStatus,
    FunctionOutputs,
    FunctionOutputSchema,
    FunctionSchemaClass,
    JSONFunctionInputSchema,
    JSONFunctionOutputSchema,
    ProjectFunction,
    ProjectFunctionJob,
    RegisteredFunction,
    RegisteredFunctionJob,
    RegisteredFunctionJobCollection,
    RegisteredProjectFunction,
    RegisteredProjectFunctionJob,
    SolverFunction,
    SolverFunctionJob,
    UnsupportedFunctionClassError,
    UnsupportedFunctionFunctionJobClassCombinationError,
)

__all__ = [
    "Function",
    "FunctionClass",
    "FunctionClassSpecificData",
    "FunctionID",
    "FunctionIDNotFoundError",
    "FunctionInputSchema",
    "FunctionInputs",
    "FunctionInputsList",
    "FunctionInputsValidationError",
    "FunctionJob",
    "FunctionJobClassSpecificData",
    "FunctionJobCollection",
    "FunctionJobCollectionID",
    "FunctionJobCollectionIDNotFoundError",
    "FunctionJobCollectionStatus",
    "FunctionJobID",
    "FunctionJobIDNotFoundError",
    "FunctionJobStatus",
    "FunctionOutputSchema",
    "FunctionOutputs",
    "FunctionSchemaClass",
    "JSONFunctionInputSchema",
    "JSONFunctionOutputSchema",
    "ProjectFunction",
    "ProjectFunctionJob",
    "RegisteredFunction",
    "RegisteredFunctionJob",
    "RegisteredFunctionJobCollection",
    "RegisteredProjectFunction",
    "RegisteredProjectFunctionJob",
    "SolverFunction",
    "SolverFunctionJob",
    "UnsupportedFunctionClassError",
    "UnsupportedFunctionFunctionJobClassCombinationError",
]

packages/models-library/src/models_library/api_schemas_catalog/services.py

Lines changed: 14 additions & 0 deletions
@@ -385,5 +385,19 @@ class ServiceListFilters(Filters):
         ),
     ] = None
 
+    service_key_pattern: Annotated[
+        str | None,
+        Field(
+            description="Filter services by key pattern (e.g. 'simcore/services/comp/itis/*')",
+        ),
+    ] = None
+
+    version_display_pattern: Annotated[
+        str | None,
+        Field(
+            description="Filter services by version display pattern (e.g. '*2023*')",
+        ),
+    ] = None
+
 
 __all__: tuple[str, ...] = ("ServiceRelease",)
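
The example patterns in the new field descriptions suggest glob-style matching. Illustrative only, and an assumption about the eventual semantics (the backend may translate patterns to SQL LIKE instead): stdlib fnmatch reproduces the documented examples:

from fnmatch import fnmatch

service_keys = [
    "simcore/services/comp/itis/sleeper",
    "simcore/services/dynamic/jupyter-lab",
]

# 'simcore/services/comp/itis/*' from the service_key_pattern description
matches = [key for key in service_keys if fnmatch(key, "simcore/services/comp/itis/*")]
assert matches == ["simcore/services/comp/itis/sleeper"]

# '*2023*' from the version_display_pattern description
assert fnmatch("2023.1.0", "*2023*")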
