Skip to content

Commit 474e0fa

Browse files
khsraliagoscinski
andauthored
Transport: Three bugs fixed in test_execmanager and AsyncSshTransport (#6855)
This commit fixes three bugs. Everything is squashed into one commit because the fixes are co-dependent and separating them would bring little benefit. First bug fix: - Adapt `AsyncSshTransport` to the *COUNTER-INTUITIVE* but expected behavior of the engine: `transport.copy()` should raise `FileNotFoundError` if the remote source or the remote destination is not found. This later results in a silent failure of the process, which is handled in `execmanager.py`::_copy_remote_files. Second bug fix: - `AsyncTransport::run_command_blocking` had to use the current event loop of the manager instead of creating a new one. This likely increases efficiency quite a lot by removing unnecessary deadlocks. - `test_execmanager.py::node_and_calc_info` was not being run for `core.ssh_async`, which is why these bugs went unnoticed. Fixed in this commit. Third bug fix: - Converts all sync tests in `test_execmanager.py` that were calling `runner.loop.run_until_complete()` into async tests. This prevents `asyncssh` from attaching a `Future` to the wrong event loop. - Improves the `event_loop` fixture: formerly, it always required a profile to be loaded, and therefore the event loop was not cleaned before being passed to a new test. It now takes `aiida_profile_clean`, so that a profile is loaded for all async tests and the event loop is also cleaned for each test. This works because `aiida_profile_clean` also triggers `manager.reset_profile()`. --------- Co-authored-by: Alexander Goscinski <[email protected]>
1 parent cf2614f commit 474e0fa

File tree

4 files changed

+60
-43
lines changed

4 files changed

+60
-43
lines changed

src/aiida/transports/plugins/ssh_async.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,8 @@ async def copy_async(
618618
:type preserve: bool
619619
620620
:raises: OSError, src does not exist or if the copy execution failed.
621+
:raises: FileNotFoundError, if either remotesource does not exist
622+
or remotedestination's parent path does not exist
621623
"""
622624

623625
remotesource = str(remotesource)
@@ -648,7 +650,8 @@ async def copy_async(
648650
)
649651
else:
650652
if not await self.path_exists_async(remotesource):
651-
raise OSError(f'The remote path {remotesource} does not exist')
653+
raise FileNotFoundError(f'The remote path {remotesource} does not exist')
654+
652655
await self._sftp.copy(
653656
remotesource,
654657
remotedestination,
@@ -657,6 +660,13 @@ async def copy_async(
657660
follow_symlinks=dereference,
658661
remote_only=True,
659662
)
663+
except asyncssh.sftp.SFTPNoSuchFile as exc:
664+
# note: one could just create directories, but aiida engine expects this behavior
665+
# see `execmanager.py`::_copy_remote_files for more details
666+
raise FileNotFoundError(
667+
f'The remote path {remotedestination} is not reachable,'
668+
f'perhaps the parent folder does not exists: {exc}'
669+
)
660670
except asyncssh.sftp.SFTPFailure as exc:
661671
raise OSError(f'Error while copying {remotesource} to {remotedestination}: {exc}')
662672
else:

src/aiida/transports/transport.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
"""Transport interface."""
1010

1111
import abc
12-
import asyncio
1312
import fnmatch
1413
import os
1514
import re
@@ -1793,7 +1792,11 @@ class AsyncTransport(Transport):
17931792
"""
17941793

17951794
def run_command_blocking(self, func, *args, **kwargs):
1796-
loop = asyncio.get_event_loop()
1795+
"""The event loop must be the one of manager."""
1796+
1797+
from aiida.manage import get_manager
1798+
1799+
loop = get_manager().get_runner()
17971800
return loop.run_until_complete(func(*args, **kwargs))
17981801

17991802
def open(self):

tests/conftest.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -525,8 +525,10 @@ def runner(manager):
525525

526526

527527
@pytest.fixture
528-
def event_loop(manager):
529-
"""Get the event loop instance of the currently loaded profile.
528+
def event_loop(manager, aiida_profile_clean):
529+
"""Get the event loop instance of a cleaned profile.
530+
This works because the ``aiida_profile_clean`` fixture, apart from loading a profile and cleaning it,
531+
also triggers ``manager.reset_profile()``, which clears the event loop.
530532
531533
This is automatically called as a fixture for any test marked with ``@pytest.mark.asyncio``.
532534
"""

tests/engine/daemon/test_execmanager.py

Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
from aiida.common.datastructures import CalcInfo, CodeInfo, FileCopyOperation, StashMode
1717
from aiida.common.folders import SandboxFolder
1818
from aiida.engine.daemon import execmanager
19-
from aiida.manage import get_manager
2019
from aiida.orm import CalcJobNode, FolderData, PortableCode, RemoteData, SinglefileData
20+
from aiida.plugins import entry_point
2121
from aiida.transports.plugins.local import LocalTransport
2222

2323

@@ -41,14 +41,19 @@ def file_hierarchy_simple():
4141
}
4242

4343

44-
@pytest.fixture(params=('core.local', 'core.ssh'))
45-
def node_and_calc_info(aiida_localhost, aiida_computer_ssh, aiida_code_installed, request):
44+
@pytest.fixture(
45+
scope='function',
46+
params=[name for name in entry_point.get_entry_point_names('aiida.transports') if name.startswith('core.')],
47+
)
48+
def node_and_calc_info(aiida_localhost, aiida_computer_ssh, aiida_computer_ssh_async, aiida_code_installed, request):
4649
"""Return a ``CalcJobNode`` and associated ``CalcInfo`` instance."""
4750

4851
if request.param == 'core.local':
4952
node = CalcJobNode(computer=aiida_localhost)
5053
elif request.param == 'core.ssh':
5154
node = CalcJobNode(computer=aiida_computer_ssh())
55+
elif request.param == 'core.ssh_async':
56+
node = CalcJobNode(computer=aiida_computer_ssh_async())
5257
else:
5358
raise ValueError(f'unsupported transport: {request.param}')
5459

@@ -111,7 +116,8 @@ def test_hierarchy_utility(file_hierarchy, tmp_path, create_file_hierarchy, seri
111116
(['file_a.txt', 'file_u.txt', 'path/file_u.txt', ('path/sub/file_u.txt', '.', 3)], {'file_a.txt': 'file_a'}),
112117
),
113118
)
114-
def test_retrieve_files_from_list(
119+
@pytest.mark.asyncio
120+
async def test_retrieve_files_from_list(
115121
tmp_path_factory,
116122
generate_calcjob_node,
117123
file_hierarchy,
@@ -125,15 +131,15 @@ def test_retrieve_files_from_list(
125131
target = tmp_path_factory.mktemp('target')
126132

127133
create_file_hierarchy(file_hierarchy, source)
128-
runner = get_manager().get_runner()
129134

130135
with LocalTransport() as transport:
131136
node = generate_calcjob_node(workdir=source)
132-
runner.loop.run_until_complete(execmanager.retrieve_files_from_list(node, transport, target, retrieve_list))
137+
await execmanager.retrieve_files_from_list(node, transport, target, retrieve_list)
133138

134139
assert serialize_file_hierarchy(target, read_bytes=False) == expected_hierarchy
135140

136141

142+
@pytest.mark.asyncio
137143
@pytest.mark.parametrize(
138144
('local_copy_list', 'expected_hierarchy'),
139145
(
@@ -148,7 +154,7 @@ def test_retrieve_files_from_list(
148154
(['sub', 'target/another-sub'], {'target': {'another-sub': {'b': 'file_b'}}}),
149155
),
150156
)
151-
def test_upload_local_copy_list(
157+
async def test_upload_local_copy_list(
152158
fixture_sandbox,
153159
node_and_calc_info,
154160
file_hierarchy_simple,
@@ -168,8 +174,7 @@ def test_upload_local_copy_list(
168174
calc_info.local_copy_list = [[folder.uuid] + local_copy_list]
169175

170176
with node.computer.get_transport() as transport:
171-
runner = get_manager().get_runner()
172-
runner.loop.run_until_complete(execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox))
177+
await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)
173178

174179
# Check that none of the files were written to the repository of the calculation node, since they were communicated
175180
# through the ``local_copy_list``.
@@ -180,7 +185,8 @@ def test_upload_local_copy_list(
180185
assert written_hierarchy == expected_hierarchy
181186

182187

183-
def test_upload_local_copy_list_files_folders(
188+
@pytest.mark.asyncio
189+
async def test_upload_local_copy_list_files_folders(
184190
fixture_sandbox, node_and_calc_info, file_hierarchy, tmp_path, create_file_hierarchy, serialize_file_hierarchy
185191
):
186192
"""Test the ``local_copy_list`` functionality in ``upload_calculation``.
@@ -206,8 +212,7 @@ def test_upload_local_copy_list_files_folders(
206212
]
207213

208214
with node.computer.get_transport() as transport:
209-
runner = get_manager().get_runner()
210-
runner.loop.run_until_complete(execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox))
215+
await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)
211216

