From 09463c4542bb0439b6353a70dcf22d5fafa82565 Mon Sep 17 00:00:00 2001 From: Chris Sewell Date: Fri, 5 Mar 2021 00:51:20 +0100 Subject: [PATCH 1/5] =?UTF-8?q?=F0=9F=90=9B=20FIX:=20Task.cancel`=20should?= =?UTF-8?q?=20not=20set=20state=20as=20EXCEPTED?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- aiida/engine/processes/calcjobs/tasks.py | 3 ++- aiida/engine/transports.py | 4 ++++ aiida/manage/external/rmq.py | 5 +++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/aiida/engine/processes/calcjobs/tasks.py b/aiida/engine/processes/calcjobs/tasks.py index 721b5317f9..47fbd37520 100644 --- a/aiida/engine/processes/calcjobs/tasks.py +++ b/aiida/engine/processes/calcjobs/tasks.py @@ -8,6 +8,7 @@ # For further information please visit http://www.aiida.net # ########################################################################### """Transport tasks for calculation jobs.""" +import asyncio import functools import logging import tempfile @@ -406,7 +407,7 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override else: logger.warning(f'killed CalcJob<{node.pk}> but async future was None') raise - except (plumpy.process_states.Interruption, plumpy.futures.CancelledError): + except (plumpy.process_states.Interruption, plumpy.futures.CancelledError, asyncio.CancelledError): node.set_process_status(f'Transport task {command} was interrupted') raise else: diff --git a/aiida/engine/transports.py b/aiida/engine/transports.py index b722140834..575472bb4b 100644 --- a/aiida/engine/transports.py +++ b/aiida/engine/transports.py @@ -108,6 +108,10 @@ def do_open(): try: transport_request.count += 1 yield transport_request.future + except asyncio.CancelledError: # pylint: disable=try-except-raise + # note this is only required in python<=3.7, + # where asyncio.CancelledError inherits from Exception + raise except Exception: _LOGGER.error('Exception whilst using transport:\n%s', traceback.format_exc()) raise diff --git a/aiida/manage/external/rmq.py b/aiida/manage/external/rmq.py index f2069603ab..c7cccfd149 100644 --- a/aiida/manage/external/rmq.py +++ b/aiida/manage/external/rmq.py @@ -9,6 +9,7 @@ ########################################################################### # pylint: disable=cyclic-import """Components to communicate tasks to RabbitMQ.""" +import asyncio from collections.abc import Mapping import logging import traceback @@ -209,6 +210,10 @@ async def _continue(self, communicator, pid, nowait, tag=None): message = 'the class of the process could not be imported.' self.handle_continue_exception(node, exception, message) raise + except asyncio.CancelledError: # pylint: disable=try-except-raise + # note this is only required in python<=3.7, + # where asyncio.CancelledError inherits from Exception + raise except Exception as exception: message = 'failed to recreate the process instance in order to continue it.' self.handle_continue_exception(node, exception, message) From 43236cc01f1186796fb5a326c3ad3c04428bdd2d Mon Sep 17 00:00:00 2001 From: Chris Sewell Date: Tue, 9 Mar 2021 02:40:33 +0100 Subject: [PATCH 2/5] Apply suggestions from code review --- aiida/engine/processes/calcjobs/tasks.py | 5 ++++- aiida/engine/transports.py | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/aiida/engine/processes/calcjobs/tasks.py b/aiida/engine/processes/calcjobs/tasks.py index 47fbd37520..cf667bc519 100644 --- a/aiida/engine/processes/calcjobs/tasks.py +++ b/aiida/engine/processes/calcjobs/tasks.py @@ -407,7 +407,10 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override else: logger.warning(f'killed CalcJob<{node.pk}> but async future was None') raise - except (plumpy.process_states.Interruption, plumpy.futures.CancelledError, asyncio.CancelledError): + except (plumpy.futures.CancelledError, asyncio.CancelledError): + node.set_process_status(f'Transport task {command} was cancelled') + raise + except plumpy.process_states.Interruption: node.set_process_status(f'Transport task {command} was interrupted') raise else: diff --git a/aiida/engine/transports.py b/aiida/engine/transports.py index 575472bb4b..d301235e27 100644 --- a/aiida/engine/transports.py +++ b/aiida/engine/transports.py @@ -111,6 +111,7 @@ def do_open(): except asyncio.CancelledError: # pylint: disable=try-except-raise # note this is only required in python<=3.7, # where asyncio.CancelledError inherits from Exception + _LOGGER.debug('Transport task cancelled') raise except Exception: _LOGGER.error('Exception whilst using transport:\n%s', traceback.format_exc()) From 4feccf65327499a6177e5918b3de761fff298386 Mon Sep 17 00:00:00 2001 From: Chris Sewell Date: Tue, 9 Mar 2021 15:20:47 +0100 Subject: [PATCH 3/5] Update kiwipy/plumpy versions --- environment.yml | 4 ++-- requirements/requirements-py-3.7.txt | 4 ++-- requirements/requirements-py-3.8.txt | 4 ++-- requirements/requirements-py-3.9.txt | 4 ++-- setup.json | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/environment.yml b/environment.yml index 09b03959f8..1f5d72c388 100644 --- a/environment.yml +++ b/environment.yml @@ -21,11 +21,11 @@ dependencies: - ipython~=7.20 - jinja2~=2.10 - jsonschema~=3.0 -- kiwipy[rmq]~=0.7.3 +- kiwipy[rmq]~=0.7.4 - numpy~=1.17 - pamqp~=2.3 - paramiko>=2.7.2,~=2.7 -- plumpy~=0.18.6 +- plumpy~=0.19.0 - pgsu~=0.1.0 - psutil~=5.6 - psycopg2>=2.8.3,~=2.8 diff --git a/requirements/requirements-py-3.7.txt b/requirements/requirements-py-3.7.txt index 49c846ca68..b1decfa255 100644 --- a/requirements/requirements-py-3.7.txt +++ b/requirements/requirements-py-3.7.txt @@ -57,7 +57,7 @@ jupyter-console==6.2.0 jupyter-core==4.7.1 jupyterlab-pygments==0.1.2 jupyterlab-widgets==1.0.0 -kiwipy==0.7.3 +kiwipy==0.7.4 kiwisolver==1.3.1 Mako==1.1.4 MarkupSafe==1.1.1 @@ -88,7 +88,7 @@ pickleshare==0.7.5 Pillow==8.1.0 plotly==4.14.3 pluggy==0.13.1 -plumpy==0.18.6 +plumpy==0.19.0 prometheus-client==0.9.0 prompt-toolkit==3.0.14 psutil==5.8.0 diff --git a/requirements/requirements-py-3.8.txt b/requirements/requirements-py-3.8.txt index 8ab1f0cd09..4d2326794d 100644 --- a/requirements/requirements-py-3.8.txt +++ b/requirements/requirements-py-3.8.txt @@ -56,7 +56,7 @@ jupyter-console==6.2.0 jupyter-core==4.7.1 jupyterlab-pygments==0.1.2 jupyterlab-widgets==1.0.0 -kiwipy==0.7.3 +kiwipy==0.7.4 kiwisolver==1.3.1 Mako==1.1.4 MarkupSafe==1.1.1 @@ -87,7 +87,7 @@ pickleshare==0.7.5 Pillow==8.1.0 plotly==4.14.3 pluggy==0.13.1 -plumpy==0.18.6 +plumpy==0.19.0 prometheus-client==0.9.0 prompt-toolkit==3.0.14 psutil==5.8.0 diff --git a/requirements/requirements-py-3.9.txt b/requirements/requirements-py-3.9.txt index f050094ca4..5bcba80782 100644 --- a/requirements/requirements-py-3.9.txt +++ b/requirements/requirements-py-3.9.txt @@ -56,7 +56,7 @@ jupyter-console==6.2.0 jupyter-core==4.7.1 jupyterlab-pygments==0.1.2 jupyterlab-widgets==1.0.0 -kiwipy==0.7.3 +kiwipy==0.7.4 kiwisolver==1.3.1 Mako==1.1.4 MarkupSafe==1.1.1 @@ -87,7 +87,7 @@ pickleshare==0.7.5 Pillow==8.1.0 plotly==4.14.3 pluggy==0.13.1 -plumpy==0.18.6 +plumpy==0.19.0 prometheus-client==0.9.0 prompt-toolkit==3.0.14 psutil==5.8.0 diff --git a/setup.json b/setup.json index 9dac042c00..1c62482f47 100644 --- a/setup.json +++ b/setup.json @@ -35,11 +35,11 @@ "ipython~=7.20", "jinja2~=2.10", "jsonschema~=3.0", - "kiwipy[rmq]~=0.7.3", + "kiwipy[rmq]~=0.7.4", "numpy~=1.17", "pamqp~=2.3", "paramiko~=2.7,>=2.7.2", - "plumpy~=0.18.6", + "plumpy~=0.19.0", "pgsu~=0.1.0", "psutil~=5.6", "psycopg2-binary~=2.8,>=2.8.3", From 6f041a56f7a185beefe7dc2f95f18eff9da68fcc Mon Sep 17 00:00:00 2001 From: Chris Sewell Date: Tue, 9 Mar 2021 18:20:48 +0100 Subject: [PATCH 4/5] add test --- tests/engine/test_daemon.py | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/tests/engine/test_daemon.py b/tests/engine/test_daemon.py index fd9c64ff7b..eb4a867c0b 100644 --- a/tests/engine/test_daemon.py +++ b/tests/engine/test_daemon.py @@ -8,8 +8,35 @@ # For further information please visit http://www.aiida.net # ########################################################################### """Test daemon module.""" -from aiida.backends.testbase import AiidaTestCase +import asyncio +import pytest -class TestDaemon(AiidaTestCase): - """Testing the daemon.""" +from aiida.manage.manager import get_manager +from plumpy.process_states import ProcessState +from tests.utils import processes as test_processes + + +async def reach_waiting_state(process): + while process.state != ProcessState.WAITING: + await asyncio.sleep(0.1) + + +@pytest.mark.usefixtures('clear_database_before_test') +def test_cancel_process_task(): + """This test is designed to replicate how processes are cancelled in the current `shutdown_runner` callback. + + The `CancelledError` should bubble up to the caller, and not be caught and transition the process to excepted. + """ + runner = get_manager().get_runner() + # create the process and start it running + process = runner.instantiate_process(test_processes.WaitProcess) + task = runner.loop.create_task(process.step_until_terminated()) + # wait for the process to reach a WAITING state + runner.loop.run_until_complete(asyncio.wait_for(reach_waiting_state(process), 5.0)) + # cancel the task and wait for the cancellation + task.cancel() + with pytest.raises(asyncio.CancelledError): + runner.loop.run_until_complete(asyncio.wait_for(task, 5.0)) + # the node should still record a waiting state, not excepted + assert process.node.process_state == ProcessState.WAITING From 87be4170869f68b447171024e8c1265c6ae53261 Mon Sep 17 00:00:00 2001 From: Chris Sewell Date: Tue, 9 Mar 2021 18:45:17 +0100 Subject: [PATCH 5/5] fix pre-commit --- tests/engine/test_daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/engine/test_daemon.py b/tests/engine/test_daemon.py index eb4a867c0b..53f6b4a20b 100644 --- a/tests/engine/test_daemon.py +++ b/tests/engine/test_daemon.py @@ -10,10 +10,10 @@ """Test daemon module.""" import asyncio +from plumpy.process_states import ProcessState import pytest from aiida.manage.manager import get_manager -from plumpy.process_states import ProcessState from tests.utils import processes as test_processes