Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
18e3669
added after event manager
Oct 2, 2025
992cfc1
added rest of code
Oct 2, 2025
8da80c9
added cancellable operation
Oct 2, 2025
b48c7e4
revert changes
Oct 3, 2025
6ee4e25
fixed test
Oct 3, 2025
9039b37
added capability to run operation when one is done
Oct 3, 2025
bcf2721
added tests to ensure switching of operations
Oct 6, 2025
52b4f7d
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 6, 2025
33f1b6e
event is actually emitted when the previous operation finished
Oct 6, 2025
c5201f2
fixed flaky tests
Oct 6, 2025
8880e81
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 6, 2025
83448f4
mypy + pylint
Oct 6, 2025
e44c0d7
typos
Oct 6, 2025
0bf48e7
typos
Oct 6, 2025
d617187
fixed broken functionality
Oct 6, 2025
29a947b
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 6, 2025
e1c4201
pylint
Oct 6, 2025
8ef6313
removed commented commander
Oct 7, 2025
38dac25
typo
Oct 7, 2025
93b83e6
refactor event scheduler
Oct 7, 2025
65b5256
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 7, 2025
e91aead
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 8, 2025
8e6a4f2
added test
Oct 8, 2025
7e42181
operation is composed instead of inherited form list
Oct 8, 2025
0c72939
renamed create/undo to execute/revert
Oct 8, 2025
878bd62
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 8, 2025
b021862
mypy
Oct 8, 2025
ca93197
Merge remote-tracking branch 'upstream/master' into migrate-dy-schedu…
Oct 8, 2025
c100e56
Merge branch 'master' into migrate-dy-scheduler-6
GitHK Oct 8, 2025
a2e528f
address possible flaky
Oct 8, 2025
a1338ac
Merge branch 'migrate-dy-scheduler-6' of github.com:GitHK/osparc-simc…
Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@
get_step_group_proxy,
get_step_store_proxy,
)
from ._event_after_registration import (
register_to_start_after_on_created_completed,
register_to_start_after_on_undo_completed,
)
from ._lifespan import generic_scheduler_lifespan
from ._models import (
OperationName,
OperationToStart,
ProvidedOperationContext,
RequiredOperationContext,
ScheduleId,
Expand All @@ -36,8 +41,11 @@
"OperationContextProxy",
"OperationName",
"OperationRegistry",
"OperationToStart",
"ParallelStepGroup",
"ProvidedOperationContext",
"register_to_start_after_on_created_completed",
"register_to_start_after_on_undo_completed",
"RequiredOperationContext",
"restart_operation_step_stuck_during_undo",
"restart_operation_step_stuck_in_manual_intervention_during_create",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,27 @@
from ._errors import (
CannotCancelWhileWaitingForManualInterventionError,
NoDataFoundError,
OperationNotCancellableError,
StepNameNotInCurrentGroupError,
StepNotInErrorStateError,
StepNotWaitingForManualInterventionError,
UnexpectedStepHandlingError,
)
from ._event import enqueue_schedule_event
from ._event import (
enqueue_create_completed_event,
enqueue_schedule_event,
enqueue_undo_completed_event,
)
from ._event_after_registration import (
register_to_start_after_on_created_completed,
register_to_start_after_on_undo_completed,
)
from ._models import (
EventType,
OperationContext,
OperationErrorType,
OperationName,
OperationToStart,
ScheduleId,
StepName,
StepStatus,
Expand All @@ -52,6 +63,7 @@
from ._store import (
DeleteStepKeys,
OperationContextProxy,
OperationEventsProxy,
ScheduleDataStoreProxy,
StepGroupProxy,
StepStoreProxy,
Expand Down Expand Up @@ -79,7 +91,11 @@ def __init__(
self._store: Store = Store.get_from_app_state(app)

async def start_operation(
self, operation_name: OperationName, initial_operation_context: OperationContext
self,
operation_name: OperationName,
initial_operation_context: OperationContext,
on_create_completed: OperationToStart | None,
on_undo_completed: OperationToStart | None,
) -> ScheduleId:
"""start an operation by it's given name and providing an initial context"""
schedule_id: ScheduleId = f"{uuid4()}"
Expand Down Expand Up @@ -112,6 +128,16 @@ async def start_operation(
)
await operation_content_proxy.create_or_update(initial_operation_context)

if on_create_completed:
await register_to_start_after_on_created_completed(
self.app, schedule_id, to_start=on_create_completed
)

if on_undo_completed:
await register_to_start_after_on_undo_completed(
self.app, schedule_id=schedule_id, to_start=on_undo_completed
)

await enqueue_schedule_event(self.app, schedule_id)
return schedule_id

Expand Down Expand Up @@ -140,6 +166,10 @@ async def cancel_operation(self, schedule_id: ScheduleId) -> None:
group_index = await schedule_data_proxy.read("group_index")

operation = OperationRegistry.get_operation(operation_name)

if operation.is_cancellable is False:
raise OperationNotCancellableError(operation_name=operation_name)

group = operation[group_index]

group_step_proxies = get_group_step_proxies(
Expand All @@ -152,15 +182,14 @@ async def cancel_operation(self, schedule_id: ScheduleId) -> None:
)

# not allowed to cancel while waiting for manual intervention
if any(
await limited_gather(
*(
get_requires_manual_intervention(step)
for step in group_step_proxies.values()
),
limit=PARALLEL_REQUESTS,
)
):
require_manual_intervention = await limited_gather(
*(
get_requires_manual_intervention(step)
for step in group_step_proxies.values()
),
limit=PARALLEL_REQUESTS,
)
if any(require_manual_intervention):
raise CannotCancelWhileWaitingForManualInterventionError(
schedule_id=schedule_id
)
Expand Down Expand Up @@ -465,7 +494,7 @@ async def _advance_as_creating(
# CREATION logic:
# 1) if all steps in group in SUUCESS
# - 1a) -> move to next group
# - 1b) if reached the end of the CREATE operation -> remove all created data
# - 1b) if reached the end of the CREATE operation -> remove all created data [EMIT create complete event]
# 2) if manual intervention is required -> do nothing else
# 3) if any step in CANCELLED or FAILED (and not in manual intervention) -> move to undo

Expand All @@ -483,10 +512,33 @@ async def _advance_as_creating(
await enqueue_schedule_event(self.app, schedule_id)
except IndexError:

# 1b) if reached the end of the CREATE operation -> remove all created data
# 1b) if reached the end of the CREATE operation -> remove all created data [EMIT create complete event]
on_created_proxy = OperationEventsProxy(
self._store, schedule_id, EventType.ON_CREATED_COMPLETED
)
on_create_operation_name: OperationName | None = None
on_create_initial_context: OperationContext | None = None
if await on_created_proxy.exists():
on_create_operation_name = await on_created_proxy.read(
"operation_name"
)
on_create_initial_context = await on_created_proxy.read(
"initial_context"
)

await cleanup_after_finishing(
self._store, schedule_id=schedule_id, is_creating=True
)
if (
on_create_operation_name is not None
and on_create_initial_context is not None
):
await enqueue_create_completed_event(
self.app,
schedule_id,
on_create_operation_name,
on_create_initial_context,
)

return

Expand Down Expand Up @@ -551,7 +603,7 @@ async def _advance_as_undoing(
) -> None:
# UNDO logic:
# 1) if all steps in group in SUCCESS
# - 1a) if reached the end of the UNDO operation -> remove all created data
# - 1a) if reached the end of the UNDO operation -> remove all created data [EMIT undo complete event]
# - 1b) -> move to previous group
# 2) it is unexpected to have a FAILED step -> do nothing else
# 3) it is unexpected to have a CANCELLED step -> do nothing else
Expand All @@ -561,10 +613,31 @@ async def _advance_as_undoing(
previous_group_index = group_index - 1
if previous_group_index < 0:

# 1a) if reached the end of the UNDO operation -> remove all created data
# 1a) if reached the end of the UNDO operation -> remove all created data [EMIT undo complete event]
on_undo_proxy = OperationEventsProxy(
self._store, schedule_id, EventType.ON_UNDO_COMPLETED
)
on_undo_operation_name: OperationName | None = None
on_undo_initial_context: OperationContext | None = None
if await on_undo_proxy.exists():
on_undo_operation_name = await on_undo_proxy.read("operation_name")
on_undo_initial_context = await on_undo_proxy.read(
"initial_context"
)

await cleanup_after_finishing(
self._store, schedule_id=schedule_id, is_creating=False
)
if (
on_undo_operation_name is not None
and on_undo_initial_context is not None
):
await enqueue_undo_completed_event(
self.app,
schedule_id,
on_undo_operation_name,
on_undo_initial_context,
)
return

# 1b) -> move to previous group
Expand Down Expand Up @@ -634,9 +707,15 @@ async def start_operation(
app: FastAPI,
operation_name: OperationName,
initial_operation_context: OperationContext,
*,
on_create_completed: OperationToStart | None = None,
on_undo_completed: OperationToStart | None = None,
) -> ScheduleId:
return await Core.get_from_app_state(app).start_operation(
operation_name, initial_operation_context
operation_name,
initial_operation_context,
on_create_completed,
on_undo_completed,
)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import TYPE_CHECKING

from fastapi import FastAPI

if TYPE_CHECKING:
from ._core import Core
from ._event_after import AfterEventManager
from ._event_scheduler import EventScheduler

# NOTE:
# Due to circular dependencies it is not possible to use the following:
# - `Core.get_from_app_state(app)`
# - `AfterEventManager.get_from_app_state(app)`
# - `EventScheduler.get_from_app_state(app)`
# This module avoids issues with circular dependencies


def get_core(app: FastAPI) -> "Core":
core: Core = app.state.generic_scheduler_core
return core


def get_after_event_manager(app: FastAPI) -> "AfterEventManager":
after_event_manager: AfterEventManager = app.state.after_event_manager
return after_event_manager


def get_event_scheduler(app: FastAPI) -> "EventScheduler":
event_scheduler: EventScheduler = app.state.generic_scheduler_event_scheduler
return event_scheduler
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class InitialOperationContextKeyNotAllowedError(BaseGenericSchedulerError):
)


class OperationNotCancellableError(BaseGenericSchedulerError):
msg_template: str = "Operation '{operation_name}' is not cancellable"


class CannotCancelWhileWaitingForManualInterventionError(BaseGenericSchedulerError):
msg_template: str = (
"Cannot cancel schedule_id='{schedule_id}' while one or more steps are waiting for manual intervention."
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,42 @@
from typing import TYPE_CHECKING

from fastapi import FastAPI

from ._models import ScheduleId

if TYPE_CHECKING:
from ._event_scheduler import EventScheduler
from ._dependencies import get_event_scheduler
from ._event_base_queue import OperationToStartEvent
from ._event_queues import CreateCompletedQueue, ScheduleQueue, UndoCompletedQueue
from ._models import OperationContext, OperationName, ScheduleId


async def enqueue_schedule_event(app: FastAPI, schedule_id: ScheduleId) -> None:
event_scheduler: EventScheduler = app.state.generic_scheduler_event_scheduler
await event_scheduler.enqueue_schedule_event(schedule_id)
await get_event_scheduler(app).enqueue_message_for(ScheduleQueue, schedule_id)


async def enqueue_create_completed_event(
app: FastAPI,
schedule_id: ScheduleId,
operation_name: OperationName,
initial_context: OperationContext,
) -> None:
await get_event_scheduler(app).enqueue_message_for(
CreateCompletedQueue,
OperationToStartEvent(
schedule_id=schedule_id,
operation_name=operation_name,
initial_context=initial_context,
),
)


async def enqueue_undo_completed_event(
app: FastAPI,
schedule_id: ScheduleId,
operation_name: OperationName,
initial_context: OperationContext,
) -> None:
await get_event_scheduler(app).enqueue_message_for(
UndoCompletedQueue,
OperationToStartEvent(
schedule_id=schedule_id,
operation_name=operation_name,
initial_context=initial_context,
),
)
Loading
Loading