212217
# Check that none of the files were written to the repository of the calculation node, since they were communicated
213218
# through the ``local_copy_list``.
@@ -222,7 +227,8 @@ def test_upload_local_copy_list_files_folders(
222227
assert expected_hierarchy == written_hierarchy
223228

224229

225-
def test_upload_remote_symlink_list(
230+
@pytest.mark.asyncio
231+
async def test_upload_remote_symlink_list(
226232
fixture_sandbox, node_and_calc_info, file_hierarchy, tmp_path, create_file_hierarchy
227233
):
228234
"""Test the ``remote_symlink_list`` functionality in ``upload_calculation``.
@@ -238,8 +244,7 @@ def test_upload_remote_symlink_list(
238244
]
239245

240246
with node.computer.get_transport() as transport:
241-
runner = get_manager().get_runner()
242-
runner.loop.run_until_complete(execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox))
247+
await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)
243248

244249
filepath_workdir = pathlib.Path(node.get_remote_workdir())
245250
assert (filepath_workdir / 'file_a.txt').is_symlink()
@@ -270,7 +275,8 @@ def test_upload_remote_symlink_list(
270275
),
271276
),
272277
)
273-
def test_upload_file_copy_operation_order(node_and_calc_info, tmp_path, order, expected):
278+
@pytest.mark.asyncio
279+
async def test_upload_file_copy_operation_order(node_and_calc_info, tmp_path, order, expected):
274280
"""Test the ``CalcInfo.file_copy_operation_order`` controls the copy order."""
275281
node, calc_info = node_and_calc_info
276282

@@ -303,8 +309,7 @@ def test_upload_file_copy_operation_order(node_and_calc_info, tmp_path, order, e
303309
calc_info.file_copy_operation_order = order
304310

305311
with node.computer.get_transport() as transport:
306-
runner = get_manager().get_runner()
307-
runner.loop.run_until_complete(execmanager.upload_calculation(node, transport, calc_info, sandbox, inputs))
312+
await execmanager.upload_calculation(node, transport, calc_info, sandbox, inputs)
308313
filepath = pathlib.Path(node.get_remote_workdir()) / 'file.txt'
309314
assert filepath.is_file()
310315
assert filepath.read_text() == expected
@@ -377,6 +382,7 @@ def test_upload_file_copy_operation_order(node_and_calc_info, tmp_path, order, e
377382
),
378383
# Only remote copy of a single file to the "pseudo" directory
379384
# -> Copy fails silently since target directory does not exist: final directory structure is empty
385+
# COUNTER-INTUITIVE: the silent behavior is expected. See `execmanager.py`::_copy_remote_files for more details
380386
(
381387
{},
382388
(),
@@ -385,6 +391,7 @@ def test_upload_file_copy_operation_order(node_and_calc_info, tmp_path, order, e
385391
None,
386392
),
387393
# -> Copy fails silently since target directory does not exist: final directory structure is empty
394+
# COUNTER-INTUITIVE: the silent behavior is expected. See `execmanager.py`::_copy_remote_files for more details
388395
(
389396
{},
390397
(),
@@ -532,7 +539,8 @@ def test_upload_file_copy_operation_order(node_and_calc_info, tmp_path, order, e
532539
),
533540
],
534541
)
535-
def test_upload_combinations(
542+
@pytest.mark.asyncio
543+
async def test_upload_combinations(
536544
fixture_sandbox,
537545
node_and_calc_info,
538546
tmp_path,
@@ -598,27 +606,25 @@ def test_upload_combinations(
598606
calc_info.remote_copy_list.append(
599607
(node.computer.uuid, (sub_tmp_path_remote / source_path).as_posix(), target_path)
600608
)
601-
runner = get_manager().get_runner()
602609
if expected_exception is not None:
603610
with pytest.raises(expected_exception):
604611
with node.computer.get_transport() as transport:
605-
runner.loop.run_until_complete(
606-
execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)
607-
)
612+
await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)
608613

609614
filepath_workdir = pathlib.Path(node.get_remote_workdir())
610615

611616
assert serialize_file_hierarchy(filepath_workdir, read_bytes=False) == expected_hierarchy
612617
else:
613618
with node.computer.get_transport() as transport:
614-
runner.loop.run_until_complete(execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox))
619+
await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox)
615620

616621
filepath_workdir = pathlib.Path(node.get_remote_workdir())
617622

618623
assert serialize_file_hierarchy(filepath_workdir, read_bytes=False) == expected_hierarchy
619624

620625

621-
def test_upload_calculation_portable_code(fixture_sandbox, node_and_calc_info, tmp_path):
626+
@pytest.mark.asyncio
627+
async def test_upload_calculation_portable_code(fixture_sandbox, node_and_calc_info, tmp_path):
622628
"""Test ``upload_calculation`` with a ``PortableCode`` for different transports.
623629
624630
Regression test for https://github.com/aiidateam/aiida-core/issues/6518
@@ -639,14 +645,11 @@ def test_upload_calculation_portable_code(fixture_sandbox, node_and_calc_info, t
639645
calc_info.codes_info = [code_info]
640646

641647
with node.computer.get_transport() as transport:
642-
runner = get_manager().get_runner()
643-
runner.loop.run_until_complete(
644-
execmanager.upload_calculation(
645-
node,
646-
transport,
647-
calc_info,
648-
fixture_sandbox,
649-
)
648+
await execmanager.upload_calculation(
649+
node,
650+
transport,
651+
calc_info,
652+
fixture_sandbox,
650653
)
651654

652655

@@ -664,7 +667,8 @@ def test_upload_calculation_portable_code(fixture_sandbox, node_and_calc_info, t
664667
StashMode.COMPRESS_TARXZ.value,
665668
],
666669
)
667-
def test_stashing(
670+
@pytest.mark.asyncio
671+
async def test_stashing(
668672
generate_calcjob_node,
669673
stash_mode,
670674
file_hierarchy,
@@ -699,8 +703,6 @@ def test_stashing(
699703
},
700704
)
701705

702-
runner = get_manager().get_runner()
703-
704706
def mock_get_authinfo(*args, **kwargs):
705707
class MockAuthInfo:
706708
def get_workdir(self, *args, **kwargs):
@@ -716,7 +718,7 @@ def get_workdir(self, *args, **kwargs):
716718
# Here, using the local transport, we test the basic functionality of `stash_calculation`.
717719

718720
with LocalTransport() as transport:
719-
runner.loop.run_until_complete(execmanager.stash_calculation(node, transport))
721+
await execmanager.stash_calculation(node, transport)
720722

721723
if stash_mode != StashMode.COPY.value:
722724
# more detailed test on integrity of the zip file is in `test_all_plugins.py`
@@ -762,7 +764,7 @@ async def mock_compress_async(*args, **kwargs):
762764
# no error should be raised
763765
# the error should only be logged to EXEC_LOGGER.warning and exit with 0
764766
with caplog.at_level(logging.WARNING):
765-
runner.loop.run_until_complete(execmanager.stash_calculation(node, transport))
767+
await execmanager.stash_calculation(node, transport)
766768
assert any('failed to stash' in message for message in caplog.messages)
767769

768770
# Ensure no files were created in the destination path after the error

0 commit comments

Comments
 (0)