Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
18e3669
added after event manager
Oct 2, 2025
992cfc1
added rest of code
Oct 2, 2025
8da80c9
added cancellable operation
Oct 2, 2025
b48c7e4
revert changes
Oct 3, 2025
6ee4e25
fixed test
Oct 3, 2025
9039b37
added capability to run operation when one is done
Oct 3, 2025
bcf2721
added tests to ensure switching of operations
Oct 6, 2025
52b4f7d
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 6, 2025
33f1b6e
event is actually emitted when the previous operation finished
Oct 6, 2025
c5201f2
fixed flaky tests
Oct 6, 2025
8880e81
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 6, 2025
83448f4
mypy + pylint
Oct 6, 2025
e44c0d7
typos
Oct 6, 2025
0bf48e7
typos
Oct 6, 2025
d617187
fixed broken functionality
Oct 6, 2025
29a947b
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 6, 2025
e1c4201
pylint
Oct 6, 2025
8ef6313
removed commented commander
Oct 7, 2025
38dac25
typo
Oct 7, 2025
93b83e6
refactor event scheduler
Oct 7, 2025
65b5256
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 7, 2025
e91aead
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 8, 2025
8e6a4f2
added test
Oct 8, 2025
7e42181
operation is composed instead of inherited form list
Oct 8, 2025
0c72939
renamed create/undo to execute/revert
Oct 8, 2025
878bd62
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 8, 2025
b021862
mypy
Oct 8, 2025
ca93197
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 8, 2025
c100e56
Merge branch 'master' into migrate-dy-scheduler-6
GitHK Oct 8, 2025
a2e528f
address possible flaky
Oct 8, 2025
a1338ac
Merge branch 'migrate-dy-scheduler-6' of github.com:GitHK/osparc-simc…
Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
from ._core import (
cancel_operation,
restart_operation_step_stuck_during_undo,
restart_operation_step_stuck_in_manual_intervention_during_create,
restart_operation_step_stuck_during_revert,
restart_operation_step_stuck_in_manual_intervention_during_execute,
start_operation,
)
from ._deferred_runner import (
get_operation_context_proxy,
get_step_group_proxy,
get_step_store_proxy,
)
from ._event_after_registration import (
register_to_start_after_on_executed_completed,
register_to_start_after_on_reverted_completed,
)
from ._lifespan import generic_scheduler_lifespan
from ._models import (
OperationName,
OperationToStart,
ProvidedOperationContext,
RequiredOperationContext,
ScheduleId,
Expand All @@ -36,11 +41,14 @@
"OperationContextProxy",
"OperationName",
"OperationRegistry",
"OperationToStart",
"ParallelStepGroup",
"ProvidedOperationContext",
"register_to_start_after_on_executed_completed",
"register_to_start_after_on_reverted_completed",
"RequiredOperationContext",
"restart_operation_step_stuck_during_undo",
"restart_operation_step_stuck_in_manual_intervention_during_create",
"restart_operation_step_stuck_during_revert",
"restart_operation_step_stuck_in_manual_intervention_during_execute",
"ScheduleId",
"SingleStepGroup",
"start_operation",
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ async def get_steps_statuses(
async def start_and_mark_as_started(
step_proxy: StepStoreProxy,
*,
is_creating: bool,
is_executing: bool,
expected_steps_count: NonNegativeInt,
) -> None:
await DeferredRunner.start(
schedule_id=step_proxy.schedule_id,
operation_name=step_proxy.operation_name,
step_group_name=step_proxy.step_group_name,
step_name=step_proxy.step_name,
is_creating=is_creating,
is_executing=is_executing,
expected_steps_count=expected_steps_count,
)
await step_proxy.create_or_update_multiple(
Expand Down Expand Up @@ -117,7 +117,7 @@ async def get_step_error_traceback(
operation_name=operation_name,
step_group_name=current_step_group.get_step_group_name(index=group_index),
step_name=step_name,
is_creating=False,
is_executing=False,
)
return step_name, await step_proxy.read("error_traceback")

Expand All @@ -129,7 +129,7 @@ def get_group_step_proxies(
operation_name: OperationName,
group_index: NonNegativeInt,
step_group: BaseStepGroup,
is_creating: bool,
is_executing: bool,
) -> dict[StepName, StepStoreProxy]:
return {
step.get_step_name(): StepStoreProxy(
Expand All @@ -138,7 +138,7 @@ def get_group_step_proxies(
operation_name=operation_name,
step_group_name=step_group.get_step_group_name(index=group_index),
step_name=step.get_step_name(),
is_creating=is_creating,
is_executing=is_executing,
)
for step in step_group.get_step_subgroup_to_run()
}
Expand Down Expand Up @@ -168,7 +168,7 @@ async def _get_steps_to_start(
async def start_steps_which_were_not_started(
group_step_proxies: dict[StepName, StepStoreProxy],
*,
is_creating: bool,
is_executing: bool,
group_step_count: NonNegativeInt,
) -> bool:
"""retruns True if any step was started"""
Expand All @@ -186,7 +186,7 @@ async def start_steps_which_were_not_started(
*(
start_and_mark_as_started(
step_proxy,
is_creating=is_creating,
is_executing=is_executing,
expected_steps_count=group_step_count,
)
for step_proxy in to_start_step_proxies
Expand All @@ -198,11 +198,11 @@ async def start_steps_which_were_not_started(


async def cleanup_after_finishing(
store: Store, *, schedule_id: ScheduleId, is_creating: bool
store: Store, *, schedule_id: ScheduleId, is_executing: bool
) -> None:
removal_proxy = OperationRemovalProxy(store=store, schedule_id=schedule_id)
await removal_proxy.delete()
verb = "COMPLETED" if is_creating else "UNDONE"
verb = "COMPLETED" if is_executing else "REVERTED"
_logger.debug("Operation for schedule_id='%s' %s successfully", verb, schedule_id)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ def get_step_store_proxy(context: DeferredContext) -> StepStoreProxy:
operation_name: OperationName = context["operation_name"]
step_group_name: StepGroupName = context["step_group_name"]
step_name: StepName = context["step_name"]
is_creating = context["is_creating"]
is_executing = context["is_executing"]

return StepStoreProxy(
store=Store.get_from_app_state(app),
schedule_id=schedule_id,
operation_name=operation_name,
step_group_name=step_group_name,
step_name=step_name,
is_creating=is_creating,
is_executing=is_executing,
)


Expand All @@ -51,14 +51,14 @@ def get_step_group_proxy(context: DeferredContext) -> StepGroupProxy:
schedule_id: ScheduleId = context["schedule_id"]
operation_name: OperationName = context["operation_name"]
step_group_name: StepGroupName = context["step_group_name"]
is_creating = context["is_creating"]
is_executing = context["is_executing"]

return StepGroupProxy(
store=Store.get_from_app_state(app),
schedule_id=schedule_id,
operation_name=operation_name,
step_group_name=step_group_name,
is_creating=is_creating,
is_executing=is_executing,
)


Expand Down Expand Up @@ -124,36 +124,36 @@ async def start( # type:ignore[override] # pylint:disable=arguments-differ
operation_name: OperationName,
step_group_name: StepGroupName,
step_name: StepName,
is_creating: bool,
is_executing: bool,
expected_steps_count: NonNegativeInt,
) -> DeferredContext:
return {
"schedule_id": schedule_id,
"operation_name": operation_name,
"step_group_name": step_group_name,
"step_name": step_name,
"is_creating": is_creating,
"is_executing": is_executing,
"expected_steps_count": expected_steps_count,
}

@classmethod
async def get_retries(cls, context: DeferredContext) -> int:
is_creating = context["is_creating"]
is_executing = context["is_executing"]
step = _get_step(context)
return (
await step.get_create_retries(context)
if is_creating
else await step.get_undo_retries(context)
await step.get_execute_retries(context)
if is_executing
else await step.get_revert_retries(context)
)

@classmethod
async def get_timeout(cls, context: DeferredContext) -> timedelta:
is_creating = context["is_creating"]
is_executing = context["is_executing"]
step = _get_step(context)
return (
await step.get_create_wait_between_attempts(context)
if is_creating
else await step.get_undo_wait_between_attempts(context)
await step.get_execute_wait_between_attempts(context)
if is_executing
else await step.get_revert_wait_between_attempts(context)
)

@classmethod
Expand All @@ -165,7 +165,7 @@ async def on_created(cls, task_uid: TaskUID, context: DeferredContext) -> None:
@classmethod
async def run(cls, context: DeferredContext) -> None:
app = context["app"]
is_creating = context["is_creating"]
is_executing = context["is_executing"]

await get_step_store_proxy(context).create_or_update(
"status", StepStatus.RUNNING
Expand All @@ -175,31 +175,31 @@ async def run(cls, context: DeferredContext) -> None:

operation_context_proxy = get_operation_context_proxy(context)

if is_creating:
if is_executing:
required_context = await operation_context_proxy.read(
*step.get_create_requires_context_keys()
*step.get_execute_requires_context_keys()
)
_raise_if_any_context_value_is_none(required_context)

step_provided_operation_context = await step.create(app, required_context)
step_provided_operation_context = await step.execute(app, required_context)
provided_operation_context = step_provided_operation_context or {}
create_provides_keys = step.get_create_provides_context_keys()
execute_provides_keys = step.get_execute_provides_context_keys()

_raise_if_provided_context_keys_are_missing_or_none(
provided_operation_context, create_provides_keys
provided_operation_context, execute_provides_keys
)
else:
required_context = await operation_context_proxy.read(
*step.get_undo_requires_context_keys()
*step.get_revert_requires_context_keys()
)
_raise_if_any_context_value_is_none(required_context)

step_provided_operation_context = await step.undo(app, required_context)
step_provided_operation_context = await step.revert(app, required_context)
provided_operation_context = step_provided_operation_context or {}
undo_provides_keys = step.get_undo_provides_context_keys()
revert_provides_keys = step.get_revert_provides_context_keys()

_raise_if_provided_context_keys_are_missing_or_none(
provided_operation_context, undo_provides_keys
provided_operation_context, revert_provides_keys
)

await operation_context_proxy.create_or_update(provided_operation_context)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import TYPE_CHECKING

from fastapi import FastAPI

if TYPE_CHECKING:
from ._core import Core
from ._event_after import AfterEventManager
from ._event_scheduler import EventScheduler

# NOTE:
# Due to circular dependencies it is not possible to use the following:
# - `Core.get_from_app_state(app)`
# - `AfterEventManager.get_from_app_state(app)`
# - `EventScheduler.get_from_app_state(app)`
# This module avoids issues with circular dependencies


def get_core(app: FastAPI) -> "Core":
core: Core = app.state.generic_scheduler_core
return core


def get_after_event_manager(app: FastAPI) -> "AfterEventManager":
after_event_manager: AfterEventManager = app.state.after_event_manager
return after_event_manager


def get_event_scheduler(app: FastAPI) -> "EventScheduler":
event_scheduler: EventScheduler = app.state.generic_scheduler_event_scheduler
return event_scheduler
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class InitialOperationContextKeyNotAllowedError(BaseGenericSchedulerError):
)


class OperationNotCancellableError(BaseGenericSchedulerError):
msg_template: str = "Operation '{operation_name}' is not cancellable"


class CannotCancelWhileWaitingForManualInterventionError(BaseGenericSchedulerError):
msg_template: str = (
"Cannot cancel schedule_id='{schedule_id}' while one or more steps are waiting for manual intervention."
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,42 @@
from typing import TYPE_CHECKING

from fastapi import FastAPI

from ._models import ScheduleId

if TYPE_CHECKING:
from ._event_scheduler import EventScheduler
from ._dependencies import get_event_scheduler
from ._event_base_queue import OperationToStartEvent
from ._event_queues import ExecuteCompletedQueue, RevertCompletedQueue, ScheduleQueue
from ._models import OperationContext, OperationName, ScheduleId


async def enqueue_schedule_event(app: FastAPI, schedule_id: ScheduleId) -> None:
event_scheduler: EventScheduler = app.state.generic_scheduler_event_scheduler
await event_scheduler.enqueue_schedule_event(schedule_id)
await get_event_scheduler(app).enqueue_message_for(ScheduleQueue, schedule_id)


async def enqueue_execute_completed_event(
app: FastAPI,
schedule_id: ScheduleId,
operation_name: OperationName,
initial_context: OperationContext,
) -> None:
await get_event_scheduler(app).enqueue_message_for(
ExecuteCompletedQueue,
OperationToStartEvent(
schedule_id=schedule_id,
operation_name=operation_name,
initial_context=initial_context,
),
)


async def enqueue_revert_completed_event(
app: FastAPI,
schedule_id: ScheduleId,
operation_name: OperationName,
initial_context: OperationContext,
) -> None:
await get_event_scheduler(app).enqueue_message_for(
RevertCompletedQueue,
OperationToStartEvent(
schedule_id=schedule_id,
operation_name=operation_name,
initial_context=initial_context,
),
)
Loading
Loading