Skip to content

Commit 6ecb5a1

Browse files
committed
instantly stop non started tasks
publish event on pipeline status update
1 parent d8a1fd0 commit 6ecb5a1

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
from ...models.comp_tasks import CompTaskAtDB
5353
from ...utils.computations import get_pipeline_state_from_task_states
5454
from ...utils.rabbitmq import (
55+
publish_pipeline_scheduling_state,
5556
publish_project_log,
5657
publish_service_resource_tracking_heartbeat,
5758
publish_service_resource_tracking_started,
@@ -238,6 +239,19 @@ async def _set_run_result(
238239
final_state=(run_result in COMPLETED_STATES),
239240
)
240241

242+
if run_result in COMPLETED_STATES:
243+
# send event to notify the pipeline is done
244+
await publish_project_log(
245+
self.rabbitmq_client,
246+
user_id=user_id,
247+
project_id=project_id,
248+
log=f"Pipeline run {run_result.value} for iteration {iteration} is done with {run_result.value} state",
249+
log_level=logging.INFO,
250+
)
251+
await publish_pipeline_scheduling_state(
252+
self.rabbitmq_client, user_id, project_id, run_result
253+
)
254+
241255
async def _set_schedule_done(
242256
self,
243257
user_id: UserID,
@@ -622,7 +636,7 @@ async def apply(
622636
)
623637
# 3. do we want to stop the pipeline now?
624638
if comp_run.cancelled:
625-
await self._schedule_tasks_to_stop(
639+
comp_tasks = await self._schedule_tasks_to_stop(
626640
user_id, project_id, comp_tasks, comp_run
627641
)
628642
else:
@@ -710,20 +724,29 @@ async def _schedule_tasks_to_stop(
710724
project_id: ProjectID,
711725
comp_tasks: dict[NodeIDStr, CompTaskAtDB],
712726
comp_run: CompRunsAtDB,
713-
) -> None:
714-
# get any running task and stop them
727+
) -> dict[NodeIDStr, CompTaskAtDB]:
728+
# NOTE: tasks that were not yet started can be marked as ABORTED straight away,
729+
# the tasks that are already processing need some time to stop
730+
# and we need to stop them in the backend
731+
tasks_instantly_stopeable = [
732+
t for t in comp_tasks.values() if t.state in TASK_TO_START_STATES
733+
]
715734
comp_tasks_repo = CompTasksRepository.instance(self.db_engine)
716735
await (
717736
comp_tasks_repo.mark_project_published_waiting_for_cluster_tasks_as_aborted(
718737
project_id, comp_run.run_id
719738
)
720739
)
740+
# NOTE: reflect the DB change in the in-memory tasks. The entries of
# tasks_instantly_stopeable are the very objects held in comp_tasks, so they
# are updated directly. Looking them up via f"{task}" would format the whole
# task object, which is not a NodeIDStr key of comp_tasks.
for task in tasks_instantly_stopeable:
    task.state = RunningState.ABORTED
721742
# stop any remaining running task, these are already submitted
722743
if tasks_to_stop := [
723744
t for t in comp_tasks.values() if t.state in PROCESSING_STATES
724745
]:
725746
await self._stop_tasks(user_id, tasks_to_stop, comp_run)
726747

748+
return comp_tasks
749+
727750
async def _schedule_tasks_to_start( # noqa: C901
728751
self,
729752
user_id: UserID,

services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from models_library.projects_nodes_io import NodeID
66
from models_library.projects_state import RunningState
77
from models_library.rabbitmq_messages import (
8+
ComputationalPipelineStatusMessage,
89
InstrumentationRabbitMessage,
910
LoggerRabbitMessage,
1011
ProgressRabbitMessageNode,
@@ -197,3 +198,17 @@ async def publish_project_log(
197198
log_level=log_level,
198199
)
199200
await rabbitmq_client.publish(message.channel_name, message)
201+
202+
203+
async def publish_pipeline_scheduling_state(
    rabbitmq_client: RabbitMQClient,
    user_id: UserID,
    project_id: ProjectID,
    state: RunningState,
) -> None:
    """Publish the scheduling state of a computational pipeline.

    Builds a ``ComputationalPipelineStatusMessage`` carrying ``user_id``,
    ``project_id`` and ``state`` (stored as ``run_result``) and publishes it
    through ``rabbitmq_client`` on the channel named by the message itself.
    """
    status_message = ComputationalPipelineStatusMessage.model_construct(
        user_id=user_id, project_id=project_id, run_result=state
    )
    channel = status_message.channel_name
    await rabbitmq_client.publish(channel, status_message)

0 commit comments

Comments
 (0)