Skip to content

Commit bc25323

Browse files
authored
StashCalculation: a new CalcJob plugin (aiidateam#6772)
Historically, stashing was only possible, if it was instructed before running a generic calcjob. The instruction had to be "attached" to the original calcjob, like this for example: ```python inputs = { 'MyInputs': <MyInputs>, 'metadata': { 'computer': Computer.collection.get(label="localhost"), 'options': { 'resources': {'num_machines': 1}, 'stash': { 'stash_mode': StashMode.COPY.value, 'target_base': '/scratch/', 'source_list': ['heavy_data.xyz'], }, }, }, } run(MyCalculation, **inputs) ``` However, if a user would realize they need to stash something only after running a calcjob, this would not be possible. This commit, introduces a new calcjob, which is able to perform a stashing operation after a calculation is finished. The usage is very similar, and for consistency and user-friendliness, we keep the instruction as part of the metadata. The only main input is obviously a source node which is `RemoteData` node of the calculation to be stashed, for example: ```python StashCalculation_ = CalculationFactory('core.stash') MyCalculation = orm.load_node(pk=<PK>) inputs = { 'metadata': { 'computer': Computer.collection.get(label="localhost"), 'options': { 'resources': {'num_machines': 1}, 'stash': { 'stash_mode': StashMode.COPY.value, 'target_base': '/scratch/', 'source_list': ['heavy_data.xyz'], }, }, }, 'source_node': orm.RemoteData, } result = run(StashCalculation_, **inputs) ```
1 parent eb34b06 commit bc25323

File tree

6 files changed

+173
-10
lines changed

6 files changed

+173
-10
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ requires-python = '>=3.9'
6464

6565
[project.entry-points.'aiida.calculations']
6666
'core.arithmetic.add' = 'aiida.calculations.arithmetic.add:ArithmeticAddCalculation'
67+
'core.stash' = 'aiida.calculations.stash:StashCalculation'
6768
'core.templatereplacer' = 'aiida.calculations.templatereplacer:TemplatereplacerCalculation'
6869
'core.transfer' = 'aiida.calculations.transfer:TransferCalculation'
6970

src/aiida/calculations/stash.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
###########################################################################
2+
# Copyright (c), The AiiDA team. All rights reserved. #
3+
# This file is part of the AiiDA code. #
4+
# #
5+
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
6+
# For further information on the license, see the LICENSE.txt file #
7+
# For further information please visit http://www.aiida.net #
8+
###########################################################################
9+
""""""
10+
11+
from aiida import orm
12+
from aiida.common.datastructures import CalcInfo
13+
from aiida.engine import CalcJob
14+
15+
16+
class StashCalculation(CalcJob):
17+
"""
18+
Utility to stash files/folders from `RemoteData`, `SinglefileData`, or `FolderData`.
19+
20+
An example of how the input should look like:
21+
22+
.. code-block:: python
23+
24+
inputs = {
25+
'metadata': {
26+
'computer': Computer.collection.get(label="localhost"),
27+
'options': {
28+
'resources': {'num_machines': 1},
29+
'stash': {
30+
'stash_mode': StashMode.COPY.value,
31+
'target_base': '/scratch/my_stashing/',
32+
'source_list': ['aiida.in', '_aiidasubmit.sh'],
33+
},
34+
},
35+
},
36+
'source_node': node_1,
37+
}
38+
39+
Ideally one could use the same computer as the one of the `source_node`.
40+
However if you cannot access the stash storage from the same computer anymore
41+
but you have access to it from another computer, you can can specify the computer in `metadata.computer`.
42+
"""
43+
44+
def __init__(self, *args, **kwargs):
45+
super().__init__(*args, **kwargs)
46+
47+
@classmethod
48+
def define(cls, spec):
49+
super().define(spec)
50+
51+
spec.input(
52+
'source_node',
53+
valid_type=orm.RemoteData,
54+
required=True,
55+
help='',
56+
)
57+
58+
# Code is irrelevant for this calculation.
59+
spec.inputs.pop('code', None)
60+
61+
spec.inputs['metadata']['computer'].required = True
62+
spec.inputs['metadata']['options']['stash'].required = True
63+
spec.inputs['metadata']['options']['stash']['stash_mode'].required = True
64+
spec.inputs['metadata']['options']['stash']['target_base'].required = True
65+
spec.inputs['metadata']['options']['stash']['source_list'].required = True
66+
spec.inputs['metadata']['options']['resources'].default = {
67+
'num_machines': 1,
68+
'num_mpiprocs_per_machine': 1,
69+
}
70+
71+
def prepare_for_submission(self, folder):
72+
calc_info = CalcInfo()
73+
calc_info.skip_submit = True
74+
75+
calc_info.codes_info = []
76+
calc_info.retrieve_list = []
77+
calc_info.local_copy_list = []
78+
calc_info.remote_copy_list = []
79+
calc_info.remote_symlink_list = []
80+
81+
return calc_info

src/aiida/calculations/transfer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ def define(cls, spec):
185185
help='All the nodes that contain files referenced in the instructions.',
186186
)
187187

