forked from aiidateam/aiida-pythonjob
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpyfunction.py
More file actions
172 lines (132 loc) · 5.18 KB
/
pyfunction.py
File metadata and controls
172 lines (132 loc) · 5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""
PyFunction
===============
"""
# %%#
# Default and custom outputs
# --------------------------
#
# The ``@pyfunction`` decorator turns a Python function into an AiiDA ``PyFunction``.
# By default, the entire return value is stored in a single output node named ``result``.
#
from aiida import load_profile
from aiida.engine import run_get_node
from aiida_pythonjob import pyfunction, spec
# Load an AiiDA profile before any example below is run; no profile name is
# passed here (presumably this selects the configured default -- confirm).
load_profile()
@pyfunction()
def add_and_subtract(x, y):
    """Return the sum and the difference of ``x`` and ``y`` in one dictionary."""
    total = x + y
    difference = x - y
    return {"sum": total, "diff": difference}
# Run the function and unpack the returned pair: the outputs and the process node.
result, node = run_get_node(add_and_subtract, x=1, y=2)
print("Default result: ", result)
# %%#
# You can specify the ``outputs`` parameter to unpack a returned dictionary into
# separate output nodes.
#
@pyfunction(outputs=spec.namespace(sum=any, diff=any))
def add_and_subtract(x, y):
    """Return a dictionary whose ``sum`` and ``diff`` keys match the declared outputs."""
    outputs = {}
    outputs["sum"] = x + y
    outputs["diff"] = x - y
    return outputs
# Run the redefined function; ``result`` is then indexed by the output names
# ``sum`` and ``diff`` and each entry is printed on its own line.
result, node = run_get_node(add_and_subtract, x=1, y=2)
print("Unpacked results: ")
print("sum: ", result["sum"])
print("diff: ", result["diff"])
# %%#
# Advanced features
# -----------------
# ``pyfunction`` supports many advanced features for data handling and workflow control.
# These functionalities are shared with ``PythonJob``.
#
# .. seealso::
#
#    For a detailed guide on the following topics, please refer to the :doc:`Common Concepts <common_concepts>`:
#
#    - **Dynamic and Nested Namespaces**: For handling complex or variable outputs.
#    - **Custom Exit Codes**: For robust error handling and workflow control.
#    - **Data Serialization and Deserialization**: For working with custom data types like ``ase.Atoms`` or other AiiDA data nodes.  # noqa: E501
#
# Here is a more complex example demonstrating a dynamic namespace output with custom data types.
from ase import Atoms # noqa: E402
from ase.build import bulk # noqa: E402
@pyfunction(outputs=spec.dynamic(Atoms))
def generate_structures(element: str, factors: list) -> dict:
    """Build one rescaled copy of the bulk structure of ``element`` per factor.

    The returned dictionary maps ``s_0``, ``s_1``, ... (one key per entry of
    ``factors``, in order) to a copy of the bulk structure whose cell, and the
    atoms in it, have been scaled by the corresponding factor.
    """
    reference = bulk(element)

    def _scaled_copy(factor):
        # Work on a copy so that ``reference`` stays unscaled for the next factor.
        scaled = reference.copy()
        scaled.set_cell(scaled.cell * factor, scale_atoms=True)
        return scaled

    return {f"s_{index}": _scaled_copy(factor) for index, factor in enumerate(factors)}
# Run the function for aluminium with three scaling factors, then print every
# entry of the returned outputs (one entry per factor).
result, node = run_get_node(generate_structures, element="Al", factors=[0.95, 1.0, 1.05])
print("Generated scaled structures:")
for key, value in result.items():
    print(key, value)
# %%
# Async functions
# ---------------
# ``pyfunction`` also supports Python's ``async`` functions. This is a powerful feature for
# tasks that are I/O-bound (e.g., waiting for network requests, file operations) or for
# running multiple tasks concurrently without blocking the AiiDA daemon.
#
# When you ``submit`` an async function, the call returns immediately with a process node,
# allowing your script to continue running while the function executes in the background.
#
from aiida.engine import submit
import datetime
from aiida_pythonjob import prepare_pyfunction_inputs, PyFunction
@pyfunction()
async def add_async(x, y, time: float):
    """Return ``x + y`` after awaiting a pause of ``time`` seconds."""
    from asyncio import sleep

    # The awaited pause stands in for real asynchronous I/O or computation.
    await sleep(time)
    total = x + y
    return total
# Build the process inputs for ``add_async`` (x=2, y=3, a 2.0 second pause)
# and submit them as a ``PyFunction`` process.
inputs = prepare_pyfunction_inputs(
    add_async,
    function_inputs={"x": 2, "y": 3, "time": 2.0},
)
node = submit(PyFunction, **inputs)
# %%#
# Killing an async process
# ~~~~~~~~~~~~~~~~~~~~~~~~
# Since async functions run as regular AiiDA processes, they can be controlled and killed
# programmatically. This is useful for managing long-running or stuck tasks.
# You can kill a running async function using the AiiDA command line interface.
#
# .. code-block:: bash
#
#    $ verdi process kill <pk>
#
# Monitor external events
# ------------------------
#
# Async functions are particularly useful for monitoring external events or conditions without blocking the AiiDA daemon.
# Here is an example that waits until a specified time.
#
async def monitor_time(time: datetime.datetime, interval: float = 0.5, timeout: float = 60.0):
    """Wait until the clock reaches the target ``time``.

    The clock is checked, and as long as the target has not been reached the
    coroutine prints a message and sleeps for ``interval`` seconds before
    checking again.

    ``time`` may be naive or timezone-aware: the clock is always read with the
    target's own ``tzinfo``, so both kinds compare without a ``TypeError``.

    :param time: Moment to wait for.
    :param interval: Seconds to sleep between two checks of the clock.
    :param timeout: Maximum number of seconds to keep waiting.
    :raises TimeoutError: If, after a sleep, more than ``timeout`` seconds
        have elapsed since monitoring started.
    """
    import asyncio

    # ``datetime.now(None)`` returns a naive local time, so reusing the
    # target's ``tzinfo`` leaves the behavior for naive targets unchanged
    # while making aware targets comparable.
    tz = time.tzinfo
    start_time = datetime.datetime.now(tz)
    while datetime.datetime.now(tz) < time:
        print("Waiting...")
        await asyncio.sleep(interval)
        # The timeout is only evaluated after a sleep, never before the first check.
        if (datetime.datetime.now(tz) - start_time).total_seconds() > timeout:
            raise TimeoutError("Monitoring timed out.")
# Submit the async monitor with a target five seconds from now, checked at a
# 1.0 second interval; ``timeout`` is not passed, so its default applies.
inputs = prepare_pyfunction_inputs(
    monitor_time,
    function_inputs={"time": datetime.datetime.now() + datetime.timedelta(seconds=5), "interval": 1.0},
)
node = submit(PyFunction, **inputs)
# %%
# For the user's convenience, we provide a dedicated ``MonitorPyFunction`` class that inherits from ``PyFunction``.
# Users only need to write a normal function that returns ``True`` when the monitoring condition is met.
from aiida_pythonjob import MonitorPyFunction
def monitor_time(time: datetime.datetime):
    """Return ``True`` once the current time is later than the target ``time``.

    ``time`` may be naive or timezone-aware: the clock is read with the
    target's own ``tzinfo``, so both kinds compare without a ``TypeError``.

    :param time: Moment after which the condition counts as met.
    :return: ``True`` if the current time is greater than ``time``, else ``False``.
    """
    # ``datetime.now(tz=None)`` returns a naive local time, so naive targets
    # behave exactly as before while aware targets become comparable.
    return datetime.datetime.now(tz=time.tzinfo) > time
# Submit the plain condition function as a ``MonitorPyFunction``. Only ``time``
# goes into ``function_inputs``; ``interval`` and ``timeout`` are passed to
# ``prepare_pyfunction_inputs`` itself rather than to the function.
inputs = prepare_pyfunction_inputs(
    monitor_time,
    function_inputs={"time": datetime.datetime.now() + datetime.timedelta(seconds=5)},
    interval=1.0,
    timeout=20.0,
)
node = submit(MonitorPyFunction, **inputs)