|
52 | 52 | from ...models.comp_tasks import CompTaskAtDB |
53 | 53 | from ...utils.computations import get_pipeline_state_from_task_states |
54 | 54 | from ...utils.rabbitmq import ( |
| 55 | + publish_pipeline_scheduling_state, |
55 | 56 | publish_project_log, |
56 | 57 | publish_service_resource_tracking_heartbeat, |
57 | 58 | publish_service_resource_tracking_started, |
@@ -238,6 +239,19 @@ async def _set_run_result( |
238 | 239 | final_state=(run_result in COMPLETED_STATES), |
239 | 240 | ) |
240 | 241 |
|
| 242 | + if run_result in COMPLETED_STATES: |
| 243 | + # send event to notify the piipeline is done |
| 244 | + await publish_project_log( |
| 245 | + self.rabbitmq_client, |
| 246 | + user_id=user_id, |
| 247 | + project_id=project_id, |
| 248 | + log=f"Pipeline run {run_result.value} for iteration {iteration} is done with {run_result.value} state", |
| 249 | + log_level=logging.INFO, |
| 250 | + ) |
| 251 | + await publish_pipeline_scheduling_state( |
| 252 | + self.rabbitmq_client, user_id, project_id, run_result |
| 253 | + ) |
| 254 | + |
241 | 255 | async def _set_schedule_done( |
242 | 256 | self, |
243 | 257 | user_id: UserID, |
@@ -622,7 +636,7 @@ async def apply( |
622 | 636 | ) |
623 | 637 | # 3. do we want to stop the pipeline now? |
624 | 638 | if comp_run.cancelled: |
625 | | - await self._schedule_tasks_to_stop( |
| 639 | + comp_tasks = await self._schedule_tasks_to_stop( |
626 | 640 | user_id, project_id, comp_tasks, comp_run |
627 | 641 | ) |
628 | 642 | else: |
@@ -710,20 +724,29 @@ async def _schedule_tasks_to_stop( |
710 | 724 | project_id: ProjectID, |
711 | 725 | comp_tasks: dict[NodeIDStr, CompTaskAtDB], |
712 | 726 | comp_run: CompRunsAtDB, |
713 | | - ) -> None: |
714 | | - # get any running task and stop them |
| 727 | + ) -> dict[NodeIDStr, CompTaskAtDB]: |
| 728 | + # NOTE: tasks that were not yet started but can be marked as ABORTED straight away, |
| 729 | + # the tasks that are already processing need some time to stop |
| 730 | + # and we need to stop them in the backend |
| 731 | + tasks_instantly_stopeable = [ |
| 732 | + t for t in comp_tasks.values() if t.state in TASK_TO_START_STATES |
| 733 | + ] |
715 | 734 | comp_tasks_repo = CompTasksRepository.instance(self.db_engine) |
716 | 735 | await ( |
717 | 736 | comp_tasks_repo.mark_project_published_waiting_for_cluster_tasks_as_aborted( |
718 | 737 | project_id, comp_run.run_id |
719 | 738 | ) |
720 | 739 | ) |
| 740 | + for task in tasks_instantly_stopeable: |
| 741 | + comp_tasks[f"{task}"].state = RunningState.ABORTED |
721 | 742 | # stop any remaining running task, these are already submitted |
722 | 743 | if tasks_to_stop := [ |
723 | 744 | t for t in comp_tasks.values() if t.state in PROCESSING_STATES |
724 | 745 | ]: |
725 | 746 | await self._stop_tasks(user_id, tasks_to_stop, comp_run) |
726 | 747 |
|
| 748 | + return comp_tasks |
| 749 | + |
727 | 750 | async def _schedule_tasks_to_start( # noqa: C901 |
728 | 751 | self, |
729 | 752 | user_id: UserID, |
|
0 commit comments