🐛Improvements on pipeline cancellation and ensure pipeline state is consistent #7996
Conversation
Codecov Report
Attention: Patch coverage is …
Additional details and impacted files
@@ Coverage Diff @@
## master #7996 +/- ##
==========================================
+ Coverage 87.39% 89.38% +1.98%
==========================================
Files 1578 1379 -199
Lines 62317 57741 -4576
Branches 1011 477 -534
==========================================
- Hits 54464 51610 -2854
+ Misses 7541 6010 -1531
+ Partials 312 121 -191
wvangeit
left a comment
Looks good, thanks for the effort @sanderegg.
Just a small remark. Isn't there a way to test some of this new functionality?
Maybe I missed it, but I don't see any backend protection. I know this shouldn't happen from the frontend anymore, but I would still add a check on the backend when a pipeline is about to be scheduled, to ensure there isn't already one scheduled or running in the system. If there is, it should return an error.
Ah, now I see it. This is it, right?
with contextlib.suppress(ComputationalRunNotFoundError):
    last_run = await comp_runs_repo.get(
        user_id=computation.user_id, project_id=computation.project_id
    )
    pipeline_state = last_run.result
    if utils.is_pipeline_running(pipeline_state):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Project {computation.project_id} already started, current state is {pipeline_state}",
        )
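For reference, a guard like `utils.is_pipeline_running` boils down to checking whether the run is still in a non-final state. Below is a minimal standalone sketch; the state set is illustrative and may differ from the actual director-v2 enum.

```python
from enum import Enum, unique


@unique
class RunningState(str, Enum):
    # illustrative subset; the real enum lives in models_library
    PUBLISHED = "PUBLISHED"
    PENDING = "PENDING"
    STARTED = "STARTED"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    ABORTED = "ABORTED"


# states in which the pipeline is considered busy (assumed set)
_RUNNING_STATES = {RunningState.PUBLISHED, RunningState.PENDING, RunningState.STARTED}


def is_pipeline_running(pipeline_state: RunningState) -> bool:
    # a pipeline counts as "running" until it reaches a final state,
    # so a new start request must be rejected while it is in this set
    return pipeline_state in _RUNNING_STATES
```

With this guard in place, a second start request for a pipeline that is already running is rejected with HTTP 409 instead of scheduling the same pipeline twice.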
Thanks, looks good! 👍 Maybe in the next iterations you can start to use the new `comp_runs_snapshot_tasks`.
odeimaiz
left a comment
Thanks!
Let's check tomorrow how these states are mapped to the Start/Stop buttons.
pcrespov
left a comment
Thx. Good analysis!
Resolved review threads:
...eb/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py (outdated)
services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_publisher.py
@wvangeit
@matusdrobuliak66 there were actually already protections. The problem here originated in the fact that …
bisgaard-itis
left a comment
Thanks for the fix
@mergify queue
✅ The pull request has been merged automatically at 2f1b484



Context
While testing, the following issues were identified:
- The pipeline state exposed to the frontend was derived from the `comp_tasks` table, due to the historical way we transmitted events from that table towards the frontend. This led to inconsistencies where the frontend would already show the start button as enabled even though the computational scheduler had not finished scheduling. As a consequence, the same pipeline could be scheduled multiple times, leading to inconsistencies and some undefined behavior. This PR fixes this by always returning the pipeline state from the `comp_runs` table instead, so that it becomes the ground truth.

What do these changes do?
This pull request introduces significant changes to the computation pipeline management and its integration with RabbitMQ messaging. Key updates include the addition of a new RabbitMQ message type for pipeline status, refactoring of pipeline state handling, and improved notification mechanisms for computational pipeline updates.
RabbitMQ Integration Enhancements:
- Added a new `ComputationalPipelineStatusMessage` class in `models_library/rabbitmq_messages.py` to represent pipeline status messages, including a routing key based on `project_id` (sketched below).
- Added a `publish_pipeline_scheduling_state` function in `utils/rabbitmq.py` to send pipeline status updates via RabbitMQ.
- Added a `_computational_pipeline_status_message_parser` in `notifications/_rabbitmq_exclusive_queue_consumers.py` to handle pipeline status updates and notify users.
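To make the first bullet concrete, such a message can be sketched as a small pydantic model whose routing key is derived from `project_id`, so consumers can subscribe per project. The field set and channel name below are assumptions for illustration, not the actual `models_library` definition.

```python
from datetime import datetime, timezone
from uuid import UUID

from pydantic import BaseModel, Field


class ComputationalPipelineStatusMessage(BaseModel):
    # illustrative field set; the real message lives in models_library/rabbitmq_messages.py
    channel_name: str = "simcore.services.comp.pipeline-status"  # assumed channel name
    user_id: int
    project_id: UUID
    run_result: str  # e.g. "STARTED", "SUCCESS", "FAILED", "ABORTED"
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    def routing_key(self) -> str:
        # route per project so only clients interested in this project get the update
        return f"{self.project_id}"
```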
Pipeline State Refactoring:
- Use the `CompRunsRepository` for determining pipeline state across multiple endpoints in `computations.py` (see the sketch after this list).
- Rely on the `CompRunsRepository` in various computation-related functions.
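A rough illustration of the "comp_runs as ground truth" approach: endpoints resolve the pipeline state from the latest run and fall back to a not-started state when no run exists yet. The helper name, repository API, and string states are assumptions of this sketch, not the exact code of the PR.

```python
class ComputationalRunNotFoundError(Exception):
    """Raised by the comp_runs repository when a project has no run yet."""


async def get_pipeline_state_from_runs(comp_runs_repo, *, user_id: int, project_id: str) -> str:
    # the comp_runs table is the single source of truth for the pipeline state;
    # per-task states in comp_tasks are no longer aggregated for this purpose
    try:
        last_run = await comp_runs_repo.get(user_id=user_id, project_id=project_id)
    except ComputationalRunNotFoundError:
        # no run recorded yet: the pipeline was never started
        return "NOT_STARTED"
    return last_run.result
```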
Scheduler Improvements:
- Updated `_set_run_result` in `modules/comp_scheduler/_scheduler_base.py` to publish pipeline completion logs and scheduling state updates.
- Updated `_schedule_tasks_to_stop` to handle tasks that can be instantly marked as aborted and to return updated task states (illustrated below).
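The `_schedule_tasks_to_stop` change can be pictured as a partition of the pipeline's tasks: tasks that never actually started can be flipped to aborted immediately, while genuinely running tasks need a stop request and are aborted later by the scheduler. The state names and `TaskView` structure below are illustrative assumptions, not the PR's actual data model.

```python
from dataclasses import dataclass


@dataclass
class TaskView:
    node_id: str
    state: str  # e.g. "PUBLISHED", "PENDING", "STARTED", "SUCCESS", ...


# tasks in these states were never handed over to a worker (assumed set)
_WAITING_STATES = {"PUBLISHED", "PENDING", "WAITING_FOR_RESOURCES"}
_FINAL_STATES = {"SUCCESS", "FAILED", "ABORTED"}


def split_tasks_to_stop(tasks: list[TaskView]) -> tuple[list[TaskView], list[TaskView]]:
    """Return (instantly_abortable, needs_stop_request)."""
    instantly_abortable = [t for t in tasks if t.state in _WAITING_STATES]
    needs_stop_request = [
        t for t in tasks if t.state not in _WAITING_STATES and t.state not in _FINAL_STATES
    ]
    return instantly_abortable, needs_stop_request
```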
Code Cleanup:
- Simplified `_get_pipeline_at_db` in `modules/comp_scheduler/_manager.py` by removing redundant variable assignments.
- Cleaned up `request_pipeline_scheduling` in `modules/comp_scheduler/_publisher.py`.

Related issue/s
How to test
Dev-ops