-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkflow_fixing.py
More file actions
415 lines (353 loc) · 15.3 KB
/
workflow_fixing.py
File metadata and controls
415 lines (353 loc) · 15.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
import os
from typing import Any
from pyiron_core.pyiron_workflow import as_function_node
import copy
import re
from typing import Any
from pyiron_core.pyiron_workflow import as_function_node
from pyiron_core.pyiron_workflow.graph.base import Graph
"""
move_node_path – pyiron_core workflow node
Replace a dot-separated token sequence in the ``import_path`` and ``function``
values of every node of a workflow graph. The node returns a modified
**deep copy** of the input graph, leaving the original untouched.
Parameters
----------
graph : Graph
The workflow graph (pyiron_core.pyiron_workflow.graph.base.Graph).
original : str
Token sequence to be replaced (may contain dots).
new : str
Replacement token sequence.
Returns
-------
Graph
A new graph instance with the updated ``import_path`` entries.
"""
def _replace_path(path: str, old: str, new: str) -> str:
"""
Replace ``old`` by ``new`` only when ``old`` matches a whole token
sequence in a dot‑separated import path.
* ``old`` may appear at the beginning, middle or end of the path.
* If ``new`` is the empty string, any resulting double‑dots are collapsed
to a single dot and a leading dot is removed.
Parameters
----------
path : str
Original import path (e.g. ``"pyiron_core.pyiron_nodes.atomistic.structure"``).
old : str
Token sequence to replace (may contain dots, e.g. ``"pyiron_nodes.atomistic"``).
new : str
Replacement token sequence. An empty string removes the token(s).
Returns
-------
str
The updated import path.
"""
# ``\\b`` is a word‑boundary. Tokens consist of letters, digits or
# underscores; the dot is a non‑word character, so a word‑boundary appears
# both before the first token and after the last token – this works for
# matches at the start of the string as well.
pattern = r"\b" + re.escape(old) + r"\b"
replaced = re.sub(pattern, new, path)
# If the replacement removed a token, we may end up with ".." or a leading "."
if new == "":
# Collapse any run of two or more dots to a single dot
replaced = re.sub(r"\.{2,}", ".", replaced)
# Strip a leading dot (but keep a trailing one if it exists – it would be
# an artefact of a trailing dot in the original path, which is unlikely)
replaced = replaced.lstrip(".")
return replaced
@as_function_node("move_node_path")
def move_node_path(graph: Graph, original: str, new: str) -> Graph:
    """
    Return a deep copy of *graph* with rewritten node import locations.

    For every node of the copy, the string-valued attributes ``import_path``
    and ``function`` (when present) are passed through ``_replace_path`` so
    that the token sequence ``original`` is replaced by ``new``. The input
    graph is never modified.

    Parameters
    ----------
    graph : Graph
        The workflow graph to copy and rewrite.
    original : str
        Dot-separated token sequence to look for.
    new : str
        Token sequence that replaces ``original``.

    Returns
    -------
    Graph
        The modified deep copy of ``graph``.
    """
    # Work on a private copy so the caller's graph stays untouched.
    new_graph: Graph = copy.deepcopy(graph)

    for _node_id, node in new_graph.nodes.items():
        # Same treatment for both attributes, in the same order as before:
        # first ``import_path``, then ``function``.
        for attr_name in ("import_path", "function"):
            if not hasattr(node, attr_name):
                continue
            current = getattr(node, attr_name)
            # Non-string values (e.g. None) are left as they are.
            if isinstance(current, str):
                setattr(node, attr_name, _replace_path(current, original, new))

    return new_graph
"""
merge_paths – pyiron_core workflow node
Combine two filesystem‑style paths (e.g. “a/b” + “c/d.txt”) using the
appropriate OS separator and normalise the result. The node does **not**
interpret “.” as a token separator – it works with ordinary path strings.
Parameters
----------
path_a : str
The first (base) path.
path_b : str
The second (relative or additional) path.
Returns
-------
str
The merged, normalised path.
"""
def _norm_join(p1: str, p2: str) -> str:
"""
Join *p1* and *p2* with ``os.path.join`` and collapse any ``..`` or ``.``.
"""
# ``os.path.normpath`` removes redundant separators and resolves
# ``.`` / ``..`` components without touching the filesystem.
return os.path.normpath(os.path.join(p1, p2))
@as_function_node("merge_paths")
def merge_paths(path_a: str, path_b: str) -> str:
    """
    Workflow node that merges two ordinary filesystem paths.

    Both inputs are validated to be strings and then handed to
    ``_norm_join``, which joins and normalises them.

    Parameters
    ----------
    path_a : str
        The first (base) path.
    path_b : str
        The second path, appended to ``path_a``.

    Returns
    -------
    str
        The joined and normalised path.

    Raises
    ------
    TypeError
        If either argument is not a ``str``.
    """
    # Reject anything that is not a pair of strings (e.g. ``None``).
    both_are_strings = isinstance(path_a, str) and isinstance(path_b, str)
    if not both_are_strings:
        raise TypeError(
            f"merge_paths expects two strings, got {type(path_a)} and {type(path_b)}"
        )
    return _norm_join(path_a, path_b)
