-
Notifications
You must be signed in to change notification settings - Fork 32
✨🐛Computational Backend: Introduce Dask plugins for tasks lifecycle #7686
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
✨🐛Computational Backend: Introduce Dask plugins for tasks lifecycle #7686
Conversation
6857668 to
a954c4a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces Dask plugins to publish task lifecycle events and integrates them into the worker and scheduler setups, along with corresponding test scaffolding and minor scripting adjustments.
- Adds
TaskLifecycleWorkerPluginandTaskLifecycleSchedulerPluginto emit lifecycle events for each task. - Integrates the new plugins into
dask_setupfor both worker and scheduler, updating error handling and teardown logs. - Updates tests to cover the new plugins (though some tests are still stubs) and adjusts the LocalCluster fixture to preload the scheduler plugin.
- Tweaks a manual-intervention check in an autoscaling SSH script to include a time-based condition.
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| services/dask-sidecar/tests/unit/test_scheduler.py | Adds basic tests for task lifecycle event emission (with debug prints) |
| services/dask-sidecar/tests/unit/test_rabbitmq_plugin.py | Adds a stub test and a failure scenario for RabbitMQ plugin init |
| services/dask-sidecar/tests/unit/conftest.py | Configures LocalCluster to preload the scheduler plugin |
| services/dask-sidecar/src/simcore_service_dask_sidecar/worker.py | Registers and handles errors for TaskLifecycleWorkerPlugin |
| services/dask-sidecar/src/simcore_service_dask_sidecar/scheduler.py | Registers TaskLifecycleSchedulerPlugin and updates teardown log |
| services/dask-sidecar/src/simcore_service_dask_sidecar/task_life_cycle_worker_plugin.py | New worker plugin emitting task transitions |
| services/dask-sidecar/src/simcore_service_dask_sidecar/task_life_cycle_scheduler_plugin.py | New scheduler plugin emitting task transitions |
| scripts/maintenance/computational-clusters/autoscaled_monitor/ssh.py | Extends manual-intervention logic with a 2-minute age check |
Comments suppressed due to low confidence (4)
services/dask-sidecar/tests/unit/test_rabbitmq_plugin.py:21
- This test is currently just a stub (ellipsis) and has no assertions; implement its body or remove it to avoid misleading coverage results.
def test_rabbitmq_plugin_initializes(dask_client: distributed.Client): ...
services/dask-sidecar/tests/unit/test_rabbitmq_plugin.py:33
- The test only sleeps and does not assert any outcome; add assertions to verify that the worker actually closes or raises the expected exception.
async def test_dask_worker_closes_if_plugin_fails_on_start(
scripts/maintenance/computational-clusters/autoscaled_monitor/ssh.py:269
- The time comparison
containers[0].created_at - now > 2minis inverted. To check if a container is older than 2 minutes, usearrow.utcnow().datetime - containers[0].created_at > timedelta(...).
needs_manual_intervention=_needs_manual_intervention(containers) and ((containers[0].created_at - arrow.utcnow().datetime) > datetime.timedelta(minutes=2))
services/dask-sidecar/src/simcore_service_dask_sidecar/worker.py:83
- The bare
raiseis outside theexceptblock due to mis-indentation; this will cause a syntax error. It should be indented under theexceptor replaced with a specific exception.
raise
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #7686 +/- ##
==========================================
+ Coverage 87.56% 87.95% +0.38%
==========================================
Files 1814 1753 -61
Lines 70414 67136 -3278
Branches 1144 1144
==========================================
- Hits 61659 59047 -2612
+ Misses 8443 7777 -666
Partials 312 312
Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
bf97c8f to
3764bf7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This pull request introduces a task lifecycle tracking system for Dask by adding worker and scheduler plugins, updating configurations to preload these plugins, and refactoring RabbitMQ integration to clarify its semantics.
- Introduces TaskLifecycleWorkerPlugin and TaskLifecycleSchedulerPlugin for tracking task state transitions.
- Updates Dask worker and scheduler configurations to integrate lifecycle plugins.
- Refactors RabbitMQ plugin naming and adjusts test setups to reflect new plugin structure.
Reviewed Changes
Copilot reviewed 20 out of 24 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| services/director-v2/modules/comp_scheduler/_scheduler_base.py | Modified task state triage and introduction of tuple return for changed and executing tasks. |
| services/director-v2/models/dask_subsystem.py | Cleanup: removed outdated Dask client task state enum. |
| services/dask-sidecar/* | Updated plugin imports and improved error messaging; added lifecycle event publishing support. |
| scripts/maintenance/computational-clusters/autoscaled_monitor/ssh.py | Enhanced manual intervention check with a time-based condition. |
| packages/dask-task-models-library/* | Added new plugins for worker and scheduler with updated mappings for task states. |
Files not reviewed (4)
- services/clusters-keeper/requirements/ci.txt: Language not supported
- services/clusters-keeper/requirements/dev.txt: Language not supported
- services/director-v2/requirements/_test.in: Language not supported
- services/director-v2/requirements/_test.txt: Language not supported
Comments suppressed due to low confidence (3)
services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_base.py:332
- The _get_changed_tasks_from_backend function now returns a tuple instead of a single list. Please ensure that any consumers of this API are updated and that the change is clearly documented.
return ( [ ... ], [ ... ] ),
scripts/maintenance/computational-clusters/autoscaled_monitor/ssh.py:268
- [nitpick] The additional time-based condition may be too strict if container creation times or timezone handling are inconsistent; please verify that the datetime comparison handles timezones correctly.
needs_manual_intervention=_needs_manual_intervention(containers) and ( (containers[0].created_at - arrow.utcnow().datetime) > datetime.timedelta(minutes=2) ),
services/dask-sidecar/src/simcore_service_dask_sidecar/rabbitmq_worker_plugin.py:101
- [nitpick] Consider revising the warning message for clarity and professionalism by removing informal language such as 'Beware!' and clarifying any pytest-specific behavior.
_logger.warning("RabbitMQ client plugin setup is not the main thread! Beware! if in pytest it's ok.")
cfe7113 to
81ec393
Compare
GitHK
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks
packages/dask-task-models-library/src/dask_task_models_library/models.py
Show resolved
Hide resolved
matusdrobuliak66
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
367f0cc to
1567bdd
Compare
pcrespov
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thx!
...task-models-library/src/dask_task_models_library/plugins/task_life_cycle_scheduler_plugin.py
Show resolved
Hide resolved
...task-models-library/src/dask_task_models_library/plugins/task_life_cycle_scheduler_plugin.py
Show resolved
Hide resolved
...sk-task-models-library/src/dask_task_models_library/plugins/task_life_cycle_worker_plugin.py
Show resolved
Hide resolved
|



What do these changes do?
This pull request introduces a task lifecycle tracking system for Dask. The changes include the addition of plugins for tracking task states, configuration updates for Dask workers and the scheduler for testing.
This should fix the issue described in #7629 by tracking the state of each dask task using dask plugins facilities. Instead of relying solely on events that might be missed, the state of the tasks is kept in the dask-scheduler and can now be retrieved from the director-v2 at will. That information is kept in the dask-scheduler until it gets switched off.
Task Lifecycle Tracking
TaskLifecycleSchedulerPluginandTaskLifecycleWorkerPluginto track task state transitions on the scheduler and workers, respectively. These plugins log lifecycle events using theTaskLifeCycleStatemodel.RunningStateto standardize state representation.Dask Configuration Updates
TaskLifecycleWorkerPluginfor all worker types in tests.TaskLifecycleSchedulerPluginin tests.Test and Dependency Updates
simcore-dask-task-models-libraryas a dependency inci.txtanddev.txtfor the clusters-keeper service for testing purpose.Miscellaneous Improvements
_needs_manual_interventionfunction of the autoscaling monitoring script to include a time-based condition for container handling.Related issue/s
Bonus:
How to test
Dev-ops