188-
# The transfer just needs a computer, the code are resources are set here
188+
# The transfer just needs a computer, the code and resources are set here
189189
spec.inputs.pop('code', None)
190190
spec.inputs['metadata']['computer'].required = True
191191
spec.inputs['metadata']['options']['resources'].default = {

src/aiida/engine/daemon/execmanager.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -440,11 +440,19 @@ async def stash_calculation(calculation: CalcJobNode, transport: Transport) -> N
440440

441441
logger_extra = get_dblogger_extra(calculation)
442442

443+
if calculation.process_type == 'aiida.calculations:core.stash':
444+
remote_node = load_node(calculation.inputs.source_node.pk)
445+
uuid = remote_node.uuid
446+
source_basepath = Path(remote_node.get_remote_path())
447+
else:
448+
uuid = calculation.uuid
449+
source_basepath = Path(calculation.get_remote_workdir())
450+
443451
stash_options = calculation.get_option('stash')
444452
stash_mode = stash_options.get('stash_mode')
445453
source_list = stash_options.get('source_list', [])
446-
uuid = calculation.uuid
447-
source_basepath = Path(calculation.get_remote_workdir())
454+
target_base = Path(stash_options['target_base'])
455+
dereference = stash_options.get('dereference', False)
448456

449457
if not source_list:
450458
return
@@ -454,7 +462,7 @@ async def stash_calculation(calculation: CalcJobNode, transport: Transport) -> N
454462
)
455463

456464
if stash_mode == StashMode.COPY.value:
457-
target_basepath = Path(stash_options['target_base']) / uuid[:2] / uuid[2:4] / uuid[4:]
465+
target_basepath = target_base / uuid[:2] / uuid[2:4] / uuid[4:]
458466

459467
for source_filename in source_list:
460468
if transport.has_magic(source_filename):
@@ -475,7 +483,7 @@ async def stash_calculation(calculation: CalcJobNode, transport: Transport) -> N
475483
except (OSError, ValueError) as exception:
476484
EXEC_LOGGER.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')
477485
# try to clean up in case of a failure
478-
await transport.rmtree_async(Path(stash_options['target_base']) / uuid[:2])
486+
await transport.rmtree_async(target_base / uuid[:2])
479487
else:
480488
EXEC_LOGGER.debug(f'stashed {source_filepath} to {target_filepath}')
481489

@@ -496,12 +504,10 @@ async def stash_calculation(calculation: CalcJobNode, transport: Transport) -> N
496504
# 'tar', 'tar.gz', 'tar.bz2', or 'tar.xz'
497505
compression_format = stash_mode
498506
file_name = uuid
499-
dereference = stash_options.get('dereference', False)
500-
target_basepath = Path(stash_options['target_base'])
501507
authinfo = calculation.get_authinfo()
502508
aiida_remote_base = authinfo.get_workdir().format(username=transport.whoami())
503509

504-
target_destination = str(target_basepath / file_name) + '.' + compression_format
510+
target_destination = str(target_base / file_name) + '.' + compression_format
505511