"""
compare_and_write_graph
Node that compares the import_path attribute of the nodes that exist in both
graphs. If dry_run is True, the node only returns a list with the ids of the
differing nodes. Otherwise, and only if at least one node differs, the node
also writes *graph_b* to *path* as JSON by using the internal ``_save_graph``
helper from ``pyiron_core``. When backup is True, an existing file is first
moved to ``<path>.bak``; if that backup file already exists, a
``FileExistsError`` is raised.
Only ASCII characters are used in comments.
"""
import shutil
from pathlib import Path
from typing import List, Any
from pyiron_core.pyiron_workflow import as_function_node
from pyiron_core.pyiron_workflow.graph.base import Graph
# internal helper that knows how to serialise a Graph to the standard
# pyiron JSON format
from pyiron_core.pyiron_workflow.graph.graph_json import _save_graph,_get_absolute_file_path
def _diff_nodes(g_a: Graph, g_b: Graph) -> List[Any]:
"""
Return a list with the ids of nodes that are present in both graphs and
have different ``import_path`` values. The attribute is accessed with
``getattr`` because the values stored in ``graph.nodes`` are objects, not
plain dictionaries.
"""
diff: List[Any] = []
common = set(g_a.nodes.keys()) & set(g_b.nodes.keys())
for nid in common:
a_path = getattr(g_a.nodes[nid], "import_path", None)
b_path = getattr(g_b.nodes[nid], "import_path", None)
if a_path != b_path:
diff.append(nid)
return diff
@as_function_node("compare_and_write_graph")
def compare_and_write_graph(
    graph_a: Graph,
    graph_b: Graph,
    path: str | Path,
    *,
    dry_run: bool = True,
    backup: bool = True,
) -> List[Any]:
    """
    Report ``import_path`` differences between two graphs and optionally
    save ``graph_b``.

    Parameters
    ----------
    graph_a : Graph
        Original graph.
    graph_b : Graph
        Graph that may contain updated ``import_path`` values.
    path : str or pathlib.Path
        Destination handed to ``_save_graph`` for ``graph_b``.
    dry_run : bool, default True
        If True nothing is written; only the differing ids are returned.
    backup : bool, default True
        If True and the destination file already exists, it is moved to
        ``<path>.bak`` before saving.

    Returns
    -------
    List[Any]
        Ids of the nodes found in both graphs whose ``import_path`` differs
        (as computed by ``_diff_nodes``). Returned in every mode.

    Raises
    ------
    FileExistsError
        If a backup is required but the ``.bak`` file already exists.
    """
    differing = _diff_nodes(graph_a, graph_b)

    # Nothing is written in dry-run mode or when the graphs agree.
    if dry_run or not differing:
        return differing

    # Move an existing destination file out of the way, refusing to clobber
    # an older backup.
    destination = _get_absolute_file_path(path, None)
    if backup and destination.is_file():
        bak_file = destination.with_suffix(destination.suffix + ".bak")
        if bak_file.is_file():
            raise FileExistsError(
                f"Backup file '{bak_file}' already exists – aborting."
            )
        shutil.move(destination, bak_file)

    # With ``backup=True`` an existing file has just been moved away, so
    # overwriting is only requested when no backup was asked for.
    # NOTE(review): assumes ``_save_graph`` resolves ``path`` to the same file
    # as ``_get_absolute_file_path(path, None)`` - confirm against the helper.
    _save_graph(
        graph=graph_b,
        filename=path,
        workflow_dir=None,
        overwrite=not backup,
    )
    return differing