506512
remote_stash = RemoteStashCompressedData(
507513
computer=calculation.computer,

src/aiida/engine/processes/calcjobs/tasks.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,30 @@ def load_instance_state(self, saved_state, load_context):
481481
self._killing = None
482482

483483
async def execute(self) -> plumpy.process_states.State: # type: ignore[override]
484-
"""Override the execute coroutine of the base `Waiting` state."""
484+
"""Override the execute coroutine of the base `Waiting` state.
485+
Using the plumpy state machine the waiting state is repeatedly re-entered with different commands.
486+
The waiting state is not always the same instance, it could be re-instantiated when re-entering this method,
487+
therefor any newly created attribute in each command block
488+
(e.g. `SUBMIT_COMMAND`, `UPLOAD_COMMAND`, etc.) will be lost, and is not usable in other blocks.
489+
The advantage of this design, is that the sequence is interruptable,
490+
meaning, the process can potentially come back and start from where it left off.
491+
492+
The overall sequence is as follows:
493+
in case `skip_submit` is True:
494+
495+
UPLOAD -> STASH -> RETRIEVE
496+
| ^ | ^ | ^
497+
v | v | v |
498+
.. .. .. .. .. ..
499+
500+
otherwise:
501+
502+
UPLOAD -> SUBMIT -> UPDATE -> STASH -> RETRIEVE
503+
| ^ | ^ | ^ | ^ | ^
504+
v | v | v | v | v |
505+
.. .. .. .. .. .. .. .. .. ..
506+
"""
507+
485508
node = self.process.node
486509
transport_queue = self.process.runner.transport
487510
result: plumpy.process_states.State = self
@@ -493,7 +516,7 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override
493516
if self._command == UPLOAD_COMMAND:
494517
skip_submit = await self._launch_task(task_upload_job, self.process, transport_queue)
495518
if skip_submit:
496-
result = self.retrieve(monitor_result=self._monitor_result)
519+
result = self.stash(monitor_result=self._monitor_result)
497520
else:
498521
result = self.submit()
499522

tests/calculations/test_stash.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
###########################################################################
2+
# Copyright (c), The AiiDA team. All rights reserved. #
3+
# This file is part of the AiiDA code. #
4+
# #
5+
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
6+
# For further information on the license, see the LICENSE.txt file #
7+
# For further information please visit http://www.aiida.net #
8+
###########################################################################
9+
"""Tests for the `StashCalculation` plugin.
10+
11+
Note: testing the main functionality is done in via `test_execmanager.py`.
12+
Here, we mainly check for redirection, of the calcjob.
13+
"""
14+
15+
import pytest
16+
17+
from aiida import orm
18+
from aiida.common.datastructures import StashMode
19+
20+
21+
@pytest.mark.requires_rmq
22+
def test_stash_calculation_basic(fixture_sandbox, aiida_localhost, generate_calc_job, tmp_path):
23+
"""Test that the basic implementation of `StashCalculation` functions."""
24+
25+
target_base = tmp_path / 'target'
26+
source = tmp_path / 'source'
27+
source.mkdir()
28+
29+
inputs = {
30+
'metadata': {
31+
'computer': aiida_localhost,
32+
'options': {
33+
'resources': {'num_machines': 1},
34+
'stash': {
35+
'stash_mode': StashMode.COPY.value,
36+
'target_base': str(target_base),
37+
'source_list': ['*'],
38+
},
39+
},
40+
},
41+
'source_node': orm.RemoteData(computer=aiida_localhost, remote_path=str(source)),
42+
}
43+
entry_point_name = 'core.stash'
44+
calc_info = generate_calc_job(fixture_sandbox, entry_point_name, inputs)
45+
46+
assert calc_info.skip_submit is True
47+
48+
assert calc_info.codes_info == []
49+
assert calc_info.retrieve_list == []
50+
assert calc_info.local_copy_list == []
51+
assert calc_info.remote_copy_list == []
52+
assert calc_info.remote_symlink_list == []

0 commit comments

Comments
 (0)