# --------------------------------------------------------------------------- #
# Utilities that operate on the JSON representation of a pyiron_core Graph.
# They use the existing helpers _get_absolute_file_path and _replace_path.
# --------------------------------------------------------------------------- #
import json
import shutil
from pathlib import Path
from typing import List, Tuple, Union
def _recurse_replace(
    container: dict | list,
    node_id: str,
    old: str,
    new: str,
    changed: list[tuple[str, str]],
) -> None:
    """
    Rewrite ``'import_path'`` and ``'function'`` string values in place.

    ``container`` (a dict or a list) is walked recursively. Every string
    stored under the key ``'import_path'`` or ``'function'`` is passed through
    ``_replace_path``; when the result differs, the value is updated and the
    pair ``(node_id, key)`` is appended to ``changed``.

    When descending into a value of a dict, ``node_id`` is extended to
    ``"<node_id>.<key>"``; list items keep the ``node_id`` of their list.
    Anything that is neither a dict nor a list is ignored.
    """
    if isinstance(container, list):
        for element in container:
            if isinstance(element, (dict, list)):
                _recurse_replace(element, node_id, old, new, changed)
        return

    if not isinstance(container, dict):
        return

    # Iterate over a snapshot because entries are reassigned while looping.
    for name, entry in list(container.items()):
        is_target = name in ("import_path", "function") and isinstance(entry, str)
        if is_target:
            updated = _replace_path(entry, old, new)
            if updated != entry:
                container[name] = updated
                changed.append((node_id, name))
            continue

        # Not a replaceable leaf: descend if it is a nested structure.
        if isinstance(entry, (dict, list)):
            _recurse_replace(entry, f"{node_id}.{name}", old, new, changed)
from pyiron_core.pyiron_workflow.graph.graph_json import (
_get_absolute_file_path, # already defined in the library
)
# _replace_path is defined earlier in this module, so it needs no import here.
# --------------------------------------------------------------------------- #
# replace_in_graph_json
# --------------------------------------------------------------------------- #
@as_function_node
def replace_in_graph_json(
    filename: Union[str, Path],
    old: str,
    new: str,
    *,
    workflow_dir: Union[str, Path, None] = None,
    dryrun: bool = True,
    backup: bool = True,
) -> List[Tuple[str, str]]:
    """
    Rewrite ``import_path`` / ``function`` values inside a graph JSON file.

    The file is loaded, every entry below its top-level ``"nodes"`` mapping is
    processed with ``_recurse_replace`` (which applies ``_replace_path`` to
    the values of the keys ``import_path`` and ``function``), and - unless
    this is a dry run or nothing changed - the modified JSON is written back
    to the same location.

    Parameters
    ----------
    filename : str | Path
        Name of the JSON file; resolved with ``_get_absolute_file_path``.
    old : str
        Token sequence to be replaced.
    new : str
        Replacement token sequence (may be empty).
    workflow_dir : str | Path | None, optional
        Passed to ``_get_absolute_file_path``.
    dryrun : bool, default True
        If True only return the list of changed items, do not write anything.
    backup : bool, default True
        If True the existing file is moved to ``<file>.bak`` before the new
        content is written.

    Returns
    -------
    List[Tuple[str, str]]
        ``(node_id, key)`` pairs for every value that was changed. ``node_id``
        is the key of the node in the ``"nodes"`` mapping, extended with the
        dotted keys of any nested dictionaries that were descended into;
        ``key`` is either ``'import_path'`` or ``'function'``.

    Raises
    ------
    FileExistsError
        If a backup is required but the ``.bak`` file already exists.
    """
    target: Path = _get_absolute_file_path(filename, workflow_dir)

    with target.open("r", encoding="utf-8") as src:
        state = json.load(src)

    # Apply the replacement to each node entry; a file without a "nodes"
    # entry simply yields no changes.
    changed: List[Tuple[str, str]] = []
    for node_id, node_dict in state.get("nodes", {}).items():
        _recurse_replace(node_dict, node_id, old, new, changed)

    # Nothing is written in dry-run mode or when no value was modified.
    if dryrun or not changed:
        return changed

    # Keep the previous version as ``<file>.bak``, refusing to clobber an
    # older backup.
    if backup and target.is_file():
        bak_file = target.with_suffix(target.suffix + ".bak")
        if bak_file.is_file():
            raise FileExistsError(
                f"Backup file '{bak_file}' already exists – aborting."
            )
        shutil.move(str(target), str(bak_file))

    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as dst:
        json.dump(state, dst, indent=4, ensure_ascii=False)
    return changed